-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming without ENABLE_SCHEMA_EVOLUTION; postgres JSON mapped to snowflake VARIANT #536
base: master
Are you sure you want to change the base?
Changes from 13 commits
63bc190
c91c9b4
e42e169
4a68939
1fceaef
4c03bd0
e6e702e
7efd254
8369100
003e67e
f12e887
f463138
c3e093f
88ce343
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,13 +19,25 @@ | |
import com.snowflake.kafka.connector.internal.SnowflakeErrors; | ||
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; | ||
import com.snowflake.kafka.connector.records.RecordService; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.LinkedHashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
import java.util.stream.Stream; | ||
|
||
import javax.annotation.Nonnull; | ||
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; | ||
|
||
import org.apache.kafka.connect.data.Date; | ||
import org.apache.kafka.connect.data.Decimal; | ||
import org.apache.kafka.connect.data.Field; | ||
import org.apache.kafka.connect.data.Schema; | ||
import org.apache.kafka.connect.data.Time; | ||
import org.apache.kafka.connect.data.Timestamp; | ||
import org.apache.kafka.connect.data.Schema.Type; | ||
import org.apache.kafka.connect.sink.SinkRecord; | ||
import org.slf4j.Logger; | ||
|
@@ -83,7 +95,15 @@ public static void evolveSchemaIfNeeded( | |
|
||
// Add columns if needed, ignore any exceptions since other task might be succeeded | ||
if (extraColNames != null) { | ||
Map<String, String> extraColumnsToType = getColumnTypes(record, extraColNames); | ||
List<String> fieldNamesOrderedAsOnSource = Stream.concat( | ||
record.keySchema().fields().stream().map(f -> f.name()), | ||
record.valueSchema().fields().stream().map(f -> f.name()) | ||
).collect(Collectors.toList()); | ||
List<String> extraColNamesOrderedAsOnSource = new ArrayList<>(extraColNames); | ||
extraColNamesOrderedAsOnSource.sort( | ||
Comparator.comparingInt(fieldNamesOrderedAsOnSource::indexOf)); | ||
Map<String, String> extraColumnsToType = getColumnTypes(record, extraColNamesOrderedAsOnSource); | ||
|
||
try { | ||
conn.appendColumnsToTable(tableName, extraColumnsToType); | ||
} catch (SnowflakeKafkaConnectorException e) { | ||
|
@@ -109,7 +129,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column | |
if (columnNames == null) { | ||
return new HashMap<>(); | ||
} | ||
Map<String, String> columnToType = new HashMap<>(); | ||
Map<String, String> columnToType = new LinkedHashMap<>(); | ||
Map<String, String> schemaMap = getSchemaMapFromRecord(record); | ||
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value()); | ||
|
||
|
@@ -145,7 +165,7 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) { | |
Schema schema = record.valueSchema(); | ||
if (schema != null) { | ||
for (Field field : schema.fields()) { | ||
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type())); | ||
schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type(), field.schema().name())); | ||
} | ||
} | ||
return schemaMap; | ||
|
@@ -158,7 +178,7 @@ private static String inferDataTypeFromJsonObject(JsonNode value) { | |
// only when the type of the value is unrecognizable for JAVA | ||
throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass()); | ||
} | ||
return convertToSnowflakeType(schemaType); | ||
return convertToSnowflakeType(schemaType, null); | ||
} | ||
|
||
/** Convert a json node type to kafka data type */ | ||
|
@@ -192,7 +212,25 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) { | |
} | ||
|
||
/** Convert the kafka data type to Snowflake data type */ | ||
private static String convertToSnowflakeType(Type kafkaType) { | ||
private static String convertToSnowflakeType(Type kafkaType, String semanticType) { | ||
if (semanticType != null) { | ||
switch (semanticType) { | ||
case Decimal.LOGICAL_NAME: | ||
return "DOUBLE"; | ||
case Time.LOGICAL_NAME: | ||
case Timestamp.LOGICAL_NAME: | ||
case "io.debezium.time.ZonedTimestamp": | ||
case "io.debezium.time.ZonedTime": | ||
case "io.debezium.time.MicroTime": | ||
case "io.debezium.time.Timestamp": | ||
case "io.debezium.time.MicroTimestamp": | ||
return "TIMESTAMP"; | ||
case Date.LOGICAL_NAME: | ||
case "io.debezium.time.Date": | ||
return "DATE"; | ||
} | ||
} | ||
|
||
switch (kafkaType) { | ||
case INT8: | ||
return "BYTEINT"; | ||
|
@@ -209,6 +247,9 @@ private static String convertToSnowflakeType(Type kafkaType) { | |
case BOOLEAN: | ||
return "BOOLEAN"; | ||
case STRING: | ||
if (semanticType != null && semanticType.equals("io.debezium.data.Json")) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. what's the issue of using VARCHAR in this case? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. By using varchar you'd have to parse json each time you wanted to pull out a field. With variant you can use json.field. So this will skip extra processing. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see, this makes sense, I guess this is a general issue for json value since they will be all mapped to STRING, I will see what we can do here, thanks! |
||
return "VARIANT"; | ||
} | ||
return "VARCHAR"; | ||
case BYTES: | ||
return "BINARY"; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't work because we rely on the schema evolution to create the table with the correct schema. If you don't want schema evolution on the table, you should create the table yourself, and it will have schema evolution turned off by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for the reply, the schema is evolved here: https://github.com/streamkap-com/snowflake-kafka-connector/blob/63bc190dd75692e6423d3f50b25143dafaa40d1a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java#L653. The main point of this draft PR is to assess if "manual schema evolution" from
SchematizationUtils.evolveSchemaIfNeeded
can be supported as an alternative to "automatic schema evolution". Our main concern is that "manual schema evolution" is currently done on the error path, if the
insertRow fails. We can contribute this "manual schema evolution" option, when setting
snowflake.schematization.auto=false
the connector would use the connect schema from the schema registry or embedded into the records to evolve the snowflake schema before inserting the data. But we wanted to see first if this approach is acceptable for you going forward. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is by design, insertRow will always fail first because we just create a table with RECORD_METADATA only during connector start up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, that should not be a problem, I mean schema changes should not happen often. The problem is that without these proposed changes https://github.com/streamkap-com/snowflake-kafka-connector/blob/63bc190dd75692e6423d3f50b25143dafaa40d1a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java#L264, we cannot enable schema evolution without setting
ENABLE_SCHEMA_EVOLUTION
. We would like to use only SchematizationUtils.evolveSchemaIfNeeded
and not ENABLE_SCHEMA_EVOLUTION
.