diff --git a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
index 84f336b3a..8b9536f2f 100644
--- a/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
+++ b/src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
@@ -74,6 +74,13 @@ public class SnowflakeSinkConnectorConfig {
   public static final String ENABLE_SCHEMATIZATION_CONFIG = "snowflake.enable.schematization";
   public static final String ENABLE_SCHEMATIZATION_DEFAULT = "false";
 
+  public static final String SCHEMATIZATION_AUTO_CONFIG = "snowflake.schematization.auto";
+  public static final String SCHEMATIZATION_AUTO_DEFAULT = "true";
+  public static final String SCHEMATIZATION_AUTO_DISPLAY = "Use automatic schema evolution";
+  public static final String SCHEMATIZATION_AUTO_DOC =
+      "If true, use Snowflake's automatic schema evolution feature. "
+          + "NOTE: you need to grant EVOLVE SCHEMA to " + SNOWFLAKE_USER;
+
   // Proxy Info
   private static final String PROXY_INFO = "Proxy Info";
   public static final String JVM_PROXY_HOST = "jvm.proxy.host";
@@ -334,7 +341,10 @@ static ConfigDef newConfigDef() {
             topicToTableValidator,
             Importance.LOW,
             "Map of topics to tables (optional). Format : comma-separated tuples, e.g."
-                + " <topic-1>:<table-1>,<topic-2>:<table-2>,... ",
+                + " <topic-1>:<table-1>,<topic-2>:<table-2>,... \n"
+                + "Generic regex matching is possible using the following syntax: "
+                + Utils.TOPIC_MATCHER_PREFIX + "^[^.]\\w+.\\w+.(.*):$1\n"
+                + "NOTE: topic names cannot contain \":\" or \",\", so the regex should not contain these characters either\n",
             CONNECTOR_CONFIG,
             0,
             ConfigDef.Width.NONE,
diff --git a/src/main/java/com/snowflake/kafka/connector/Utils.java b/src/main/java/com/snowflake/kafka/connector/Utils.java
index a031af641..177260dc9 100644
--- a/src/main/java/com/snowflake/kafka/connector/Utils.java
+++ b/src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -511,6 +511,18 @@ public static String generateValidName(String topic, Map<String, String> topic2t
     if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
       return topic;
     }
+
+    for (Map.Entry<String, String> entry : topic2table.entrySet()) {
+      if (entry.getKey().startsWith(TOPIC_MATCHER_PREFIX)) {
+        String regex = entry.getKey().replaceFirst(TOPIC_MATCHER_PREFIX, "");
+        String finalTableName = topic.replaceAll(regex, entry.getValue());
+        if (Utils.isValidSnowflakeObjectIdentifier(finalTableName)) {
+          topic2table.put(topic, finalTableName);
+          return finalTableName;
+        }
+      }
+    }
+
     int hash = Math.abs(topic.hashCode());
 
     StringBuilder result = new StringBuilder();
@@ -538,6 +550,7 @@ public static String generateValidName(String topic, Map<String, String> topic2t
     return result.toString();
   }
 
+  public static String TOPIC_MATCHER_PREFIX = "REGEX_MATCHER>";
   public static Map<String, String> parseTopicToTableMap(String input) {
     Map<String, String> topic2Table = new HashMap<>();
     boolean isInvalid = false;
@@ -553,13 +566,15 @@ public static Map<String, String> parseTopicToTableMap(String input) {
       String topic = tt[0].trim();
       String table = tt[1].trim();
 
-      if (!isValidSnowflakeTableName(table)) {
-        LOGGER.error(
-            "table name {} should have at least 2 "
-                + "characters, start with _a-zA-Z, and only contains "
-                + "_$a-zA-z0-9",
-            table);
-        isInvalid = true;
+      if (!topic.startsWith(TOPIC_MATCHER_PREFIX)) {
+        if (!isValidSnowflakeTableName(table)) {
+          LOGGER.error(
+              "table name {} should have at least 2 "
+                  + "characters, start with _a-zA-Z, and only contains "
+                  + "_$a-zA-z0-9",
+              table);
+          isInvalid = true;
+        }
       }
 
       if (topic2Table.containsKey(topic)) {
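For context on the REGEX_MATCHER> syntax introduced above, here is a minimal, hedged sketch of how a regex mapping resolves a Debezium-style server.schema.table topic into a table name. The topic and table names are made up; the expected results follow from the generateValidName change above and the test_generateValidName test added further down.

```java
import java.util.Map;

import com.snowflake.kafka.connector.Utils;

public class RegexTopicMappingSketch {
  public static void main(String[] args) {
    // One regex entry plus one plain entry, in the same comma-separated format
    // accepted by the topic-to-table map config.
    Map<String, String> topic2table =
        Utils.parseTopicToTableMap("REGEX_MATCHER>^[^.]\\w+.\\w+.(.*):$1, bigtopic:big_table");

    // "postgres.public.customers" has no direct entry and is not a valid identifier,
    // so the regex entry applies and the captured group ($1) becomes the table name.
    System.out.println(Utils.tableName("postgres.public.customers", topic2table)); // customers

    // Plain mappings keep working exactly as before.
    System.out.println(Utils.tableName("bigtopic", topic2table)); // big_table
  }
}
```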
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
index dad2ec22b..77e7e3609 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionService.java
@@ -281,5 +281,5 @@ public interface SnowflakeConnectionService {
    *
    * @param tableName table name
    */
-  void createTableWithOnlyMetadataColumn(String tableName);
+  void createTableWithOnlyMetadataColumn(String tableName, boolean autoSchematization);
 }
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
index f5fbb5b9a..85d3e5bb9 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java
@@ -135,7 +135,7 @@ public void createTable(final String tableName) {
   }
 
   @Override
-  public void createTableWithOnlyMetadataColumn(final String tableName) {
+  public void createTableWithOnlyMetadataColumn(final String tableName, final boolean autoSchematization) {
     checkConnection();
     InternalUtils.assertNotEmpty("tableName", tableName);
     String createTableQuery =
@@ -151,17 +151,19 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
       throw SnowflakeErrors.ERROR_2007.getException(e);
     }
 
-    // Enable schema evolution by default if the table is created by the connector
-    String enableSchemaEvolutionQuery =
-        "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true";
-    try {
-      PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery);
-      stmt.setString(1, tableName);
-      stmt.executeQuery();
-    } catch (SQLException e) {
-      // Skip the error given that schema evolution is still under PrPr
-      LOG_WARN_MSG(
-          "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage());
+    if (autoSchematization) {
+      // Enable schema evolution by default if the table is created by the connector
+      String enableSchemaEvolutionQuery =
+          "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true";
+      try {
+        PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery);
+        stmt.setString(1, tableName);
+        stmt.executeQuery();
+      } catch (SQLException e) {
+        // Skip the error given that schema evolution is still under PrPr
+        LOG_WARN_MSG(
+            "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage());
+      }
     }
 
     LOG_INFO_MSG("Created table {} with only RECORD_METADATA column", tableName);
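When snowflake.schematization.auto is false, the connector no longer issues the ALTER TABLE ... SET ENABLE_SCHEMA_EVOLUTION = true statement shown above, so that table property (and the EVOLVE SCHEMA grant mentioned in the config doc) has to be managed outside the connector if it is still wanted. A rough sketch of that manual step over plain JDBC, reusing the exact statement from this diff; the connection and table name are assumed to come from elsewhere.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class EnableSchemaEvolutionManually {
  /** Runs the same statement the connector would otherwise issue when autoSchematization is true. */
  public static void enableSchemaEvolution(Connection conn, String tableName) throws SQLException {
    String query = "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true";
    try (PreparedStatement stmt = conn.prepareStatement(query)) {
      stmt.setString(1, tableName);
      stmt.executeQuery();
    }
  }
}
```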
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
index 763652f84..db53c8efa 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SchematizationUtils.java
@@ -19,13 +19,25 @@
 import com.snowflake.kafka.connector.internal.SnowflakeErrors;
 import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
 import com.snowflake.kafka.connector.records.RecordService;
+
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import javax.annotation.Nonnull;
 import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
@@ -83,7 +95,15 @@ public static void evolveSchemaIfNeeded(
 
     // Add columns if needed, ignore any exceptions since other task might be succeeded
     if (extraColNames != null) {
-      Map<String, String> extraColumnsToType = getColumnTypes(record, extraColNames);
+      List<String> fieldNamesOrderedAsOnSource = Stream.concat(
+              record.keySchema() != null ? record.keySchema().fields().stream().map(f -> f.name()) : Stream.empty(),
+              record.valueSchema() != null ? record.valueSchema().fields().stream().map(f -> f.name()) : Stream.empty()
+          ).collect(Collectors.toList());
+      List<String> extraColNamesOrderedAsOnSource = new ArrayList<>(extraColNames);
+      extraColNamesOrderedAsOnSource.sort(
+          Comparator.comparingInt(fieldNamesOrderedAsOnSource::indexOf));
+      Map<String, String> extraColumnsToType = getColumnTypes(record, extraColNamesOrderedAsOnSource);
+
       try {
         conn.appendColumnsToTable(tableName, extraColumnsToType);
       } catch (SnowflakeKafkaConnectorException e) {
@@ -109,7 +129,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
     if (columnNames == null) {
       return new HashMap<>();
     }
-    Map<String, String> columnToType = new HashMap<>();
+    Map<String, String> columnToType = new LinkedHashMap<>();
     Map<String, String> schemaMap = getSchemaMapFromRecord(record);
     JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value());
 
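Taken together, the two hunks above make column evolution order-stable: missing columns are sorted by the position of the matching field in the record's key/value schema, and the LinkedHashMap keeps that order in the column-to-type map handed to appendColumnsToTable. A self-contained sketch of the same idea with made-up field names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnOrderingSketch {
  public static void main(String[] args) {
    // Field order as it appears on the source record schema.
    List<String> fieldOrder = Arrays.asList("ID", "FIRST_NAME", "LAST_NAME", "CREATED_AT");

    // Columns reported as missing, in arbitrary order.
    List<String> extraCols = new ArrayList<>(Arrays.asList("CREATED_AT", "ID", "LAST_NAME"));

    // Same sort as evolveSchemaIfNeeded: order by position in the source schema.
    extraCols.sort(Comparator.comparingInt(fieldOrder::indexOf));

    // LinkedHashMap preserves insertion order, so columns are appended as on the source.
    Map<String, String> columnToType = new LinkedHashMap<>();
    for (String col : extraCols) {
      columnToType.put(col, "VARCHAR"); // type lookup elided
    }
    System.out.println(columnToType); // {ID=VARCHAR, LAST_NAME=VARCHAR, CREATED_AT=VARCHAR}
  }
}
```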
@@ -145,7 +165,7 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
     Schema schema = record.valueSchema();
     if (schema != null) {
       for (Field field : schema.fields()) {
-        schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
+        schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type(), field.schema().name()));
       }
     }
     return schemaMap;
@@ -158,7 +178,7 @@ private static String inferDataTypeFromJsonObject(JsonNode value) {
       // only when the type of the value is unrecognizable for JAVA
       throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass());
     }
-    return convertToSnowflakeType(schemaType);
+    return convertToSnowflakeType(schemaType, null);
   }
 
   /** Convert a json node type to kafka data type */
@@ -192,7 +212,25 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
   }
 
   /** Convert the kafka data type to Snowflake data type */
-  private static String convertToSnowflakeType(Type kafkaType) {
+  private static String convertToSnowflakeType(Type kafkaType, String semanticType) {
+    if (semanticType != null) {
+      switch (semanticType) {
+        case Decimal.LOGICAL_NAME:
+          return "DOUBLE";
+        case Time.LOGICAL_NAME:
+        case Timestamp.LOGICAL_NAME:
+        case "io.debezium.time.ZonedTimestamp":
+        case "io.debezium.time.ZonedTime":
+        case "io.debezium.time.MicroTime":
+        case "io.debezium.time.Timestamp":
+        case "io.debezium.time.MicroTimestamp":
+          return "TIMESTAMP";
+        case Date.LOGICAL_NAME:
+        case "io.debezium.time.Date":
+          return "DATE";
+      }
+    }
+
     switch (kafkaType) {
       case INT8:
         return "BYTEINT";
@@ -209,6 +247,9 @@ private static String convertToSnowflakeType(Type kafkaType) {
       case BOOLEAN:
         return "BOOLEAN";
       case STRING:
+        if (semanticType != null && semanticType.equals("io.debezium.data.Json")) {
+          return "VARIANT";
+        }
         return "VARCHAR";
       case BYTES:
         return "BINARY";
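For reference, a hedged sketch of what the new semantic-type branch implies for a value schema that carries Connect logical types. The field names are illustrative, and the arrow comments state the Snowflake type the switch above would pick (note that Decimal is mapped to DOUBLE by this change, not NUMBER).

```java
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Timestamp;

public class SemanticTypeMappingSketch {
  public static void main(String[] args) {
    Schema valueSchema =
        SchemaBuilder.struct()
            .field("price", Decimal.schema(2)) // logical Decimal -> DOUBLE
            .field("updated_at", Timestamp.SCHEMA) // logical Timestamp -> TIMESTAMP
            .field("birth_date", Date.SCHEMA) // logical Date -> DATE
            .field("payload", // io.debezium.data.Json -> VARIANT
                SchemaBuilder.string().name("io.debezium.data.Json").build())
            .build();

    // getSchemaMapFromRecord now forwards field.schema().name() (the semantic type),
    // which is what the new switch in convertToSnowflakeType keys on.
    valueSchema.fields().forEach(f -> System.out.println(f.name() + " -> " + f.schema().name()));
  }
}
```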
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
index 8038f67a9..88cb92a71 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
@@ -103,6 +103,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
   private final String streamingIngestClientName;
 
   private boolean enableSchematization;
+  private boolean autoSchematization;
 
   /**
    * Key is formulated in {@link #partitionChannelKey(String, int)} }
@@ -133,6 +134,8 @@ public SnowflakeSinkServiceV2(
 
     this.enableSchematization =
         this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig);
+    this.autoSchematization =
+        this.recordService.setAndGetAutoSchematizationFromConfig(this.connectorConfig);
 
     this.taskId = connectorConfig.getOrDefault(Utils.TASK_ID, "-1");
     this.streamingIngestClientName =
@@ -531,7 +534,7 @@ private void createTableIfNotExists(final String tableName) {
     if (this.enableSchematization) {
       // Always create the table with RECORD_METADATA only and rely on schema evolution to update
       // the schema
-      this.conn.createTableWithOnlyMetadataColumn(tableName);
+      this.conn.createTableWithOnlyMetadataColumn(tableName, this.autoSchematization);
     } else {
       this.conn.createTable(tableName);
     }
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
index 626962f08..16c6690fb 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
@@ -175,6 +175,7 @@ public class TopicPartitionChannel {
 
   // Whether schematization has been enabled.
   private final boolean enableSchematization;
+  private final boolean autoSchematization;
 
   // Whether schema evolution could be done on this channel
   private final boolean enableSchemaEvolution;
@@ -255,11 +256,14 @@ public TopicPartitionChannel(
     /* Schematization related properties */
     this.enableSchematization =
        this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig);
+    this.autoSchematization =
+        this.recordService.setAndGetAutoSchematizationFromConfig(sfConnectorConfig);
     this.enableSchemaEvolution =
         this.enableSchematization
             && this.conn != null
-            && this.conn.hasSchemaEvolutionPermission(
-                tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE));
+            && (!autoSchematization ||
+                this.conn.hasSchemaEvolutionPermission(
+                    tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE)));
   }
 
   /**
diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
index a1bb87d38..2d3bdbf9f 100644
--- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
+++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java
@@ -71,6 +71,7 @@ public class RecordService extends EnableLogging {
   static final String HEADERS = "headers";
 
   private boolean enableSchematization = false;
+  private boolean autoSchematization = true;
 
   // For each task, we require a separate instance of SimpleDataFormat, since they are not
   // inherently thread safe
@@ -120,6 +121,24 @@ public boolean setAndGetEnableSchematizationFromConfig(
     return this.enableSchematization;
   }
 
+  /**
+   * extract autoSchematization from the connector config and set the value for the recordService
+   *
+   * <p>The extracted boolean is returned for external usage.
+   *
+   * @param connectorConfig the connector config map
+   * @return a boolean indicating whether automatic schema evolution is enabled
+   */
+  public boolean setAndGetAutoSchematizationFromConfig(
+      final Map<String, String> connectorConfig) {
+    if (connectorConfig.containsKey(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG)) {
+      this.autoSchematization =
+          Boolean.parseBoolean(
+              connectorConfig.get(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG));
+    }
+    return this.autoSchematization;
+  }
+
   /**
    * Directly set the enableSchematization through param
    *
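snowflake.schematization.auto defaults to true when the key is absent, so existing deployments keep today's behaviour; only an explicit "false" puts the connector in the mode where the automatic ENABLE_SCHEMA_EVOLUTION setup and the hasSchemaEvolutionPermission check in TopicPartitionChannel are skipped. A small sketch of the new getter's behaviour, assuming RecordService can be constructed directly as the connector's unit tests do:

```java
import java.util.HashMap;
import java.util.Map;

import com.snowflake.kafka.connector.records.RecordService;

public class AutoSchematizationConfigSketch {
  public static void main(String[] args) {
    RecordService recordService = new RecordService();

    Map<String, String> config = new HashMap<>();
    // Key absent: the field keeps its default of true.
    System.out.println(recordService.setAndGetAutoSchematizationFromConfig(config)); // true

    // Explicitly disable Snowflake-side automatic schema evolution.
    config.put("snowflake.schematization.auto", "false");
    System.out.println(recordService.setAndGetAutoSchematizationFromConfig(config)); // false
  }
}
```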

diff --git a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
index 6441ea9f0..8a143e7e8 100644
--- a/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -71,6 +71,14 @@ public void testTableName() {
     assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
   }
 
+  @Test
+  public void test_generateValidName() {
+    Map<String, String> topic2table =
+        Utils.parseTopicToTableMap("ab@cd:abcd, REGEX_MATCHER>^[^.]\\w+.\\w+.(.*):$1, 1234:_1234");
+
+    assert Utils.tableName("postgres.public.testtbl", topic2table).equals("testtbl");
+  }
+
   @Test
   public void testTableFullName() {
     assert Utils.isValidSnowflakeTableName("_1342dfsaf$");
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
index ac4615a98..2e97df39b 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
@@ -938,7 +938,7 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
     avroConverter.configure(
         Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
     byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
+    conn.createTableWithOnlyMetadataColumn(table, true);
 
     SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);
 
@@ -1024,7 +1024,7 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception {
     JsonConverter jsonConverter = new JsonConverter();
     jsonConverter.configure(config, false);
     byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
+    conn.createTableWithOnlyMetadataColumn(table, true);
 
     SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted);
 
@@ -1084,7 +1084,7 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce
     JsonConverter jsonConverter = new JsonConverter();
     jsonConverter.configure(config, false);
     byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
+    conn.createTableWithOnlyMetadataColumn(table, true);
     createNonNullableColumn(table, "id_int8_non_nullable");
 
     SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted);
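Putting the two additions together, a hypothetical sink configuration exercising both features might look like the sketch below. It is expressed as the flat Map the connector receives; the topic-to-table key is the connector's existing snowflake.topic2table.map property, and all values are placeholders rather than a tested configuration.

```java
import java.util.HashMap;
import java.util.Map;

public class ExampleConnectorConfigSketch {
  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    // Existing switch that turns schematization on.
    config.put("snowflake.enable.schematization", "true");
    // New switch from this change: skip the automatic ENABLE_SCHEMA_EVOLUTION setup.
    config.put("snowflake.schematization.auto", "false");
    // New regex syntax: map Debezium-style server.schema.table topics to the bare table name.
    config.put("snowflake.topic2table.map", "REGEX_MATCHER>^[^.]\\w+.\\w+.(.*):$1");

    config.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```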