diff --git a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
index c084e7d1c..7e5070c0e 100644
--- a/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
+++ b/common/src/main/java/org/astraea/common/backup/RecordWriterBuilder.java
@@ -18,7 +18,6 @@
 
 import com.google.protobuf.ByteString;
 import java.io.BufferedOutputStream;
-import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -119,8 +118,17 @@ public void close() {
     this.fs = outputStream;
   }
 
-  public RecordWriterBuilder compression() throws IOException {
-    this.fs = new GZIPOutputStream(this.fs);
+  public RecordWriterBuilder compression(String type) {
+    switch (type) {
+      case "gzip":
+        this.fs = Utils.packException(() -> new GZIPOutputStream(this.fs));
+        break;
+      case "none":
+        // do nothing.
+        break;
+      default:
+        throw new IllegalArgumentException("unsupported compression type: " + type);
+    }
     return this;
   }
 
diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java
index 28980e2c7..855320ec1 100644
--- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java
+++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java
@@ -134,6 +134,15 @@ public class Exporter extends SinkConnector {
                   + "backups of a particular topic or topic partition. it can be used in 2 ways: "
                   + "'<topic>.offset.from' or '<topic>.<partition>.offset.from'.")
           .build();
+
+  static String COMPRESSION_TYPE_DEFAULT = "none";
+  static Definition COMPRESSION_TYPE_KEY =
+      Definition.builder()
+          .name("compression.type")
+          .type(Definition.Type.STRING)
+          .documentation("a value that can specify the compression type.")
+          .defaultValue(COMPRESSION_TYPE_DEFAULT)
+          .build();
   private Configuration configs;
 
   @Override
@@ -162,7 +171,8 @@ protected List<Definition> definitions() {
         PATH_KEY,
         SIZE_KEY,
         OVERRIDE_KEY,
-        BUFFER_SIZE_KEY);
+        BUFFER_SIZE_KEY,
+        COMPRESSION_TYPE_KEY);
   }
 
   public static class Task extends SinkTask {
@@ -183,6 +193,7 @@ public static class Task extends SinkTask {
     String path;
     DataSize size;
     long interval;
+    String compressionType;
 
     // a map of <topic, <partition, offset>>
     private final Map<String, Map<Integer, Long>> offsetForTopicPartition = new HashMap<>();
@@ -199,6 +210,7 @@ RecordWriter createRecordWriter(TopicPartition tp, long offset) {
       return RecordWriter.builder(
               fs.write(
                   String.join("/", path, tp.topic(), String.valueOf(tp.partition()), fileName)))
+          .compression(this.compressionType)
          .build();
     }
 
@@ -299,6 +311,11 @@ protected void init(Configuration configuration, SinkTaskContext context) {
               .orElse(BUFFER_SIZE_DEFAULT)
               .bytes();
       this.taskContext = context;
+      this.compressionType =
+          configuration
+              .string(COMPRESSION_TYPE_KEY.name())
+              .orElse(COMPRESSION_TYPE_DEFAULT)
+              .toLowerCase();
 
       // fetches key-value pairs from the configuration's variable matching the regular expression
       // '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based
diff --git a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
index 6a2799005..28e8d14de 100644
--- a/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
+++ b/connector/src/test/java/org/astraea/connector/backup/ExporterTest.java
@@ -777,6 +777,7 @@ void testCreateRecordWriter() {
       var task = new Exporter.Task();
       task.fs = FileSystem.of("hdfs", new Configuration(configs));
       task.interval = 1000;
+      task.compressionType = "none";
 
       RecordWriter recordWriter = task.createRecordWriter(tp, offset);
 
@@ -841,6 +842,7 @@ void testWriteRecords() {
 
       var task = new Exporter.Task();
       task.fs = FileSystem.of("hdfs", new Configuration(configs));
+      task.compressionType = "none";
       task.size = DataSize.of("100MB");
       task.bufferSize.reset();
       task.recordsQueue.add(
@@ -950,4 +952,67 @@ void testIsValid() {
       Assertions.assertEquals(1, task.seekOffset.size());
     }
   }
+
+  @Test
+  void testCompression() {
+    try (var server = HdfsServer.local()) {
+      var fileSize = "500Byte";
+      var topicName = Utils.randomString(10);
+
+      var task = new Exporter.Task();
+      var configs =
+          Map.of(
+              "fs.schema",
+              "hdfs",
+              "topics",
+              topicName,
+              "fs.hdfs.hostname",
+              String.valueOf(server.hostname()),
+              "fs.hdfs.port",
+              String.valueOf(server.port()),
+              "fs.hdfs.user",
+              String.valueOf(server.user()),
+              "path",
+              "/" + fileSize,
+              "size",
+              fileSize,
+              "fs.hdfs.override.dfs.client.use.datanode.hostname",
+              "true",
+              "compression.type",
+              "gzip");
+
+      task.start(configs);
+
+      var records =
+          List.of(
+              Record.builder()
+                  .topic(topicName)
+                  .key("test".getBytes())
+                  .value("test value".getBytes())
+                  .partition(0)
+                  .offset(0)
+                  .timestamp(System.currentTimeMillis())
+                  .build(),
+              Record.builder()
+                  .topic(topicName)
+                  .key("test".getBytes())
+                  .value("test value".getBytes())
+                  .partition(0)
+                  .offset(1)
+                  .timestamp(System.currentTimeMillis())
+                  .build());
+
+      task.put(records);
+
+      Utils.sleep(Duration.ofMillis(1000));
+
+      task.close();
+      var fs = FileSystem.of("hdfs", new Configuration(configs));
+
+      var input = fs.read("/" + String.join("/", fileSize, topicName, "0/0"));
+
+      Assertions.assertArrayEquals(
+          new byte[] {(byte) 0x1f, (byte) 0x8b}, Utils.packException(() -> input.readNBytes(2)));
+    }
+  }
 }