diff --git a/sink-las/app/build.gradle b/sink-las/app/build.gradle index 429496f..39c6580 100644 --- a/sink-las/app/build.gradle +++ b/sink-las/app/build.gradle @@ -34,6 +34,8 @@ dependencies { // LAS implementation 'com.bytedance.las:las-sdk-tunnel:1.2.0.16-public' + + implementation 'com.google.guava:guava:33.3.0-jre' } task buildImages(type: Exec) { diff --git a/sink-las/app/src/main/java/sink/LasSinkTask.java b/sink-las/app/src/main/java/sink/LasSinkTask.java index 0e7e59c..a593a0c 100644 --- a/sink-las/app/src/main/java/sink/LasSinkTask.java +++ b/sink-las/app/src/main/java/sink/LasSinkTask.java @@ -14,6 +14,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import com.google.common.base.Preconditions; + + import java.util.List; import java.util.Map; @@ -144,18 +147,22 @@ GenericData.Record convertRecord(SinkRecord sinkRecord) { var lasRecord = LasRecord.fromSinkRecord(sinkRecord); // lasRecord should be like: {"key": {"id": 0, ...}, "value": {"id": 0, "name": "...", ...} } , // and this format would be compatible with debezium sources. + Map keyRecord = (Map)lasRecord.getRecord().get("key"); Map valueRecord = (Map)lasRecord.getRecord().get("value"); - if(valueRecord == null) { - valueRecord= lasRecord.getRecord(); - } + Preconditions.checkArgument( keyRecord != null, "input record format error, key field is missing."); var r = new GenericData.Record(schema); - for (var entry : valueRecord.entrySet()) { - var value = entry.getValue(); - putValue(r, schema, entry.getKey(), value); + if(valueRecord == null) { + // This represents a deleting row event from CDC source, for LAS sink, we set 'is_deleted' column to true. + putValue(r, schema, "is_deleted", true); + } else { + for (var entry : valueRecord.entrySet()) { + var value = entry.getValue(); + putValue(r, schema, entry.getKey(), value); + } } return r; } catch (Exception e) { - log.warn("convertRecord failed:" + e.getMessage()); + log.warn("convertRecord failed: " + e.getMessage()); if (!ctx.getSinkSkipStrategy().trySkip(sinkRecord, e.getMessage())) { throw new ConnectorExceptions.FailFastError(e.getMessage()); }