Skip to content

Commit

Permalink
las-sink: handle deleting event (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
daleiz authored Aug 22, 2024
1 parent 921428c commit 1817f48
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
2 changes: 2 additions & 0 deletions sink-las/app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ dependencies {

// LAS
implementation 'com.bytedance.las:las-sdk-tunnel:1.2.0.16-public'

implementation 'com.google.guava:guava:33.3.0-jre'
}

task buildImages(type: Exec) {
Expand Down
21 changes: 14 additions & 7 deletions sink-las/app/src/main/java/sink/LasSinkTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import com.google.common.base.Preconditions;



import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -144,18 +147,22 @@ GenericData.Record convertRecord(SinkRecord sinkRecord) {
var lasRecord = LasRecord.fromSinkRecord(sinkRecord);
// lasRecord should be like: {"key": {"id": 0, ...}, "value": {"id": 0, "name": "...", ...} } ,
// and this format would be compatible with debezium sources.
Map<String, Object> keyRecord = (Map<String,Object>)lasRecord.getRecord().get("key");
Map<String, Object> valueRecord = (Map<String,Object>)lasRecord.getRecord().get("value");
if(valueRecord == null) {
valueRecord= lasRecord.getRecord();
}
Preconditions.checkArgument( keyRecord != null, "input record format error, key field is missing.");
var r = new GenericData.Record(schema);
for (var entry : valueRecord.entrySet()) {
var value = entry.getValue();
putValue(r, schema, entry.getKey(), value);
if(valueRecord == null) {
// This represents a deleting row event from CDC source, for LAS sink, we set 'is_deleted' column to true.
putValue(r, schema, "is_deleted", true);
} else {
for (var entry : valueRecord.entrySet()) {
var value = entry.getValue();
putValue(r, schema, entry.getKey(), value);
}
}
return r;
} catch (Exception e) {
log.warn("convertRecord failed:" + e.getMessage());
log.warn("convertRecord failed: " + e.getMessage());
if (!ctx.getSinkSkipStrategy().trySkip(sinkRecord, e.getMessage())) {
throw new ConnectorExceptions.FailFastError(e.getMessage());
}
Expand Down

0 comments on commit 1817f48

Please sign in to comment.