From bd17db76a68900a40c2c15edaaa482c3a23fc951 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sat, 24 Jun 2023 11:39:17 +0800 Subject: [PATCH 1/4] add support for gzip compression --- .../common/backup/RecordWriterBuilder.java | 14 +++++++++++--- .../org/astraea/connector/backup/Exporter.java | 16 +++++++++++++++- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java index c084e7d1c..7e5070c0e 100644 --- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java +++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java @@ -18,7 +18,6 @@ import com.google.protobuf.ByteString; import java.io.BufferedOutputStream; -import java.io.IOException; import java.io.OutputStream; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -119,8 +118,17 @@ public void close() { this.fs = outputStream; } - public RecordWriterBuilder compression() throws IOException { - this.fs = new GZIPOutputStream(this.fs); + public RecordWriterBuilder compression(String type) { + switch (type) { + case "gzip": + this.fs = Utils.packException(() -> new GZIPOutputStream(this.fs)); + break; + case "none": + // do nothing. + break; + default: + throw new IllegalArgumentException("unsupported compression type: " + type); + } return this; } diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index 28980e2c7..037f55094 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -134,6 +134,15 @@ public class Exporter extends SinkConnector { + "backups of a particular topic or topic partition. it can be used in 2 ways: " + "'.offset.from' or '..offset.from'.") .build(); + + static String COMPRESSION_TYPE_DEFAULT = "none"; + static Definition COMPRESSION_TYPE_KEY = + Definition.builder() + .name("compression.type") + .type(Definition.Type.STRING) + .documentation("a value that can specify the compression type.") + .defaultValue(COMPRESSION_TYPE_DEFAULT) + .build(); private Configuration configs; @Override @@ -162,7 +171,8 @@ protected List definitions() { PATH_KEY, SIZE_KEY, OVERRIDE_KEY, - BUFFER_SIZE_KEY); + BUFFER_SIZE_KEY, + COMPRESSION_TYPE_KEY); } public static class Task extends SinkTask { @@ -183,6 +193,7 @@ public static class Task extends SinkTask { String path; DataSize size; long interval; + String compressionType; // a map of > private final Map> offsetForTopicPartition = new HashMap<>(); @@ -199,6 +210,7 @@ RecordWriter createRecordWriter(TopicPartition tp, long offset) { return RecordWriter.builder( fs.write( String.join("/", path, tp.topic(), String.valueOf(tp.partition()), fileName))) + .compression(this.compressionType) .build(); } @@ -299,6 +311,8 @@ protected void init(Configuration configuration, SinkTaskContext context) { .orElse(BUFFER_SIZE_DEFAULT) .bytes(); this.taskContext = context; + this.compressionType = + configuration.string(COMPRESSION_TYPE_KEY.name()).orElse(COMPRESSION_TYPE_DEFAULT); // fetches key-value pairs from the configuration's variable matching the regular expression // '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based From 7e16969767cf738da3cc44510ec5199160720056 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Mon, 26 Jun 2023 15:45:57 +0800 Subject: [PATCH 2/4] add test for checking the file is compressed in gzip format. --- .../connector/backup/ExporterTest.java | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index 6a2799005..60d94d5cd 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -950,4 +950,67 @@ void testIsValid() { Assertions.assertEquals(1, task.seekOffset.size()); } } + + @Test + void testCompression() { + try (var server = HdfsServer.local()) { + var fileSize = "500Byte"; + var topicName = Utils.randomString(10); + + var task = new Exporter.Task(); + var configs = + Map.of( + "fs.schema", + "hdfs", + "topics", + topicName, + "fs.hdfs.hostname", + String.valueOf(server.hostname()), + "fs.hdfs.port", + String.valueOf(server.port()), + "fs.hdfs.user", + String.valueOf(server.user()), + "path", + "/" + fileSize, + "size", + fileSize, + "fs.hdfs.override.dfs.client.use.datanode.hostname", + "true", + "compression.type", + "gzip"); + + task.start(configs); + + var records = + List.of( + Record.builder() + .topic(topicName) + .key("test".getBytes()) + .value("test value".getBytes()) + .partition(0) + .offset(0) + .timestamp(System.currentTimeMillis()) + .build(), + Record.builder() + .topic(topicName) + .key("test".getBytes()) + .value("test value".getBytes()) + .partition(0) + .offset(1) + .timestamp(System.currentTimeMillis()) + .build()); + + task.put(records); + + Utils.sleep(Duration.ofMillis(1000)); + + task.close(); + var fs = FileSystem.of("hdfs", new Configuration(configs)); + + var input = fs.read("/" + String.join("/", fileSize, topicName, "0/0")); + + Assertions.assertArrayEquals( + new byte[] {(byte) 0x1f, (byte) 0x8b}, Utils.packException(() -> input.readNBytes(2))); + } + } } From e00ce093829b074938dba89618d703dc6953fcb8 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Mon, 26 Jun 2023 15:58:30 +0800 Subject: [PATCH 3/4] fix test failed caused by not setting the compression type. --- .../test/java/org/astraea/connector/backup/ExporterTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java index 60d94d5cd..28e8d14de 100644 --- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java +++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java @@ -777,6 +777,7 @@ void testCreateRecordWriter() { var task = new Exporter.Task(); task.fs = FileSystem.of("hdfs", new Configuration(configs)); task.interval = 1000; + task.compressionType = "none"; RecordWriter recordWriter = task.createRecordWriter(tp, offset); @@ -841,6 +842,7 @@ void testWriteRecords() { var task = new Exporter.Task(); task.fs = FileSystem.of("hdfs", new Configuration(configs)); + task.compressionType = "none"; task.size = DataSize.of("100MB"); task.bufferSize.reset(); task.recordsQueue.add( From 43c6eb02643f79d061b319ecafe86b9ce86f4df5 Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Sat, 8 Jul 2023 18:06:46 +0800 Subject: [PATCH 4/4] make the field compressionType in Task not case-sensitive --- .../src/main/java/org/astraea/connector/backup/Exporter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index 037f55094..855320ec1 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -312,7 +312,10 @@ protected void init(Configuration configuration, SinkTaskContext context) { .bytes(); this.taskContext = context; this.compressionType = - configuration.string(COMPRESSION_TYPE_KEY.name()).orElse(COMPRESSION_TYPE_DEFAULT); + configuration + .string(COMPRESSION_TYPE_KEY.name()) + .orElse(COMPRESSION_TYPE_DEFAULT) + .toLowerCase(); // fetches key-value pairs from the configuration's variable matching the regular expression // '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based