From 63bc190dd75692e6423d3f50b25143dafaa40d1a Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Mon, 6 Feb 2023 16:39:49 +0000 Subject: [PATCH 01/10] Added snowflake.schematization.auto; added postgresql JSON to VARIANT type convertion --- .../SnowflakeSinkConnectorConfig.java | 7 +++++ .../internal/SnowflakeConnectionService.java | 2 +- .../SnowflakeConnectionServiceV1.java | 26 ++++++++++--------- .../streaming/SchematizationUtils.java | 9 ++++--- .../streaming/SnowflakeSinkServiceV2.java | 5 +++- .../streaming/TopicPartitionChannel.java | 8 ++++-- .../connector/records/RecordService.java | 19 ++++++++++++++ 7 files changed, 57 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 4498c7d8a..5918b2bb8 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -74,6 +74,13 @@ public class SnowflakeSinkConnectorConfig { public static final String ENABLE_SCHEMATIZATION_CONFIG = "snowflake.enable.schematization"; public static final String ENABLE_SCHEMATIZATION_DEFAULT = "false"; + public static final String SCHEMATIZATION_AUTO_CONFIG = "snowflake.schematization.auto"; + public static final String SCHEMATIZATION_AUTO_DEFAULT = "true"; + public static final String SCHEMATIZATION_AUTO_DISPLAY = "Use automatic schema evolution"; + public static final String SCHEMATIZATION_AUTO_DOC = + "If true, use snowflake automatic schema evolution feature." + + "NOTE: you need to grant evolve schema to " + SNOWFLAKE_USER; + // Proxy Info private static final String PROXY_INFO = "Proxy Info"; public static final String JVM_PROXY_HOST = "jvm.proxy.host"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java index dad2ec22b..77e7e3609 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java @@ -281,5 +281,5 @@ public interface SnowflakeConnectionService { * * @param tableName table name */ - void createTableWithOnlyMetadataColumn(String tableName); + void createTableWithOnlyMetadataColumn(String tableName, boolean autoSchematization); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index f5fbb5b9a..85d3e5bb9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -135,7 +135,7 @@ public void createTable(final String tableName) { } @Override - public void createTableWithOnlyMetadataColumn(final String tableName) { + public void createTableWithOnlyMetadataColumn(final String tableName, final boolean autoSchematization) { checkConnection(); InternalUtils.assertNotEmpty("tableName", tableName); String createTableQuery = @@ -151,17 +151,19 @@ public void createTableWithOnlyMetadataColumn(final String tableName) { throw SnowflakeErrors.ERROR_2007.getException(e); } - // Enable schema evolution by default if the table is created by the connector - String enableSchemaEvolutionQuery = - "alter table 
identifier(?) set ENABLE_SCHEMA_EVOLUTION = true"; - try { - PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery); - stmt.setString(1, tableName); - stmt.executeQuery(); - } catch (SQLException e) { - // Skip the error given that schema evolution is still under PrPr - LOG_WARN_MSG( - "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage()); + if (autoSchematization) { + // Enable schema evolution by default if the table is created by the connector + String enableSchemaEvolutionQuery = + "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true"; + try { + PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery); + stmt.setString(1, tableName); + stmt.executeQuery(); + } catch (SQLException e) { + // Skip the error given that schema evolution is still under PrPr + LOG_WARN_MSG( + "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage()); + } } LOG_INFO_MSG("Created table {} with only RECORD_METADATA column", tableName); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java index 763652f84..d2209c033 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java @@ -145,7 +145,7 @@ private static Map getSchemaMapFromRecord(SinkRecord record) { Schema schema = record.valueSchema(); if (schema != null) { for (Field field : schema.fields()) { - schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type())); + schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type(), field.schema().name())); } } return schemaMap; @@ -158,7 +158,7 @@ private static String inferDataTypeFromJsonObject(JsonNode value) { // only when the type of the value is unrecognizable for JAVA throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass()); } - return convertToSnowflakeType(schemaType); + return convertToSnowflakeType(schemaType, null); } /** Convert a json node type to kafka data type */ @@ -192,7 +192,7 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) { } /** Convert the kafka data type to Snowflake data type */ - private static String convertToSnowflakeType(Type kafkaType) { + private static String convertToSnowflakeType(Type kafkaType, String semanticType) { switch (kafkaType) { case INT8: return "BYTEINT"; @@ -209,6 +209,9 @@ private static String convertToSnowflakeType(Type kafkaType) { case BOOLEAN: return "BOOLEAN"; case STRING: + if (semanticType != null && semanticType.equals("io.debezium.data.Json")) { + return "VARIANT"; + } return "VARCHAR"; case BYTES: return "BINARY"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 52fbbb2e8..ad1ea06d3 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -101,6 +101,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService { private final String streamingIngestClientName; private boolean enableSchematization; + private boolean autoSchematization; /** * Key is formulated in {@link #partitionChannelKey(String, int)} } @@ 
-131,6 +132,8 @@ public SnowflakeSinkServiceV2( this.enableSchematization = this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig); + this.autoSchematization = + this.recordService.setAndGetAutoSchematizationFromConfig(this.connectorConfig); this.taskId = connectorConfig.getOrDefault(Utils.TASK_ID, "-1"); this.streamingIngestClientName = @@ -516,7 +519,7 @@ private void createTableIfNotExists(final String tableName) { if (this.enableSchematization) { // Always create the table with RECORD_METADATA only and rely on schema evolution to update // the schema - this.conn.createTableWithOnlyMetadataColumn(tableName); + this.conn.createTableWithOnlyMetadataColumn(tableName, this.autoSchematization); } else { this.conn.createTable(tableName); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 626962f08..16c6690fb 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -175,6 +175,7 @@ public class TopicPartitionChannel { // Whether schematization has been enabled. private final boolean enableSchematization; + private final boolean autoSchematization; // Whether schema evolution could be done on this channel private final boolean enableSchemaEvolution; @@ -255,11 +256,14 @@ public TopicPartitionChannel( /* Schematization related properties */ this.enableSchematization = this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig); + this.autoSchematization = + this.recordService.setAndGetAutoSchematizationFromConfig(sfConnectorConfig); this.enableSchemaEvolution = this.enableSchematization && this.conn != null - && this.conn.hasSchemaEvolutionPermission( - tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE)); + && (!autoSchematization || + this.conn.hasSchemaEvolutionPermission( + tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE))); } /** diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index a1bb87d38..2d3bdbf9f 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -71,6 +71,7 @@ public class RecordService extends EnableLogging { static final String HEADERS = "headers"; private boolean enableSchematization = false; + private boolean autoSchematization = true; // For each task, we require a separate instance of SimpleDataFormat, since they are not // inherently thread safe @@ -120,6 +121,24 @@ public boolean setAndGetEnableSchematizationFromConfig( return this.enableSchematization; } +/** + * extract autoSchematization from the connector config and set the value for the recordService + * + *
The extracted boolean is returned for external usage. + * + * @param connectorConfig the connector config map + * @return a boolean indicating whether schematization is enabled + */ +public boolean setAndGetAutoSchematizationFromConfig( + final Map connectorConfig) { + if (connectorConfig.containsKey(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG)) { + this.autoSchematization = + Boolean.parseBoolean( + connectorConfig.get(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG)); + } + return this.autoSchematization; +} + /** * Directly set the enableSchematization through param * From c91c9b4ba5539c4ede1c744cc74ddf4b8553c23e Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Fri, 10 Feb 2023 11:16:09 +0000 Subject: [PATCH 02/10] fix date/time conversion in SchematizationUtils; fixed junit --- .../streaming/SchematizationUtils.java | 23 +++++++++++++++++++ .../streaming/SnowflakeSinkServiceV2IT.java | 6 ++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java index d2209c033..ab587fd14 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java @@ -24,8 +24,13 @@ import java.util.Map; import javax.annotation.Nonnull; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; + +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; @@ -193,6 +198,24 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) { /** Convert the kafka data type to Snowflake data type */ private static String convertToSnowflakeType(Type kafkaType, String semanticType) { + if (semanticType != null) { + switch (semanticType) { + case Decimal.LOGICAL_NAME: + return "DOUBLE"; + case Time.LOGICAL_NAME: + case Timestamp.LOGICAL_NAME: + case "io.debezium.time.ZonedTimestamp": + case "io.debezium.time.ZonedTime": + case "io.debezium.time.MicroTime": + case "io.debezium.time.Timestamp": + case "io.debezium.time.MicroTimestamp": + return "TIMESTAMP"; + case Date.LOGICAL_NAME: + case "io.debezium.time.Date": + return "DATE"; + } + } + switch (kafkaType) { case INT8: return "BYTEINT"; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index a218b51b4..3869d2b2b 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -937,7 +937,7 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception { avroConverter.configure( Collections.singletonMap("schema.registry.url", "http://fake-url"), false); byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original); - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); SchemaAndValue avroInputValue = 
avroConverter.toConnectData(topic, converted); @@ -1023,7 +1023,7 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception { JsonConverter jsonConverter = new JsonConverter(); jsonConverter.configure(config, false); byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original); - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted); @@ -1083,7 +1083,7 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce JsonConverter jsonConverter = new JsonConverter(); jsonConverter.configure(config, false); byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original); - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); createNonNullableColumn(table, "id_int8_non_nullable"); SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted); From e42e169d197cbfec57713895c0d4034027e14d6e Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Fri, 10 Feb 2023 12:55:03 +0000 Subject: [PATCH 03/10] Added support for regex matcher in snowflake.topic2table.map --- .../SnowflakeSinkConnectorConfig.java | 5 +++- .../com/snowflake/kafka/connector/Utils.java | 29 ++++++++++++++----- .../snowflake/kafka/connector/UtilsTest.java | 8 +++++ 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index 5918b2bb8..dac861030 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -337,7 +337,10 @@ static ConfigDef newConfigDef() { topicToTableValidator, Importance.LOW, "Map of topics to tables (optional). Format : comma-separated tuples, e.g." - + " :,:,... ", + + " :,:,... 
\n" + + "Generic regex matching is possible using the following syntax:" + + Utils.TOPIC_MATCHER_PREFIX + "^[^.]\\w+.\\w+.(.*):$1\n" + + "NOTE: topics names cannot contain \":\" or \",\" so the regex should not contain these characters either\n", CONNECTOR_CONFIG, 0, ConfigDef.Width.NONE, diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index f0b6398c3..86e1c5d32 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -504,6 +504,18 @@ public static String generateValidName(String topic, Map topic2t if (Utils.isValidSnowflakeObjectIdentifier(topic)) { return topic; } + + for (Map.Entry entry : topic2table.entrySet()) { + if (entry.getKey().startsWith(TOPIC_MATCHER_PREFIX)) { + String regex = entry.getKey().replaceFirst(TOPIC_MATCHER_PREFIX, ""); + String finalTableName = topic.replaceAll(regex, entry.getValue()); + if (Utils.isValidSnowflakeObjectIdentifier(finalTableName)) { + topic2table.put(topic, finalTableName); + return finalTableName; + } + } + } + int hash = Math.abs(topic.hashCode()); StringBuilder result = new StringBuilder(); @@ -531,6 +543,7 @@ public static String generateValidName(String topic, Map topic2t return result.toString(); } + public static String TOPIC_MATCHER_PREFIX = "REGEX_MATCHER>"; public static Map parseTopicToTableMap(String input) { Map topic2Table = new HashMap<>(); boolean isInvalid = false; @@ -546,13 +559,15 @@ public static Map parseTopicToTableMap(String input) { String topic = tt[0].trim(); String table = tt[1].trim(); - if (!isValidSnowflakeTableName(table)) { - LOGGER.error( - "table name {} should have at least 2 " - + "characters, start with _a-zA-Z, and only contains " - + "_$a-zA-z0-9", - table); - isInvalid = true; + if (!topic.startsWith(TOPIC_MATCHER_PREFIX)) { + if (!isValidSnowflakeTableName(table)) { + LOGGER.error( + "table name {} should have at least 2 " + + "characters, start with _a-zA-Z, and only contains " + + "_$a-zA-z0-9", + table); + isInvalid = true; + } } if (topic2Table.containsKey(topic)) { diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java index 6441ea9f0..8a143e7e8 100644 --- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java @@ -71,6 +71,14 @@ public void testTableName() { assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode())); } + @Test + public void test_generateValidName() { + Map topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, REGEX_MATCHER>^[^.]\\w+.\\w+.(.*):$1, 1234:_1234"); + + assert Utils.tableName("postgres.public.testtbl", topic2table).equals("testtbl"); + } + + @Test public void testTableFullName() { assert Utils.isValidSnowflakeTableName("_1342dfsaf$"); From 4a68939e19b9aafc521d2de27da4a089a5c3e331 Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Mon, 13 Feb 2023 14:11:50 +0000 Subject: [PATCH 04/10] fix column ordering to match as much as possible the source column ordering --- .../streaming/SchematizationUtils.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java index ab587fd14..f760574ad 100644 --- 
a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java @@ -19,9 +19,16 @@ import com.snowflake.kafka.connector.internal.SnowflakeErrors; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; import com.snowflake.kafka.connector.records.RecordService; + +import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import javax.annotation.Nonnull; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; @@ -88,7 +95,15 @@ public static void evolveSchemaIfNeeded( // Add columns if needed, ignore any exceptions since other task might be succeeded if (extraColNames != null) { - Map extraColumnsToType = getColumnTypes(record, extraColNames); + List fieldNamesOrderedAsOnSource = Stream.concat( + record.keySchema().fields().stream().map(f -> f.name()), + record.valueSchema().fields().stream().map(f -> f.name()) + ).collect(Collectors.toList()); + List extraColNamesOrderedAsOnSource = new ArrayList<>(extraColNames); + extraColNamesOrderedAsOnSource.sort( + Comparator.comparingInt(fieldNamesOrderedAsOnSource::indexOf)); + Map extraColumnsToType = getColumnTypes(record, extraColNamesOrderedAsOnSource); + try { conn.appendColumnsToTable(tableName, extraColumnsToType); } catch (SnowflakeKafkaConnectorException e) { @@ -114,7 +129,7 @@ static Map getColumnTypes(SinkRecord record, List column if (columnNames == null) { return new HashMap<>(); } - Map columnToType = new HashMap<>(); + Map columnToType = new LinkedHashMap<>(); Map schemaMap = getSchemaMapFromRecord(record); JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value()); From 1fceaef5ec67ee0628819963371a5819633be57c Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Mon, 6 Feb 2023 16:39:49 +0000 Subject: [PATCH 05/10] Added snowflake.schematization.auto; added postgresql JSON to VARIANT type convertion --- .../SnowflakeSinkConnectorConfig.java | 7 +++++ .../internal/SnowflakeConnectionService.java | 2 +- .../SnowflakeConnectionServiceV1.java | 26 ++++++++++--------- .../streaming/SchematizationUtils.java | 9 ++++--- .../streaming/SnowflakeSinkServiceV2.java | 5 +++- .../streaming/TopicPartitionChannel.java | 8 ++++-- .../connector/records/RecordService.java | 19 ++++++++++++++ 7 files changed, 57 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index eb9b22da8..f1a51de82 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -74,6 +74,13 @@ public class SnowflakeSinkConnectorConfig { public static final String ENABLE_SCHEMATIZATION_CONFIG = "snowflake.enable.schematization"; public static final String ENABLE_SCHEMATIZATION_DEFAULT = "false"; + public static final String SCHEMATIZATION_AUTO_CONFIG = "snowflake.schematization.auto"; + public static final String SCHEMATIZATION_AUTO_DEFAULT = "true"; + public static final String SCHEMATIZATION_AUTO_DISPLAY = "Use automatic schema evolution"; + public static final String SCHEMATIZATION_AUTO_DOC = + "If true, use snowflake automatic schema evolution 
feature." + + "NOTE: you need to grant evolve schema to " + SNOWFLAKE_USER; + // Proxy Info private static final String PROXY_INFO = "Proxy Info"; public static final String JVM_PROXY_HOST = "jvm.proxy.host"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java index 10740231e..059c6b83e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java @@ -281,7 +281,7 @@ public interface SnowflakeConnectionService { * * @param tableName table name */ - void createTableWithOnlyMetadataColumn(String tableName); + void createTableWithOnlyMetadataColumn(String tableName, boolean autoSchematization); /** * Gets the task id for this snowflake connection diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index 058f1f24b..7d3301d79 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -135,7 +135,7 @@ public void createTable(final String tableName) { } @Override - public void createTableWithOnlyMetadataColumn(final String tableName) { + public void createTableWithOnlyMetadataColumn(final String tableName, final boolean autoSchematization) { checkConnection(); InternalUtils.assertNotEmpty("tableName", tableName); String createTableQuery = @@ -151,17 +151,19 @@ public void createTableWithOnlyMetadataColumn(final String tableName) { throw SnowflakeErrors.ERROR_2007.getException(e); } - // Enable schema evolution by default if the table is created by the connector - String enableSchemaEvolutionQuery = - "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true"; - try { - PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery); - stmt.setString(1, tableName); - stmt.executeQuery(); - } catch (SQLException e) { - // Skip the error given that schema evolution is still under PrPr - LOG_WARN_MSG( - "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage()); + if (autoSchematization) { + // Enable schema evolution by default if the table is created by the connector + String enableSchemaEvolutionQuery = + "alter table identifier(?) 
set ENABLE_SCHEMA_EVOLUTION = true"; + try { + PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery); + stmt.setString(1, tableName); + stmt.executeQuery(); + } catch (SQLException e) { + // Skip the error given that schema evolution is still under PrPr + LOG_WARN_MSG( + "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage()); + } } LOG_INFO_MSG("Created table {} with only RECORD_METADATA column", tableName); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java index 763652f84..d2209c033 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java @@ -145,7 +145,7 @@ private static Map getSchemaMapFromRecord(SinkRecord record) { Schema schema = record.valueSchema(); if (schema != null) { for (Field field : schema.fields()) { - schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type())); + schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type(), field.schema().name())); } } return schemaMap; @@ -158,7 +158,7 @@ private static String inferDataTypeFromJsonObject(JsonNode value) { // only when the type of the value is unrecognizable for JAVA throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass()); } - return convertToSnowflakeType(schemaType); + return convertToSnowflakeType(schemaType, null); } /** Convert a json node type to kafka data type */ @@ -192,7 +192,7 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) { } /** Convert the kafka data type to Snowflake data type */ - private static String convertToSnowflakeType(Type kafkaType) { + private static String convertToSnowflakeType(Type kafkaType, String semanticType) { switch (kafkaType) { case INT8: return "BYTEINT"; @@ -209,6 +209,9 @@ private static String convertToSnowflakeType(Type kafkaType) { case BOOLEAN: return "BOOLEAN"; case STRING: + if (semanticType != null && semanticType.equals("io.debezium.data.Json")) { + return "VARIANT"; + } return "VARCHAR"; case BYTES: return "BINARY"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index df9def7d3..94738fe1f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -88,6 +88,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService { private final String taskId; private boolean enableSchematization; + private boolean autoSchematization; /** * Key is formulated in {@link #partitionChannelKey(String, int)} } @@ -118,6 +119,8 @@ public SnowflakeSinkServiceV2( this.enableSchematization = this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig); + this.autoSchematization = + this.recordService.setAndGetAutoSchematizationFromConfig(this.connectorConfig); this.taskId = connectorConfig.getOrDefault(Utils.TASK_ID, "-1"); this.partitionsToChannel = new HashMap<>(); @@ -456,7 +459,7 @@ private void createTableIfNotExists(final String tableName) { if (this.enableSchematization) { // Always create the table with RECORD_METADATA only and rely on schema evolution to update // the schema 
- this.conn.createTableWithOnlyMetadataColumn(tableName); + this.conn.createTableWithOnlyMetadataColumn(tableName, this.autoSchematization); } else { this.conn.createTable(tableName); } diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index cc81f6aba..86ba1227f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -174,6 +174,7 @@ public class TopicPartitionChannel { // Whether schematization has been enabled. private final boolean enableSchematization; + private final boolean autoSchematization; // Whether schema evolution could be done on this channel private final boolean enableSchemaEvolution; @@ -233,11 +234,14 @@ public TopicPartitionChannel( /* Schematization related properties */ this.enableSchematization = this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig); + this.autoSchematization = + this.recordService.setAndGetAutoSchematizationFromConfig(sfConnectorConfig); this.enableSchemaEvolution = this.enableSchematization && this.conn != null - && this.conn.hasSchemaEvolutionPermission( - tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE)); + && (!autoSchematization || + this.conn.hasSchemaEvolutionPermission( + tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE))); } /** diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index a1bb87d38..2d3bdbf9f 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -71,6 +71,7 @@ public class RecordService extends EnableLogging { static final String HEADERS = "headers"; private boolean enableSchematization = false; + private boolean autoSchematization = true; // For each task, we require a separate instance of SimpleDataFormat, since they are not // inherently thread safe @@ -120,6 +121,24 @@ public boolean setAndGetEnableSchematizationFromConfig( return this.enableSchematization; } +/** + * extract autoSchematization from the connector config and set the value for the recordService + * + *
The extracted boolean is returned for external usage. + * + * @param connectorConfig the connector config map + * @return a boolean indicating whether schematization is enabled + */ +public boolean setAndGetAutoSchematizationFromConfig( + final Map connectorConfig) { + if (connectorConfig.containsKey(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG)) { + this.autoSchematization = + Boolean.parseBoolean( + connectorConfig.get(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG)); + } + return this.autoSchematization; +} + /** * Directly set the enableSchematization through param * From 4c03bd03973e530134270a2080922e5f5e50d232 Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Fri, 10 Feb 2023 11:16:09 +0000 Subject: [PATCH 06/10] fix date/time conversion in SchematizationUtils; fixed junit --- .../streaming/SchematizationUtils.java | 23 +++++++++++++++++++ .../streaming/SnowflakeSinkServiceV2IT.java | 6 ++--- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java index d2209c033..ab587fd14 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java @@ -24,8 +24,13 @@ import java.util.Map; import javax.annotation.Nonnull; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; + +import org.apache.kafka.connect.data.Date; +import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.data.Schema.Type; import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; @@ -193,6 +198,24 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) { /** Convert the kafka data type to Snowflake data type */ private static String convertToSnowflakeType(Type kafkaType, String semanticType) { + if (semanticType != null) { + switch (semanticType) { + case Decimal.LOGICAL_NAME: + return "DOUBLE"; + case Time.LOGICAL_NAME: + case Timestamp.LOGICAL_NAME: + case "io.debezium.time.ZonedTimestamp": + case "io.debezium.time.ZonedTime": + case "io.debezium.time.MicroTime": + case "io.debezium.time.Timestamp": + case "io.debezium.time.MicroTimestamp": + return "TIMESTAMP"; + case Date.LOGICAL_NAME: + case "io.debezium.time.Date": + return "DATE"; + } + } + switch (kafkaType) { case INT8: return "BYTEINT"; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index 3028f2949..a43bede9d 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -950,7 +950,7 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception { avroConverter.configure( Collections.singletonMap("schema.registry.url", "http://fake-url"), false); byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original); - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); SchemaAndValue avroInputValue = 
avroConverter.toConnectData(topic, converted); @@ -1036,7 +1036,7 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception { JsonConverter jsonConverter = new JsonConverter(); jsonConverter.configure(config, false); byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original); - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted); @@ -1096,7 +1096,7 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce JsonConverter jsonConverter = new JsonConverter(); jsonConverter.configure(config, false); byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original); - conn.createTableWithOnlyMetadataColumn(table); + conn.createTableWithOnlyMetadataColumn(table, true); createNonNullableColumn(table, "id_int8_non_nullable"); SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted); From e6e702ee1091cf88cb338853d242fd6f38706333 Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Fri, 10 Feb 2023 12:55:03 +0000 Subject: [PATCH 07/10] Added support for regex matcher in snowflake.topic2table.map --- .../SnowflakeSinkConnectorConfig.java | 5 +++- .../com/snowflake/kafka/connector/Utils.java | 29 ++++++++++++++----- .../snowflake/kafka/connector/UtilsTest.java | 8 +++++ 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java index f1a51de82..1806e6cd4 100644 --- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java +++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java @@ -348,7 +348,10 @@ static ConfigDef newConfigDef() { topicToTableValidator, Importance.LOW, "Map of topics to tables (optional). Format : comma-separated tuples, e.g." - + " :,:,... ", + + " :,:,... 
\n" + + "Generic regex matching is possible using the following syntax:" + + Utils.TOPIC_MATCHER_PREFIX + "^[^.]\\w+.\\w+.(.*):$1\n" + + "NOTE: topics names cannot contain \":\" or \",\" so the regex should not contain these characters either\n", CONNECTOR_CONFIG, 0, ConfigDef.Width.NONE, diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java index 4720603d2..2ae48b045 100644 --- a/src/main/java/com/snowflake/kafka/connector/Utils.java +++ b/src/main/java/com/snowflake/kafka/connector/Utils.java @@ -521,6 +521,18 @@ public static String generateValidName(String topic, Map topic2t if (Utils.isValidSnowflakeObjectIdentifier(topic)) { return topic; } + + for (Map.Entry entry : topic2table.entrySet()) { + if (entry.getKey().startsWith(TOPIC_MATCHER_PREFIX)) { + String regex = entry.getKey().replaceFirst(TOPIC_MATCHER_PREFIX, ""); + String finalTableName = topic.replaceAll(regex, entry.getValue()); + if (Utils.isValidSnowflakeObjectIdentifier(finalTableName)) { + topic2table.put(topic, finalTableName); + return finalTableName; + } + } + } + int hash = Math.abs(topic.hashCode()); StringBuilder result = new StringBuilder(); @@ -548,6 +560,7 @@ public static String generateValidName(String topic, Map topic2t return result.toString(); } + public static String TOPIC_MATCHER_PREFIX = "REGEX_MATCHER>"; public static Map parseTopicToTableMap(String input) { Map topic2Table = new HashMap<>(); boolean isInvalid = false; @@ -563,13 +576,15 @@ public static Map parseTopicToTableMap(String input) { String topic = tt[0].trim(); String table = tt[1].trim(); - if (!isValidSnowflakeTableName(table)) { - LOGGER.error( - "table name {} should have at least 2 " - + "characters, start with _a-zA-Z, and only contains " - + "_$a-zA-z0-9", - table); - isInvalid = true; + if (!topic.startsWith(TOPIC_MATCHER_PREFIX)) { + if (!isValidSnowflakeTableName(table)) { + LOGGER.error( + "table name {} should have at least 2 " + + "characters, start with _a-zA-Z, and only contains " + + "_$a-zA-z0-9", + table); + isInvalid = true; + } } if (topic2Table.containsKey(topic)) { diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java index 6441ea9f0..8a143e7e8 100644 --- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java +++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java @@ -71,6 +71,14 @@ public void testTableName() { assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode())); } + @Test + public void test_generateValidName() { + Map topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, REGEX_MATCHER>^[^.]\\w+.\\w+.(.*):$1, 1234:_1234"); + + assert Utils.tableName("postgres.public.testtbl", topic2table).equals("testtbl"); + } + + @Test public void testTableFullName() { assert Utils.isValidSnowflakeTableName("_1342dfsaf$"); From 7efd254ff8470210ca8c59d748809af9af1e08f2 Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Mon, 13 Feb 2023 14:11:50 +0000 Subject: [PATCH 08/10] fix column ordering to match as much as possible the source column ordering --- .../streaming/SchematizationUtils.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java index ab587fd14..f760574ad 100644 --- 
a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java @@ -19,9 +19,16 @@ import com.snowflake.kafka.connector.internal.SnowflakeErrors; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; import com.snowflake.kafka.connector.records.RecordService; + +import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import javax.annotation.Nonnull; import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; @@ -88,7 +95,15 @@ public static void evolveSchemaIfNeeded( // Add columns if needed, ignore any exceptions since other task might be succeeded if (extraColNames != null) { - Map extraColumnsToType = getColumnTypes(record, extraColNames); + List fieldNamesOrderedAsOnSource = Stream.concat( + record.keySchema().fields().stream().map(f -> f.name()), + record.valueSchema().fields().stream().map(f -> f.name()) + ).collect(Collectors.toList()); + List extraColNamesOrderedAsOnSource = new ArrayList<>(extraColNames); + extraColNamesOrderedAsOnSource.sort( + Comparator.comparingInt(fieldNamesOrderedAsOnSource::indexOf)); + Map extraColumnsToType = getColumnTypes(record, extraColNamesOrderedAsOnSource); + try { conn.appendColumnsToTable(tableName, extraColumnsToType); } catch (SnowflakeKafkaConnectorException e) { @@ -114,7 +129,7 @@ static Map getColumnTypes(SinkRecord record, List column if (columnNames == null) { return new HashMap<>(); } - Map columnToType = new HashMap<>(); + Map columnToType = new LinkedHashMap<>(); Map schemaMap = getSchemaMapFromRecord(record); JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value()); From f463138450b8e6236b848df44a29ea07c093b84b Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Wed, 1 Mar 2023 10:19:42 +0000 Subject: [PATCH 09/10] merge upstream/master --- .../connector/internal/SnowflakeConnectionService.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java index 059c6b83e..77e7e3609 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java @@ -282,11 +282,4 @@ public interface SnowflakeConnectionService { * @param tableName table name */ void createTableWithOnlyMetadataColumn(String tableName, boolean autoSchematization); - - /** - * Gets the task id for this snowflake connection - * - * @return The task id for the snowflake connection - */ - int getTaskId(); } From 88ce343fa84cdc7cff66630fea10e6780059fe8f Mon Sep 17 00:00:00 2001 From: Alexandru Cristu Date: Mon, 27 Mar 2023 22:06:35 +0300 Subject: [PATCH 10/10] fix for NPE SinkRecord.keySchema, github #582 --- .../connector/internal/streaming/SchematizationUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java index f760574ad..db53c8efa 100644 --- 
a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
@@ -96,8 +96,8 @@ public static void evolveSchemaIfNeeded(
     // Add columns if needed, ignore any exceptions since other task might be succeeded
     if (extraColNames != null) {
       List<String> fieldNamesOrderedAsOnSource = Stream.concat(
-          record.keySchema().fields().stream().map(f -> f.name()),
-          record.valueSchema().fields().stream().map(f -> f.name())
+          record.keySchema() != null ? record.keySchema().fields().stream().map(f -> f.name()) : Stream.empty(),
+          record.valueSchema() != null ? record.valueSchema().fields().stream().map(f -> f.name()) : Stream.empty()
       ).collect(Collectors.toList());
       List<String> extraColNamesOrderedAsOnSource = new ArrayList<>(extraColNames);
       extraColNamesOrderedAsOnSource.sort(