Skip to content

Commit

Permalink
Some refactoring of the SmtIT integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-akowalczyk committed Apr 11, 2024
1 parent f6496dd commit 6f5dbc4
Showing 1 changed file with 37 additions and 28 deletions.
65 changes: 37 additions & 28 deletions src/test/java/com/snowflake/kafka/connector/SmtIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,29 @@
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.apache.kafka.connect.json.JsonConverter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

public class SmtIT extends ConnectClusterBaseIT {

private String smtTopic;
private String smtConnector;

@BeforeEach
void before() {
  // Provision a fresh, randomly named topic/connector pair for every test
  // so individual test runs cannot interfere with one another.
  this.smtTopic = TestUtils.randomTableName();
  this.smtConnector = smtTopic + "_connector";
  connectCluster.kafka().createTopic(smtTopic);
}

@AfterEach
void after() {
// Tear down the per-test resources created in before(): remove the Kafka
// topic first, then deregister the connector that was consuming from it.
// NOTE(review): both calls hit the embedded Connect cluster; their relative
// order is preserved as-is — confirm before reordering.
connectCluster.kafka().deleteTopic(smtTopic);
connectCluster.deleteConnector(smtConnector);
}

private Map<String, String> smtProperties(
String smtTopic, String smtConnector, String behaviorOnNull) {
Map<String, String> config = defaultProperties(smtTopic, smtConnector);
Expand All @@ -35,35 +53,26 @@ private Map<String, String> smtProperties(
@ParameterizedTest
@CsvSource({"DEFAULT, 20", "IGNORE, 10"})
void testIfSmtReturningNullsIngestDataCorrectly(String behaviorOnNull, int expectedRecordNumber) {
  // NOTE(review): the diff rendering duplicated the pre- and post-refactor
  // hunks of this method; this is the reconstructed post-commit version,
  // which relies on the @BeforeEach/@AfterEach lifecycle for topic/connector
  // setup and cleanup instead of an inline try/finally.

  // given: a connector configured with an SMT whose behavior-on-null is the
  // parameterized value; wait until the Connect framework reports it RUNNING.
  connectCluster.configureConnector(
      smtConnector, smtProperties(smtTopic, smtConnector, behaviorOnNull));
  waitForConnectorRunning(smtConnector);

  // when: produce 20 records — 10 empty JSON objects (which the SMT maps to
  // null) interleaved with 10 non-empty ones.
  Stream.iterate(0, UnaryOperator.identity())
      .limit(10)
      .flatMap(v -> Stream.of("{}", "{\"message\":\"value\"}"))
      .forEach(message -> connectCluster.kafka().produce(smtTopic, message));

  // then: DEFAULT ingests all 20 rows, IGNORE drops the nulls and ingests 10;
  // either way every offset up to 19 must be committed on the single channel.
  await()
      .timeout(Duration.ofSeconds(60))
      .untilAsserted(
          () -> {
            assertThat(fakeStreamingClientHandler.ingestedRows()).hasSize(expectedRecordNumber);
            assertThat(fakeStreamingClientHandler.getLatestCommittedOffsetTokensPerChannel())
                .hasSize(1)
                .containsValue("19");
          });
}
}

0 comments on commit 6f5dbc4

Please sign in to comment.