Commit

Merge pull request #194 from marklogic/develop
Merge develop into master for the 1.10.0 release
BillFarber authored Nov 1, 2024
2 parents d29a1a0 + e1a3f3b commit 4a29ebe
Showing 12 changed files with 274 additions and 171 deletions.
10 changes: 7 additions & 3 deletions CONTRIBUTING.md
@@ -4,8 +4,8 @@ distribution.

### Requirements:
* MarkLogic Server 11+
* Java (either version 8, 11, or 17). It is recommended to use 11 or 17, as Confluent has deprecated Java 8 support in
Confluent 7.x and is removing it in Confluent 8.x. Additionally, Sonar requires the use of Java 11 or 17.
* Java, either version 11 or 17, is required to use the Gradle tools.
Additionally, SonarQube requires the use of Java 17.

See [the Confluent compatibility matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html#java)
for more information. After installing your desired version of Java, ensure that the `JAVA_HOME` environment variable
@@ -76,7 +76,7 @@ application must be deployed. From the "test-app" directory, follow these steps:

## Automated Testing
Now that your MarkLogic server is configured and the test-app is deployed, you can run the tests from the root
directory:
directory. Note that you must be using Java 11 or Java 17 for this command due to the latest version of Gradle.
```
./gradlew test
```
@@ -318,3 +318,7 @@ project. You must have Ruby installed. Additionally, there seems to be a bug wit
The server needs to be run with Ruby 3.2.3, so you will need to run `chruby ruby-3.2.3` before starting the jekyll
server. To start the jekyll server, cd into the /docs directory and run the command `bundle exec jekyll server`.
This will start the server and the user documents will be available at http://127.0.0.1:4000/.

# Publishing the Connector to Confluent

Please refer to the internal Wiki page for information regarding the process for releasing the connector to the Confluent Hub.
29 changes: 13 additions & 16 deletions build.gradle
@@ -1,13 +1,12 @@
plugins {
id 'java'
id 'net.saliman.properties' version '1.5.2'
id 'com.github.johnrengelman.shadow' version '8.1.1'
id "com.github.jk1.dependency-license-report" version "1.19"
id 'com.gradleup.shadow' version '8.3.4'

// Only used for testing
id 'com.marklogic.ml-gradle' version '4.8.0'
id 'com.marklogic.ml-gradle' version '5.0.0'
id 'jacoco'
id "org.sonarqube" version "4.4.1.3373"
id "org.sonarqube" version "5.1.0.4882"

// Used to generate Avro classes. This will write classes to build/generated-test-avro-java and also add that folder
// as a source root. Since this is commented out by default, the generated Avro test class has been added to
@@ -33,24 +32,21 @@ configurations {
ext {
// Even though Kafka Connect 3.7.0 is out, we're staying with 3.6.1 in order to continue
// using the third-party Kafka JUnit tool. See https://github.com/mguenther/kafka-junit?tab=readme-ov-file
kafkaVersion = "3.6.1"
kafkaVersion = "3.8.1"
}

dependencies {
compileOnly "org.apache.kafka:connect-api:${kafkaVersion}"
compileOnly "org.apache.kafka:connect-json:${kafkaVersion}"
compileOnly "org.apache.kafka:connect-runtime:${kafkaVersion}"
compileOnly "org.slf4j:slf4j-api:2.0.13"
compileOnly "org.slf4j:slf4j-api:1.7.36"

implementation 'com.marklogic:ml-javaclient-util:4.8.0'
// Force DHF to use the latest version of ml-app-deployer, which minimizes security vulnerabilities
implementation "com.marklogic:ml-app-deployer:4.8.0"
implementation "com.marklogic:ml-app-deployer:5.0.0"

implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.15.3"
implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:2.17.2"

// Note that in general, the version of the DHF jar must match that of the deployed DHF instance. Different versions
// may work together, but that behavior is not guaranteed.
implementation("com.marklogic:marklogic-data-hub:6.0.0") {
implementation("com.marklogic:marklogic-data-hub:6.1.1") {
exclude module: "marklogic-client-api"
exclude module: "ml-javaclient-util"
exclude module: "ml-app-deployer"
@@ -61,17 +57,18 @@ dependencies {
exclude module: "logback-classic"
}

testImplementation 'com.marklogic:marklogic-junit5:1.4.0'
testImplementation 'com.marklogic:marklogic-junit5:1.5.0'

testImplementation "org.apache.kafka:connect-api:${kafkaVersion}"
testImplementation "org.apache.kafka:connect-json:${kafkaVersion}"

// Can be deleted when the disabled kafka-junit tests are deleted.
testImplementation 'net.mguenther.kafka:kafka-junit:3.6.0'

testImplementation "org.apache.avro:avro-compiler:1.11.3"
testImplementation "org.apache.avro:avro-compiler:1.12.0"

// Forcing logback to be used for test logging
testImplementation "ch.qos.logback:logback-classic:1.3.14"
testImplementation "org.slf4j:jcl-over-slf4j:2.0.13"
testImplementation "org.slf4j:jcl-over-slf4j:2.0.16"

documentation files('LICENSE.txt')
documentation files('NOTICE.txt')
5 changes: 4 additions & 1 deletion docs/writing-data.md
@@ -298,13 +298,16 @@ sent to the DLQ.
MarkLogic then each of the records in the batch will be sent to the DLQ. The entire batch must be sent to the DLQ since
the connector is unable to determine the cause of the failure.

When a record is sent to the DLQ, the connector first adds headers to the record providing information about the cause
When a record is sent to the DLQ, the connector first adds headers to the record, providing information about the cause
of the failure in order to assist with troubleshooting and potential routing.
- "marklogic-failure-type" : Either "Write failure" or "Record conversion"
- "marklogic-exception-message" : Information from MarkLogic when there is a write failure
- "marklogic-original-topic" : The name of the topic that this record came from
- "marklogic-target-uri" : For write failures, this contains the target URI for the document

For those headers to be populated properly, the version of this connector must be compatible with the version of Kafka
that is being used. The 1.8.0 and 1.9.0 versions of the connector work with Kafka versions before 3.8. Starting with
the connector version 1.10.0, the Kafka version must be 3.8 or later.
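
For reference, here is a minimal sketch (not part of the connector or its documentation) of how a downstream consumer might read these headers off records in the DLQ topic. The topic name "marklogic-dlq", the bootstrap server, and the group id are placeholder values; the header names match the list above.
```
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DlqHeaderInspector {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection settings; adjust for your environment.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dlq-inspector");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // "marklogic-dlq" is a placeholder; use the topic configured via errors.deadletterqueue.topic.name.
            consumer.subscribe(Collections.singletonList("marklogic-dlq"));
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.println("Failure type:   " + headerValue(record, "marklogic-failure-type"));
                System.out.println("Exception:      " + headerValue(record, "marklogic-exception-message"));
                System.out.println("Original topic: " + headerValue(record, "marklogic-original-topic"));
                System.out.println("Target URI:     " + headerValue(record, "marklogic-target-uri"));
            }
        }
    }

    // Returns a header value as UTF-8 text, or null when the header is absent or empty.
    private static String headerValue(ConsumerRecord<byte[], byte[]> record, String name) {
        Header header = record.headers().lastHeader(name);
        return (header == null || header.value() == null)
            ? null
            : new String(header.value(), StandardCharsets.UTF_8);
    }
}
```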

## Sink connector error handling

2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,5 +1,5 @@
group=com.marklogic
version=1.9.0
version=1.10.0

# For the Confluent Connector Archive
componentOwner=marklogic
@@ -35,6 +35,8 @@
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.io.IOException;
@@ -53,6 +55,8 @@
*/
public class WriteBatcherSinkTask extends AbstractSinkTask {

protected static final Logger classLogger = LoggerFactory.getLogger(WriteBatcherSinkTask.class);

private DatabaseClient databaseClient;
private DataMovementManager dataMovementManager;
private WriteBatcher writeBatcher;
@@ -101,20 +105,35 @@ protected void writeSinkRecord(SinkRecord sinkRecord) {

static void addFailureHeaders(SinkRecord sinkRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
if (sinkRecord instanceof InternalSinkRecord) {
ConsumerRecord<byte[], byte[]> originalRecord = ((InternalSinkRecord) sinkRecord).originalRecord();
originalRecord.headers().add(MARKLOGIC_MESSAGE_FAILURE_HEADER, getBytesHandleNull(failureHeaderValue));
originalRecord.headers().add(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, getBytesHandleNull(e.getMessage()));
originalRecord.headers().add(MARKLOGIC_ORIGINAL_TOPIC, getBytesHandleNull(sinkRecord.topic()));
if (writeEvent != null) {
originalRecord.headers().add(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri().getBytes(StandardCharsets.UTF_8));
try {
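// Kafka 3.8+ exposes the original ConsumerRecord via context().original(); on earlier Kafka runtimes this call throws NoSuchMethodError, which is handled below by falling back to the wrapper sink record.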
ConsumerRecord<byte[], byte[]> originalRecord = ((InternalSinkRecord) sinkRecord).context().original();
addFailureHeadersToOriginalSinkRecord(originalRecord, e, failureHeaderValue, writeEvent);
} catch (NoSuchMethodError methodException) {
classLogger.warn("This version of the MarkLogic Kafka Connector requires Kafka version 3.8.0 or" +
" higher in order to store failure information on the original sink record. Instead, the failure" +
" information will be on the wrapper sink record.");
addFailureHeadersToNonInternalSinkRecord(sinkRecord, e, failureHeaderValue, writeEvent);
}
} else {
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_FAILURE_HEADER, failureHeaderValue);
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, e.getMessage());
sinkRecord.headers().addString(MARKLOGIC_ORIGINAL_TOPIC, sinkRecord.topic());
if (writeEvent != null) {
sinkRecord.headers().addString(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri());
}
addFailureHeadersToNonInternalSinkRecord(sinkRecord, e, failureHeaderValue, writeEvent);
}
}

static void addFailureHeadersToNonInternalSinkRecord(SinkRecord sinkRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_FAILURE_HEADER, failureHeaderValue);
sinkRecord.headers().addString(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, e.getMessage());
sinkRecord.headers().addString(MARKLOGIC_ORIGINAL_TOPIC, sinkRecord.topic());
if (writeEvent != null) {
sinkRecord.headers().addString(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri());
}
}

static void addFailureHeadersToOriginalSinkRecord(ConsumerRecord<byte[], byte[]> originalRecord, Throwable e, String failureHeaderValue, WriteEvent writeEvent) {
originalRecord.headers().add(MARKLOGIC_MESSAGE_FAILURE_HEADER, getBytesHandleNull(failureHeaderValue));
originalRecord.headers().add(MARKLOGIC_MESSAGE_EXCEPTION_MESSAGE, getBytesHandleNull(e.getMessage()));
originalRecord.headers().add(MARKLOGIC_ORIGINAL_TOPIC, getBytesHandleNull(originalRecord.topic()));
if (writeEvent != null) {
originalRecord.headers().add(MARKLOGIC_TARGET_URI, writeEvent.getTargetUri().getBytes(StandardCharsets.UTF_8));
}
}
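
As background, the snippet below is a hypothetical sketch (not the connector's actual code) of how a sink task hands a failed record to Kafka Connect's DLQ machinery once failure headers such as the ones above have been attached. The ErrantRecordReporter API shown here has been part of Kafka Connect since version 2.6; the class, method, and variable names are illustrative only.
```
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public class DlqReportingSketch {

    // Hypothetical helper: forwards a failed record to the DLQ if error reporting is configured.
    static void reportFailure(SinkTaskContext context, SinkRecord failedRecord, Throwable cause) {
        // errantRecordReporter() returns null when the connector has no DLQ/error reporting configured.
        ErrantRecordReporter reporter = context.errantRecordReporter();
        if (reporter != null) {
            // Kafka Connect then forwards the failed record to the configured DLQ topic; whether the
            // failure headers survive depends on the Kafka/connector version pairing described in
            // docs/writing-data.md above.
            reporter.report(failedRecord, cause);
        }
    }
}
```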

@@ -22,10 +22,7 @@
import net.mguenther.kafka.junit.ReadKeyValues;
import net.mguenther.kafka.junit.SendKeyValues;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.ArrayList;
@@ -68,6 +65,7 @@ void tearDownKafka() {
}

@Test
@Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
void failedBatchesShouldGoToTheDlq() throws InterruptedException {
sendSomeJsonMessages(NUM_RECORDS);

@@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.avro.Schema;
import org.apache.avro.CanonicalSchemaFormatterFactory;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
@@ -83,7 +84,7 @@ void writeSchema() throws IOException {
.endRecord();

FileCopyUtils.copy(
mySchema.toString(true).getBytes(),
new CanonicalSchemaFormatterFactory().getDefaultFormatter().format(mySchema).getBytes(),
new File(Paths.get("src", "test", "avro").toFile(), "avroTestClass-schema.avsc")
);
}
@@ -24,6 +24,7 @@
import net.mguenther.kafka.junit.SendKeyValues;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
@@ -56,6 +57,7 @@ void tearDownKafka() {
}

@Test
@Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
Integer NUM_RECORDS = 2;
sendSomeJsonMessages(NUM_RECORDS);
@@ -24,6 +24,7 @@
import net.mguenther.kafka.junit.SendKeyValues;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
@@ -56,6 +57,7 @@ void tearDownKafka() {
}

@Test
@Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
Integer NUM_RECORDS = 2;
sendSomeJsonMessages(NUM_RECORDS);
@@ -20,6 +20,7 @@
import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.util.Properties;
@@ -48,6 +49,7 @@ void tearDownKafka() {

@SuppressWarnings("java:S2699") // The assertion happens via kafka.observe
@Test
@Disabled("This test is disabled because kafka-junit is not compatible with kafka > 3.6.0")
void shouldWaitForKeyedRecordsToBePublished() throws InterruptedException {
kafka.observe(on(AUTHORS_TOPIC, 15));
}