diff --git a/sink-las/app/src/main/java/sink/LasSinkTask.java b/sink-las/app/src/main/java/sink/LasSinkTask.java index 89b47fa..f5d2abb 100644 --- a/sink-las/app/src/main/java/sink/LasSinkTask.java +++ b/sink-las/app/src/main/java/sink/LasSinkTask.java @@ -102,6 +102,49 @@ void handle(SinkRecordBatch batch) { session.commit(List.of(0L), List.of(recordWriter.getAttemptId())); } + void putValue(GenericData.Record record, Schema schema, String field, Object value) { + Schema valueSchema = schema.getField(field).schema(); + switch (valueSchema.getType()) { + case INT: + int intValue = (int) value; + record.put(field, intValue); + break; + case LONG: + long longValue = (long) value; + record.put(field, longValue); + break; + case FLOAT: + float floatValue = (float) value; + record.put(field, floatValue); + break; + case DOUBLE: + double doubleValue = (double) value; + record.put(field, doubleValue); + case UNION: + // by default, a field in LAS table is nullable, ant its schema is a union like [type, null]. + List unionTypes = valueSchema.getTypes(); + if(unionTypes.stream().anyMatch(it -> it.getType().equals(Schema.Type.INT))) { + int unionIntValue = (int) value; + record.put(field, unionIntValue); + break; + } else if (unionTypes.stream().anyMatch(it -> it.getType().equals(Schema.Type.LONG))) { + long unionLongValue = (long) value; + record.put(field, unionLongValue); + break; + } else if (unionTypes.stream().anyMatch(it -> it.getType().equals(Schema.Type.FLOAT))) { + float unionFloatValue = (float) value; + record.put(field, unionFloatValue); + break; + } else if (unionTypes.stream().anyMatch(it -> it.getType().equals(Schema.Type.DOUBLE))) { + double unionDoubleValue = (double) value; + record.put(field, unionDoubleValue); + break; + } + default: + record.put(field, value); + + } + } GenericData.Record convertRecord(SinkRecord sinkRecord) { try { var lasRecord = LasRecord.fromSinkRecord(sinkRecord); @@ -114,12 +157,7 @@ GenericData.Record convertRecord(SinkRecord sinkRecord) { var r = new GenericData.Record(schema); for (var entry : valueRecord.entrySet()) { var value = entry.getValue(); - if (schema.getField(entry.getKey()).schema().getType().equals(Schema.Type.INT)) { - var intValue = (int) value; - r.put(entry.getKey(), intValue); - } else { - r.put(entry.getKey(), value); - } + putValue(r, schema, entry.getKey(), value); } return r; } catch (Exception e) {