Use FakeStreamingClientHandler with real snowflake implementation
sfc-gh-akowalczyk committed Apr 5, 2024
1 parent f74b2b5 commit 41bf7a4
Showing 7 changed files with 183 additions and 136 deletions.
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
@@ -20,9 +20,11 @@
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.ENABLE_STREAMING_CLIENT_OPTIMIZATION_DEFAULT;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.internal.KCLogger;
import java.util.Map;
import java.util.function.Supplier;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.Caffeine;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.LoadingCache;
import net.snowflake.ingest.internal.com.github.benmanes.caffeine.cache.RemovalCause;
@@ -38,18 +40,29 @@
* node with equal {@link StreamingClientProperties} will use the same client
*/
public class StreamingClientProvider {
private static class StreamingClientProviderSingleton {
private static final StreamingClientProvider streamingClientProvider =
new StreamingClientProvider();
}
private static StreamingClientProvider streamingClientProvider = null;

private static Supplier<StreamingClientHandler> clientHandlerSupplier =
DirectStreamingClientHandler::new;

/**
* Gets the current streaming provider
*
* @return The streaming client provider
*/
public static StreamingClientProvider getStreamingClientProviderInstance() {
return StreamingClientProviderSingleton.streamingClientProvider;
if (streamingClientProvider == null) {
streamingClientProvider = new StreamingClientProvider(clientHandlerSupplier.get());
}

return streamingClientProvider;
}

public static void overrideStreamingClientHandler(StreamingClientHandler streamingClientHandler) {
Preconditions.checkState(
streamingClientProvider == null,
"StreamingClientProvider is already initialized and cannot be overridden.");
clientHandlerSupplier = () -> streamingClientHandler;
}

/**
@@ -92,8 +105,8 @@ public static StreamingClientProvider getStreamingClientProviderInstance() {
* When a client is evicted, the cache will try to close it; however, it is best to still close
* the client manually, since eviction is executed lazily
*/
private StreamingClientProvider() {
this.streamingClientHandler = new DirectStreamingClientHandler();
public StreamingClientProvider(StreamingClientHandler streamingClientHandler) {
this.streamingClientHandler = streamingClientHandler;
this.registeredClients = buildLoadingCache(this.streamingClientHandler);
}

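
The new override hook is only effective before the singleton exists: Preconditions.checkState rejects it once getStreamingClientProviderInstance() has built the provider. A minimal sketch of the intended test-side usage, mirroring the @BeforeAll in SmtIT further down (illustrative only; FakeStreamingClientHandler's internals are not shown in this diff):

    // Must run before the first getStreamingClientProviderInstance() call,
    // otherwise Preconditions.checkState throws IllegalStateException.
    FakeStreamingClientHandler fake = new FakeStreamingClientHandler();
    StreamingClientProvider.overrideStreamingClientHandler(fake);

    // Every streaming client is now built by the fake handler, so the real
    // connector code can run without opening a connection to Snowflake.
    StreamingClientProvider provider =
        StreamingClientProvider.getStreamingClientProviderInstance();
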
src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java
@@ -1,22 +1,5 @@
package com.snowflake.kafka.connector;

import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkConnector;
import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
@@ -31,12 +14,23 @@
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;

import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkConnector;
import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ConnectClusterBaseIT {

EmbeddedConnectCluster connectCluster;
static final Integer TASK_NUMBER = 1;
static final Duration CONNECTOR_MAX_STARTUP_TIME = Duration.ofSeconds(20);

@BeforeAll
public void beforeAll() {
@@ -90,15 +84,14 @@ final Map<String, String> defaultProperties(String topicName, String connectorNa
return config;
}

final Callable<Boolean> isConnectorRunning(String connectorName) {
return () -> {
ConnectorStateInfo status = connectCluster.connectorStatus(connectorName);
return status != null
&& status.connector().state().equals(AbstractStatus.State.RUNNING.toString())
&& status.tasks().size() >= TASK_NUMBER
&& status.tasks().stream()
.allMatch(state -> state.state().equals(AbstractStatus.State.RUNNING.toString()));

};
final void waitForConnectorRunning(String connectorName) {
try {
connectCluster
.assertions()
.assertConnectorAndAtLeastNumTasksAreRunning(
connectorName, 1, "The connector did not start.");
} catch (InterruptedException e) {
throw new IllegalStateException("The connector is not running");
}
}
}
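
A subclass test calls the new helper right after configuring its connector, replacing the old Awaitility polling against connectorStatus(). A short usage sketch with placeholder topic and connector names:

    connectCluster.kafka().createTopic("my-topic");
    connectCluster.configureConnector(
        "my-connector", defaultProperties("my-topic", "my-connector"));
    // Blocks until the connector and at least one task report RUNNING, using
    // Kafka Connect's own EmbeddedConnectClusterAssertions under the hood.
    waitForConnectorRunning("my-connector");
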
src/test/java/com/snowflake/kafka/connector/ProduceConsumeMessageIT.java
@@ -1,45 +1,45 @@
package com.snowflake.kafka.connector;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class ProduceConsumeMessageIT extends ConnectClusterBaseIT {

private static final String TEST_TOPIC = "kafka-int-test";
private static final String TEST_CONNECTOR_NAME = "test-connector";

@BeforeEach
void createConnector() {
connectCluster.kafka().createTopic(TEST_TOPIC);
connectCluster.configureConnector(TEST_CONNECTOR_NAME, defaultProperties(TEST_TOPIC, TEST_CONNECTOR_NAME));
await().timeout(CONNECTOR_MAX_STARTUP_TIME).until(isConnectorRunning(TEST_CONNECTOR_NAME));
}

@AfterEach
void deleteConnector() {
connectCluster.deleteConnector(TEST_CONNECTOR_NAME);
connectCluster.kafka().deleteTopic(TEST_TOPIC);
}

@Test
public void connectorShouldConsumeMessagesFromTopic() {
connectCluster.kafka().produce(TEST_TOPIC, "test1");
connectCluster.kafka().produce(TEST_TOPIC, "test2");

await()
.untilAsserted(
() -> {
List<SinkRecord> records = SnowflakeFakeSinkTask.getRecords();
assertThat(records).hasSize(2);
assertThat(records.stream().map(SinkRecord::value)).containsExactly("test1", "test2");
});
}
private static final String TEST_TOPIC = "kafka-int-test";
private static final String TEST_CONNECTOR_NAME = "test-connector";

@BeforeEach
void createConnector() {
connectCluster.kafka().createTopic(TEST_TOPIC);
connectCluster.configureConnector(
TEST_CONNECTOR_NAME, defaultProperties(TEST_TOPIC, TEST_CONNECTOR_NAME));
waitForConnectorRunning(TEST_CONNECTOR_NAME);
}

@AfterEach
void deleteConnector() {
connectCluster.deleteConnector(TEST_CONNECTOR_NAME);
connectCluster.kafka().deleteTopic(TEST_TOPIC);
}

@Test
public void connectorShouldConsumeMessagesFromTopic() {
connectCluster.kafka().produce(TEST_TOPIC, "test1");
connectCluster.kafka().produce(TEST_TOPIC, "test2");

await()
.untilAsserted(
() -> {
List<SinkRecord> records = SnowflakeFakeSinkTask.getRecords();
assertThat(records).hasSize(2);
assertThat(records.stream().map(SinkRecord::value)).containsExactly("test1", "test2");
});
}
}
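
SnowflakeFakeSinkTask.getRecords() is the observation point of this test; the fake task's implementation is not part of this diff. A hypothetical sketch of such a record-capturing task (class name and internals here are assumptions, not the committed code):

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CopyOnWriteArrayList;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public class FakeRecordCapturingTask extends SinkTask {
      // Static, because EmbeddedConnectCluster runs its workers inside the
      // test JVM, so the JUnit thread and the task share this class.
      private static final List<SinkRecord> RECORDS = new CopyOnWriteArrayList<>();

      public static List<SinkRecord> getRecords() {
        return RECORDS;
      }

      @Override
      public String version() {
        return "fake";
      }

      @Override
      public void start(Map<String, String> props) {}

      @Override
      public void put(Collection<SinkRecord> records) {
        RECORDS.addAll(records); // capture instead of writing to Snowflake
      }

      @Override
      public void stop() {}
    }
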
134 changes: 72 additions & 62 deletions src/test/java/com/snowflake/kafka/connector/SmtIT.java
@@ -1,74 +1,84 @@
package com.snowflake.kafka.connector;

import com.snowflake.kafka.connector.fake.SnowflakeFakeSinkTask;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT;
import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.sink.SinkConnector.TOPICS_CONFIG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class SmtIT extends ConnectClusterBaseIT {

private static final String SMT_TOPIC = "smtTopic";
private static final String SMT_CONNECTOR = "smtConnector";

@BeforeAll
public void createConnector() {
connectCluster.kafka().createTopic(SMT_TOPIC);
connectCluster.configureConnector(SMT_CONNECTOR, smtProperties());
await().timeout(CONNECTOR_MAX_STARTUP_TIME).until(isConnectorRunning(SMT_CONNECTOR));
}

@AfterAll
public void deleteConnector() {
connectCluster.deleteConnector(SMT_CONNECTOR);
connectCluster.kafka().deleteTopic(SMT_TOPIC);
}

private Map<String, String> smtProperties() {
Map<String, String> config = defaultProperties(SMT_TOPIC, SMT_CONNECTOR);

config.put(NAME, SMT_CONNECTOR);
config.put(TOPICS_CONFIG, SMT_TOPIC);

config.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
config.put("value.converter.schemas.enable", "false");

config.put(TRANSFORMS_CONFIG, "extractField");
config.put("transforms.extractField.type", "org.apache.kafka.connect.transforms.ExtractField$Value");
config.put("transforms.extractField.field", "message");

return config;
}

@Test
void testIfSmtRetuningNullsIngestDataCorrectly() {
Stream.iterate(0, UnaryOperator.identity())
.limit(10)
.flatMap(v -> Stream.of("{}", "{\"message\":\"value\"}"))
.forEach(message -> connectCluster.kafka().produce(SMT_TOPIC, message));

await()
.untilAsserted(
() -> {
List<SinkRecord> records = SnowflakeFakeSinkTask.getRecords();
assertThat(records)
.hasSize(20)
.allMatch(r -> r.originalTopic().equals(SMT_TOPIC));
});
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.streaming.FakeStreamingClientHandler;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.streaming.StreamingClientProvider;
import java.time.Duration;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

}
public class SmtIT extends ConnectClusterBaseIT {

private static final String SMT_TOPIC = "SMT_TOPIC";
private static final String SMT_CONNECTOR = "SMT_CONNECTOR";
private static final FakeStreamingClientHandler fakeStreamingClientHandler =
new FakeStreamingClientHandler();

@BeforeAll
public void createConnector() {
StreamingClientProvider.overrideStreamingClientHandler(fakeStreamingClientHandler);
connectCluster.kafka().createTopic(SMT_TOPIC);
connectCluster.configureConnector(SMT_CONNECTOR, smtProperties());
waitForConnectorRunning(SMT_CONNECTOR);
}

@AfterAll
public void deleteConnector() {
connectCluster.deleteConnector(SMT_CONNECTOR);
connectCluster.kafka().deleteTopic(SMT_TOPIC);
}

private Map<String, String> smtProperties() {
Map<String, String> config = TestUtils.getConf();

config.put(CONNECTOR_CLASS_CONFIG, SnowflakeSinkConnector.class.getName());
config.put(NAME, SMT_CONNECTOR);
config.put(TOPICS_CONFIG, SMT_TOPIC);
config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString());
config.put(Utils.SF_ROLE, "testrole_kafka");
config.put(BUFFER_FLUSH_TIME_SEC, "1");

config.put(TASKS_MAX_CONFIG, TASK_NUMBER.toString());
config.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
config.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
config.put("value.converter.schemas.enable", "false");

config.put(TRANSFORMS_CONFIG, "extractField");
config.put(
"transforms.extractField.type", "org.apache.kafka.connect.transforms.ExtractField$Value");
config.put("transforms.extractField.field", "message");

return config;
}

@Test
void testIfSmtRetuningNullsIngestDataCorrectly() {
Stream.iterate(0, UnaryOperator.identity())
.limit(10)
.flatMap(v -> Stream.of("{}", "{\"message\":\"value\"}"))
.forEach(message -> connectCluster.kafka().produce(SMT_TOPIC, message));

await()
.timeout(Duration.ofSeconds(60))
.untilAsserted(() -> assertThat(fakeStreamingClientHandler.ingestedRows()).hasSize(20));
}
}
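
The test name refers to ExtractField$Value yielding null values: with schemas disabled, JsonConverter hands the SMT a Map, and extracting the missing "message" key from "{}" turns the record value into null, while the assertion above checks that all 20 records, null-valued ones included, still reach the fake ingestion path. A self-contained sketch of the transform's behavior (topic, partition, and offset values are arbitrary):

    import java.util.Map;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.transforms.ExtractField;

    public class ExtractFieldDemo {
      public static void main(String[] args) {
        ExtractField.Value<SinkRecord> smt = new ExtractField.Value<>();
        smt.configure(Map.of("field", "message"));

        // Schemaless records, as produced by JsonConverter with
        // value.converter.schemas.enable=false.
        SinkRecord withField = new SinkRecord(
            "SMT_TOPIC", 0, null, null, null, Map.of("message", "value"), 0);
        SinkRecord empty = new SinkRecord(
            "SMT_TOPIC", 0, null, null, null, Map.of(), 1);

        System.out.println(smt.apply(withField).value()); // value
        System.out.println(smt.apply(empty).value());     // null
        smt.close();
      }
    }
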