SNOW-1055561: Check whether SMT returning null values no longer stops data ingestion. #816

Merged (8 commits) on Apr 12, 2024
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
@@ -20,9 +20,11 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.Map;
import java.util.function.Supplier;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.Caffeine;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.LoadingCache;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.RemovalCause;
@@ -38,18 +40,29 @@
* node with equal {@link StreamingClientProperties} will use the same client
*/
public class StreamingClientProvider {
private static class StreamingClientProviderSingleton {
private static final StreamingClientProvider streamingClientProvider =
new StreamingClientProvider();
}
private static StreamingClientProvider streamingClientProvider = null;

private static Supplier<StreamingClientHandler> clientHandlerSupplier =
DirectStreamingClientHandler::new;

/**
* Gets the current streaming provider
*
* @return The streaming client provider
*/
public static StreamingClientProvider getStreamingClientProviderInstance() {
return StreamingClientProviderSingleton.streamingClientProvider;
if (streamingClientProvider == null) {
streamingClientProvider = new StreamingClientProvider(clientHandlerSupplier.get());
}

return streamingClientProvider;
}

public static void overrideStreamingClientHandler(StreamingClientHandler streamingClientHandler) {
Preconditions.checkState(
streamingClientProvider == null,
"StreamingClientProvider is already initialized and cannot be overridden.");
clientHandlerSupplier = () -> streamingClientHandler;
}

/**
@@ -92,8 +105,8 @@ public static StreamingClientProvider getStreamingClientProviderInstance() {
* When a client is evicted, the cache will try closing the client, however it is best to still
* call close client manually as eviction is executed lazily
*/
private StreamingClientProvider() {
this.streamingClientHandler = new DirectStreamingClientHandler();
public StreamingClientProvider(StreamingClientHandler streamingClientHandler) {
this.streamingClientHandler = streamingClientHandler;
this.registeredClients = buildLoadingCache(this.streamingClientHandler);
}

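Taken together, the new entry points are meant to be used like this (a hypothetical test snippet, not part of this diff; it assumes the FakeStreamingClientHandler introduced later in this PR):

// Must run before the first getStreamingClientProviderInstance() call;
// once the singleton exists, the Preconditions.checkState above throws.
FakeStreamingClientHandler fakeHandler = new FakeStreamingClientHandler();
StreamingClientProvider.overrideStreamingClientHandler(fakeHandler);

// The first getter call now builds the provider from the overridden
// supplier, so every client it hands out is a fake.
StreamingClientProvider provider =
    StreamingClientProvider.getStreamingClientProviderInstance();
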
src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java
@@ -13,36 +13,24 @@
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkConnector;
import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConnectClusterBaseIT {
class ConnectClusterBaseIT {

protected EmbeddedConnectCluster connectCluster;

protected static final String TEST_TOPIC = "kafka-int-test";
protected static final String TEST_CONNECTOR_NAME = "test-connector";
protected static final Integer TASK_NUMBER = 1;
private static final Duration CONNECTOR_MAX_STARTUP_TIME = Duration.ofSeconds(20);
EmbeddedConnectCluster connectCluster;
static final Integer TASK_NUMBER = 1;

@BeforeAll
public void beforeAll() {
@@ -52,9 +40,6 @@ public void beforeAll() {
.numWorkers(3)
.build();
connectCluster.start();
connectCluster.kafka().createTopic(TEST_TOPIC);
connectCluster.configureConnector(TEST_CONNECTOR_NAME, createProperties());
await().timeout(CONNECTOR_MAX_STARTUP_TIME).until(this::isConnectorRunning);
}

@BeforeEach
@@ -75,33 +60,19 @@ public void after() {
SnowflakeFakeSinkTask.resetRecords();
}

@Test
public void connectorShouldConsumeMessagesFromTopic() {
connectCluster.kafka().produce(TEST_TOPIC, "test1");
connectCluster.kafka().produce(TEST_TOPIC, "test2");

await()
.untilAsserted(
() -> {
List<SinkRecord> records = SnowflakeFakeSinkTask.getRecords();
assertThat(records).hasSize(2);
assertThat(records.stream().map(SinkRecord::value)).containsExactly("test1", "test2");
});
}

protected Map<String, String> createProperties() {
final Map<String, String> defaultProperties(String topicName, String connectorName) {
Map<String, String> config = new HashMap<>();

// kafka connect specific
// real connector will be specified with SNOW-1055561
config.put(CONNECTOR_CLASS_CONFIG, SnowflakeFakeSinkConnector.class.getName());
config.put(TOPICS_CONFIG, TEST_TOPIC);
config.put(TOPICS_CONFIG, topicName);
config.put(TASKS_MAX_CONFIG, TASK_NUMBER.toString());
config.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
config.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());

// kafka push specific
config.put(NAME, TEST_CONNECTOR_NAME);
config.put(NAME, connectorName);
config.put(SNOWFLAKE_URL, "https://test.testregion.snowflakecomputing.com:443");
config.put(SNOWFLAKE_USER, "testName");
config.put(SNOWFLAKE_PRIVATE_KEY, "testPrivateKey");
@@ -113,12 +84,14 @@ protected Map<String, String> createProperties() {
return config;
}

private boolean isConnectorRunning() {
ConnectorStateInfo status = connectCluster.connectorStatus(TEST_CONNECTOR_NAME);
return status != null
&& status.connector().state().equals(AbstractStatus.State.RUNNING.toString())
&& status.tasks().size() >= TASK_NUMBER
&& status.tasks().stream()
.allMatch(state -> state.state().equals(AbstractStatus.State.RUNNING.toString()));
final void waitForConnectorRunning(String connectorName) {
try {
connectCluster
.assertions()
.assertConnectorAndAtLeastNumTasksAreRunning(
connectorName, 1, "The connector did not start.");
} catch (InterruptedException e) {
throw new IllegalStateException("The connector is not running");
}
}
}
src/test/java/com/snowflake/kafka/connector/ProduceConsumeMessageIT.java
@@ -0,0 +1,45 @@
package com.snowflake.kafka.connector;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ProduceConsumeMessageIT extends ConnectClusterBaseIT {

private static final String TEST_TOPIC = "kafka-int-test";
private static final String TEST_CONNECTOR_NAME = "test-connector";

@BeforeEach
void createConnector() {
connectCluster.kafka().createTopic(TEST_TOPIC);
connectCluster.configureConnector(
TEST_CONNECTOR_NAME, defaultProperties(TEST_TOPIC, TEST_CONNECTOR_NAME));
waitForConnectorRunning(TEST_CONNECTOR_NAME);
}

@AfterEach
void deleteConnector() {
connectCluster.deleteConnector(TEST_CONNECTOR_NAME);
connectCluster.kafka().deleteTopic(TEST_TOPIC);
}

@Test
public void connectorShouldConsumeMessagesFromTopic() {
connectCluster.kafka().produce(TEST_TOPIC, "test1");
connectCluster.kafka().produce(TEST_TOPIC, "test2");

await()
.untilAsserted(
() -> {
List<SinkRecord> records = SnowflakeFakeSinkTask.getRecords();
assertThat(records).hasSize(2);
assertThat(records.stream().map(SinkRecord::value)).containsExactly("test1", "test2");
});
}
}
src/test/java/com/snowflake/kafka/connector/SmtIT.java (84 additions, 0 deletions)
@@ -0,0 +1,84 @@
package com.snowflake.kafka.connector;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.streaming.FakeStreamingClientHandler;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider;
import java.time.Duration;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class SmtIT extends ConnectClusterBaseIT {
A review thread on this class (marked resolved):
Contributor: Can we set up logging levels for this test? We are flooded with TRACE right now.
Author: Do you have any particular solution in mind?

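One possible approach, offered only as a sketch (the thread above does not settle on a solution), would be to cap the embedded cluster's verbosity in a @BeforeAll hook, assuming Log4j 1.x is the logging backend on the test classpath:

// Hypothetical: silence TRACE noise from the embedded Connect cluster.
org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.WARN);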

private static final String SMT_TOPIC = "SMT_TOPIC";
private static final String SMT_CONNECTOR = "SMT_CONNECTOR";
private static final FakeStreamingClientHandler fakeStreamingClientHandler =
new FakeStreamingClientHandler();

@BeforeAll
public void createConnector() {
StreamingClientProvider.overrideStreamingClientHandler(fakeStreamingClientHandler);
connectCluster.kafka().createTopic(SMT_TOPIC);
connectCluster.configureConnector(SMT_CONNECTOR, smtProperties());
waitForConnectorRunning(SMT_CONNECTOR);
}

@AfterAll
public void deleteConnector() {
connectCluster.deleteConnector(SMT_CONNECTOR);
connectCluster.kafka().deleteTopic(SMT_TOPIC);
}

private Map<String, String> smtProperties() {
Map<String, String> config = TestUtils.getConf();

config.put(CONNECTOR_CLASS_CONFIG, SnowflakeSinkConnector.class.getName());
config.put(NAME, SMT_CONNECTOR);
config.put(TOPICS_CONFIG, SMT_TOPIC);
config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "testrole_kafka");
config.put(BUFFER_FLUSH_TIME_SEC, "1");

config.put(TASKS_MAX_CONFIG, TASK_NUMBER.toString());
config.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
config.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
config.put("value.converter.schemas.enable", "false");

config.put(TRANSFORMS_CONFIG, "extractField");
config.put(
"transforms.extractField.type", "org.apache.kafka.connect.transforms.ExtractField$Value");
config.put("transforms.extractField.field", "message");

return config;
}

@Test
void testIfSmtRetuningNullsIngestDataCorrectly() {
Stream.iterate(0, UnaryOperator.identity())
.limit(10)
.flatMap(v -> Stream.of("{}", "{\"message\":\"value\"}"))
.forEach(message -> connectCluster.kafka().produce(SMT_TOPIC, message));

await()
.timeout(Duration.ofSeconds(60))
.untilAsserted(() -> assertThat(fakeStreamingClientHandler.ingestedRows()).hasSize(20));
}
}
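For context on what this test exercises, here is a minimal sketch (hypothetical demo code, not part of this diff) of how the configured ExtractField$Value transform treats the two payloads: a record whose value carries a "message" field is reduced to that field's value, while {} yields a record with a null value, the exact case SNOW-1055561 verifies no longer stops ingestion.

import java.util.Collections;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.ExtractField;

ExtractField.Value<SinkRecord> smt = new ExtractField.Value<>();
smt.configure(Collections.singletonMap("field", "message"));

// Schemaless value containing the field: the SMT extracts "value".
SinkRecord withField = new SinkRecord(
    "SMT_TOPIC", 0, null, null, null,
    Collections.singletonMap("message", "value"), 0);
smt.apply(withField).value(); // -> "value"

// Schemaless value without the field ({}): the SMT returns a record
// whose value is null; previously this could stop ingestion.
SinkRecord withoutField = new SinkRecord(
    "SMT_TOPIC", 0, null, null, null, Collections.emptyMap(), 1);
smt.apply(withoutField).value(); // -> null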
src/test/java/com/snowflake/kafka/connector/internal/streaming/FakeStreamingClientHandler.java
@@ -1,21 +1,31 @@
package com.snowflake.kafka.connector.internal.streaming;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.FakeSnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

public class FakeStreamingClientHandler implements StreamingClientHandler {

private AtomicInteger createClientCalls = new AtomicInteger(0);
private AtomicInteger closeClientCalls = new AtomicInteger(0);
private final ConcurrentLinkedQueue<FakeSnowflakeStreamingIngestClient> clients =
new ConcurrentLinkedQueue<>();
private final AtomicInteger createClientCalls = new AtomicInteger(0);
private final AtomicInteger closeClientCalls = new AtomicInteger(0);

@Override
public SnowflakeStreamingIngestClient createClient(
StreamingClientProperties streamingClientProperties) {
createClientCalls.incrementAndGet();
return new FakeSnowflakeStreamingIngestClient(
streamingClientProperties.clientName + "_" + UUID.randomUUID());
FakeSnowflakeStreamingIngestClient ingestClient =
new FakeSnowflakeStreamingIngestClient(
streamingClientProperties.clientName + "_" + UUID.randomUUID());
clients.add(ingestClient);
return ingestClient;
}

@Override
@@ -35,4 +45,11 @@ public Integer getCreateClientCalls() {
public Integer getCloseClientCalls() {
return closeClientCalls.get();
}

public Set<Map<String, Object>> ingestedRows() {
return clients.stream()
.map(FakeSnowflakeStreamingIngestClient::ingestedRecords)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}
}
src/test/java/net/snowflake/ingest/streaming/FakeSnowflakeStreamingIngestChannel.java
@@ -1,5 +1,6 @@
package net.snowflake.ingest.streaming;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -154,4 +155,8 @@ public String getLatestCommittedOffsetToken() {
public Map<String, ColumnProperties> getTableSchema() {
throw new UnsupportedOperationException("Method is unsupported in fake communication channel");
}

List<Map<String, Object>> getRows() {
return ImmutableList.copyOf(this.rows);
}
}
src/test/java/net/snowflake/ingest/streaming/FakeSnowflakeStreamingIngestClient.java
@@ -1,8 +1,10 @@
package net.snowflake.ingest.streaming;

import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@@ -18,7 +20,7 @@ public class FakeSnowflakeStreamingIngestClient implements SnowflakeStreamingIngestClient {
private boolean closed;
private static final KCLogger LOGGER =
new KCLogger(FakeSnowflakeStreamingIngestClient.class.getName());
private final ConcurrentHashMap<String, SnowflakeStreamingIngestChannel> channelCache =
private final ConcurrentHashMap<String, FakeSnowflakeStreamingIngestChannel> channelCache =
new ConcurrentHashMap<>();

public FakeSnowflakeStreamingIngestClient(String name) {
@@ -74,4 +76,11 @@ public Map<String, String> getLatestCommittedOffsetTokens(
public void close() throws Exception {
closed = true;
}

public Set<Map<String, Object>> ingestedRecords() {
return channelCache.values().stream()
.map(FakeSnowflakeStreamingIngestChannel::getRows)
.flatMap(Collection::stream)
.collect(Collectors.toSet());
}
}