
SNOW-1055561: Check that an SMT returning null values no longer stops data ingestion (#816)

Merged (8 commits, Apr 12, 2024)
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java

@@ -40,7 +40,7 @@
* node with equal {@link StreamingClientProperties} will use the same client
*/
public class StreamingClientProvider {
- private static StreamingClientProvider streamingClientProvider = null;
+ private static volatile StreamingClientProvider streamingClientProvider = null;

private static Supplier<StreamingClientHandler> clientHandlerSupplier =
DirectStreamingClientHandler::new;
@@ -52,12 +52,36 @@ public class StreamingClientProvider {
*/
public static StreamingClientProvider getStreamingClientProviderInstance() {
if (streamingClientProvider == null) {
-   streamingClientProvider = new StreamingClientProvider(clientHandlerSupplier.get());
+   synchronized (StreamingClientProvider.class) {
+     if (streamingClientProvider == null) {
+       streamingClientProvider = new StreamingClientProvider(clientHandlerSupplier.get());
+     }
+   }
}

return streamingClientProvider;
}

+ public static void reset() {
+   if (streamingClientProvider != null || clientHandlerSupplier != null) {
+     synchronized (StreamingClientProvider.class) {
+       if (streamingClientProvider != null || clientHandlerSupplier != null) {
+         streamingClientProvider = null;
+         clientHandlerSupplier = null;
+       }
+     }
+   }
+ }
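Note: the volatile qualifier added to streamingClientProvider above is what makes the double-checked locking correct; without it, the Java Memory Model allows a thread to observe a non-null reference to a partially constructed provider. reset() follows the same check-lock-recheck shape for the test-only teardown.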

+ /**
+  * Allows providing a custom {@link StreamingClientHandler} to be used by the connector
+  * instead of the default {@link DirectStreamingClientHandler}.
+  *
+  * This method is currently used by the test code only.
+  *
+  * @param streamingClientHandler The handler that will be used by the connector.
+  */
+ @VisibleForTesting
public static void overrideStreamingClientHandler(StreamingClientHandler streamingClientHandler) {
Preconditions.checkState(
streamingClientProvider == null,
@@ -105,7 +129,7 @@ public static void overrideStreamingClientHandler(StreamingClientHandler streamingClientHandler)
* When a client is evicted, the cache will try closing the client, however it is best to still
* call close client manually as eviction is executed lazily
*/
- public StreamingClientProvider(StreamingClientHandler streamingClientHandler) {
+ private StreamingClientProvider(StreamingClientHandler streamingClientHandler) {
this.streamingClientHandler = streamingClientHandler;
this.registeredClients = buildLoadingCache(this.streamingClientHandler);
}
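Taken together, the intended test-time lifecycle reads roughly as follows. This is a condensed sketch mirroring ConnectClusterBaseIT.beforeEach() further down, not an excerpt of the diff:

// swap the real DirectStreamingClientHandler for a fake before the lazy getter runs
StreamingClientProvider.reset();
StreamingClientProvider.overrideStreamingClientHandler(new FakeStreamingClientHandler());
StreamingClientProvider provider = StreamingClientProvider.getStreamingClientProviderInstance();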
src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java

@@ -1,27 +1,22 @@
package com.snowflake.kafka.connector;

- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC;
+ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_DATABASE;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_PRIVATE_KEY;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_SCHEMA;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_URL;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWFLAKE_USER;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;

- import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkConnector;
- import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
- import java.util.HashMap;
+ import com.snowflake.kafka.connector.internal.TestUtils;
+ import com.snowflake.kafka.connector.internal.streaming.FakeStreamingClientHandler;
+ import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
+ import com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider;
import java.util.Map;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterAll;
- import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;
@@ -30,6 +25,9 @@
class ConnectClusterBaseIT {

EmbeddedConnectCluster connectCluster;

+ FakeStreamingClientHandler fakeStreamingClientHandler;

static final Integer TASK_NUMBER = 1;

@BeforeAll
@@ -43,8 +41,10 @@ public void beforeAll() {
}

@BeforeEach
- public void before() {
-   SnowflakeFakeSinkTask.resetRecords();
+ public void beforeEach() {
+   StreamingClientProvider.reset();
+   fakeStreamingClientHandler = new FakeStreamingClientHandler();
+   StreamingClientProvider.overrideStreamingClientHandler(fakeStreamingClientHandler);
}

@AfterAll
@@ -55,32 +55,19 @@ public void afterAll() {
}
}

- @AfterEach
- public void after() {
-   SnowflakeFakeSinkTask.resetRecords();
- }

final Map<String, String> defaultProperties(String topicName, String connectorName) {
- Map<String, String> config = new HashMap<>();
+ Map<String, String> config = TestUtils.getConf();

// kafka connect specific
- // real connector will be specified with SNOW-1055561
- config.put(CONNECTOR_CLASS_CONFIG, SnowflakeFakeSinkConnector.class.getName());
+ config.put(CONNECTOR_CLASS_CONFIG, SnowflakeSinkConnector.class.getName());
config.put(NAME, connectorName);
config.put(TOPICS_CONFIG, topicName);
+ config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
+ config.put(Utils.SF_ROLE, "testrole_kafka");
+ config.put(BUFFER_FLUSH_TIME_SEC, "1");
config.put(TASKS_MAX_CONFIG, TASK_NUMBER.toString());
config.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
config.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());

- // kafka push specific
- config.put(NAME, connectorName);
- config.put(SNOWFLAKE_URL, "https://test.testregion.snowflakecomputing.com:443");
- config.put(SNOWFLAKE_USER, "testName");
- config.put(SNOWFLAKE_PRIVATE_KEY, "testPrivateKey");
- config.put(SNOWFLAKE_DATABASE, "testDbName");
- config.put(SNOWFLAKE_SCHEMA, "testSchema");
- config.put(BUFFER_COUNT_RECORDS, "1000000");
- config.put(BUFFER_FLUSH_TIME_SEC, "1");

return config;
}
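The net effect: defaultProperties() now starts from TestUtils.getConf(), i.e. real connection properties, and wires the real SnowflakeSinkConnector in SNOWPIPE_STREAMING mode; only the streaming client underneath is faked via the handler installed in beforeEach().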


This file was deleted.

src/test/java/com/snowflake/kafka/connector/SmtIT.java (91 changes: 38 additions, 53 deletions)

@@ -1,66 +1,28 @@
package com.snowflake.kafka.connector;

- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
- import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
- import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
- import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
- import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
- import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.snowflake.kafka.connector.internal.TestUtils;
- import com.snowflake.kafka.connector.internal.streaming.FakeStreamingClientHandler;
- import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
- import com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider;
import java.time.Duration;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.apache.kafka.connect.json.JsonConverter;
- import org.apache.kafka.connect.storage.StringConverter;
- import org.junit.jupiter.api.AfterAll;
- import org.junit.jupiter.api.BeforeAll;
- import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.CsvSource;

public class SmtIT extends ConnectClusterBaseIT {
[Resolved review thread]
Reviewer (Contributor): Can we set up logging levels for this test? We are flooded with TRACE right now.
Author: Do you have any particular solution in mind?
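One possible answer, sketched only: the embedded Connect runtime logs through log4j 1.x, so the test setup could raise the thresholds of the noisy loggers programmatically. The logger names below are assumptions about where the TRACE flood originates, not something this PR ships.

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// e.g. in beforeAll(), before the EmbeddedConnectCluster starts
Logger.getLogger("org.apache.kafka").setLevel(Level.WARN); // broker and Connect runtime
Logger.getLogger("net.snowflake.ingest").setLevel(Level.WARN); // ingest SDK (assumed logger name)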


- private static final String SMT_TOPIC = "SMT_TOPIC";
- private static final String SMT_CONNECTOR = "SMT_CONNECTOR";
- private static final FakeStreamingClientHandler fakeStreamingClientHandler =
-     new FakeStreamingClientHandler();
+ private Map<String, String> smtProperties(
+     String smtTopic, String smtConnector, String behaviorOnNull) {
+   Map<String, String> config = defaultProperties(smtTopic, smtConnector);

- @BeforeAll
- public void createConnector() {
-   StreamingClientProvider.overrideStreamingClientHandler(fakeStreamingClientHandler);
-   connectCluster.kafka().createTopic(SMT_TOPIC);
-   connectCluster.configureConnector(SMT_CONNECTOR, smtProperties());
-   waitForConnectorRunning(SMT_CONNECTOR);
- }

- @AfterAll
- public void deleteConnector() {
-   connectCluster.deleteConnector(SMT_CONNECTOR);
-   connectCluster.kafka().deleteTopic(SMT_TOPIC);
- }

- private Map<String, String> smtProperties() {
-   Map<String, String> config = TestUtils.getConf();
-
-   config.put(CONNECTOR_CLASS_CONFIG, SnowflakeSinkConnector.class.getName());
-   config.put(NAME, SMT_CONNECTOR);
-   config.put(TOPICS_CONFIG, SMT_TOPIC);
-   config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
-   config.put(Utils.SF_ROLE, "testrole_kafka");
-   config.put(BUFFER_FLUSH_TIME_SEC, "1");

-   config.put(TASKS_MAX_CONFIG, TASK_NUMBER.toString());
-   config.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
config.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
config.put("value.converter.schemas.enable", "false");
config.put("behavior.on.null.values", behaviorOnNull);

config.put(TRANSFORMS_CONFIG, "extractField");
config.put(
@@ -70,15 +32,38 @@ private Map<String, String> smtProperties() {
return config;
}

- @Test
- void testIfSmtRetuningNullsIngestDataCorrectly() {
-   Stream.iterate(0, UnaryOperator.identity())
-       .limit(10)
-       .flatMap(v -> Stream.of("{}", "{\"message\":\"value\"}"))
-       .forEach(message -> connectCluster.kafka().produce(SMT_TOPIC, message));
+ @ParameterizedTest
+ @CsvSource({"DEFAULT, 20", "IGNORE, 10"})
+ void testIfSmtReturningNullsIngestDataCorrectly(String behaviorOnNull, int expectedRecordNumber) {
+   String smtTopic = TestUtils.randomTableName();
+   String smtConnector = String.format("%s_connector", smtTopic);
+
+   try {
+     // given
+     connectCluster.kafka().createTopic(smtTopic);
+     connectCluster.configureConnector(
+         smtConnector, smtProperties(smtTopic, smtConnector, behaviorOnNull));
+     waitForConnectorRunning(smtConnector);
+
+     // when
+     Stream.iterate(0, UnaryOperator.identity())
+         .limit(10)
+         .flatMap(v -> Stream.of("{}", "{\"message\":\"value\"}"))
+         .forEach(message -> connectCluster.kafka().produce(smtTopic, message));

-   await()
-       .timeout(Duration.ofSeconds(60))
-       .untilAsserted(() -> assertThat(fakeStreamingClientHandler.ingestedRows()).hasSize(20));
+     // then
+     await()
+         .timeout(Duration.ofSeconds(60))
+         .untilAsserted(
+             () -> {
+               assertThat(fakeStreamingClientHandler.ingestedRows()).hasSize(expectedRecordNumber);
+               assertThat(fakeStreamingClientHandler.getLatestCommittedOffsetTokensPerChannel())
+                   .hasSize(1)
+                   .containsValue("19");
+             });
+   } finally {
+     connectCluster.kafka().deleteTopic(smtTopic);
+     connectCluster.deleteConnector(smtConnector);
[Review comment]
Reviewer (Contributor): Nit: we can create a topic and generate a connector name, as well as clean them up, in @BeforeEach/@AfterEach (see the sketch after this class).
}
}
}
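On the expected numbers: each of the 10 iterations produces "{}" and "{\"message\":\"value\"}", so 20 records reach the topic. The extractField transform turns "{}" into null, which behavior.on.null.values=DEFAULT still ingests (20 rows) while IGNORE drops it (10 rows); either way all 20 offsets are consumed, which is why the committed offset token ends at "19".

A sketch of the refactor suggested in the nit above; the method names and field placement are assumptions, not code from this PR:

  private String smtTopic;
  private String smtConnector;

  @BeforeEach
  void createSmtTopic() {
    smtTopic = TestUtils.randomTableName();
    smtConnector = String.format("%s_connector", smtTopic);
    connectCluster.kafka().createTopic(smtTopic);
  }

  @AfterEach
  void dropSmtTopicAndConnector() {
    connectCluster.kafka().deleteTopic(smtTopic);
    connectCluster.deleteConnector(smtConnector);
  }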

This file was deleted.
