Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reproduce error branch #993

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@
<artifactId>snowflake-jdbc</artifactId>
</exclusion>
</exclusions>
<scope>system</scope>
<systemPath>/Users/bzabek/snowflake-kafka-connector/snowflake-ingest-sdk.jar</systemPath>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;
import net.snowflake.ingest.utils.SFException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.kafka.connect.errors.ConnectException;

/** This class handles all calls to manage the streaming ingestion client */
Expand All @@ -49,8 +48,6 @@ public SnowflakeStreamingIngestClient createClient(
.setProperties(streamingClientProperties.clientProperties)
.setParameterOverrides(streamingClientProperties.parameterOverrides);

setIcebergEnabled(builder, streamingClientProperties.isIcebergEnabled);

SnowflakeStreamingIngestClient createdClient = builder.build();

LOGGER.info(
Expand All @@ -65,17 +62,6 @@ public SnowflakeStreamingIngestClient createClient(
}
}

private static void setIcebergEnabled(
SnowflakeStreamingIngestClientFactory.Builder builder, boolean isIcebergEnabled) {
try {
// TODO reflection should be replaced by proper builder.setIceberg(true) call in SNOW-1728002
FieldUtils.writeField(builder, "isIceberg", isIcebergEnabled, true);
} catch (IllegalAccessException e) {
throw new IllegalStateException(
"Couldn't set iceberg by accessing private field: " + "isIceberg", e);
}
}

/**
* Closes the given client. Swallows any exceptions
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public StreamingClientProperties(Map<String, String> connectorConfig) {

// Override only if the streaming client properties are explicitly set in config
this.parameterOverrides = new HashMap<>();
if (isIcebergEnabled) {
// todo extract to field
this.parameterOverrides.put("enable_iceberg_streaming", "true");
}
Optional<String> snowpipeStreamingMaxClientLag =
Optional.ofNullable(connectorConfig.get(SNOWPIPE_STREAMING_MAX_CLIENT_LAG));
snowpipeStreamingMaxClientLag.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ public String mapToColumnType(Schema.Type kafkaType, String schemaName) {
} else {
return "BINARY";
}
case STRUCT:
return "OBJECT()";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We dont enter this code path.

case ARRAY:
throw new IllegalArgumentException("Arrays, struct and map not supported!");
default:
// MAP and STRUCT will go here
throw new IllegalArgumentException("Arrays, struct and map not supported!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord;
import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata;
import com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand All @@ -35,7 +33,7 @@ protected void createIcebergTable() {

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
@Disabled
// @Disabled
void shouldEvolveSchemaAndInsertRecords(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
Expand Down Expand Up @@ -81,6 +79,64 @@ void shouldEvolveSchemaAndInsertRecords(
assertRecordsInTable();
}

@ParameterizedTest(name = "{0}")
@MethodSource("prepareData")
// @Disabled
void shouldEvolveSchemaAndInsertRecords_withObjects(
String description, String message, DescribeTableRow[] expectedSchema, boolean withSchema)
throws Exception {
// start off with just one column
List<DescribeTableRow> rows = describeTable(tableName);
assertThat(rows)
.hasSize(1)
.extracting(DescribeTableRow::getColumn)
.contains(Utils.TABLE_COLUMN_METADATA);

SinkRecord record = createKafkaRecord(message, 0, withSchema);
service.insert(Collections.singletonList(record));
waitForOffset(-1);
rows = describeTable(tableName);
assertThat(rows.size()).isEqualTo(9);

// don't check metadata column schema, we have different tests for that
rows =
rows.stream()
.filter(r -> !r.getColumn().equals(Utils.TABLE_COLUMN_METADATA))
.collect(Collectors.toList());

assertThat(rows).containsExactlyInAnyOrder(expectedSchema);

// resend and store same record without any issues now
service.insert(Collections.singletonList(record));
waitForOffset(1);

// and another record with same schema
service.insert(Collections.singletonList(createKafkaRecord(message, 1, withSchema)));
waitForOffset(2);

String testStruct = "{ \"testStruct\": {" + "\"k1\" : 1," + "\"k2\" : 2" + "} " + "}";

// String testStruct =
// "{ \"testStruct\": {" +
// "\"k1\" : { \"nested_key1\" : 1}," +
// "\"k2\" : { \"nested_key2\" : 2}" +
// "} " +
// "}";

// String testStruct =
// "{ \"testStruct\": {" +
// "\"k1\" : { \"car\" : { \"brand\" : \"vw\" } }," +
// "\"k2\" : { \"car\" : { \"brand\" : \"toyota\" } }" +
// "} " +
// "}";
// reinsert record with extra field
service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
rows = describeTable(tableName);
// assertThat(rows).hasSize(15);
service.insert(Collections.singletonList(createKafkaRecord(testStruct, 2, false)));
waitForOffset(3);
}

private void assertRecordsInTable() {
List<RecordWithMetadata<PrimitiveJsonRecord>> recordsWithMetadata =
selectAllSchematizedRecords();
Expand Down
Loading