[EXPORTER] Add support for gzip compression #1826

Merged · 4 commits · Jul 9, 2023
@@ -18,7 +18,6 @@

import com.google.protobuf.ByteString;
import java.io.BufferedOutputStream;
-import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -119,8 +118,17 @@ public void close() {
this.fs = outputStream;
}

-public RecordWriterBuilder compression() throws IOException {
-this.fs = new GZIPOutputStream(this.fs);
+public RecordWriterBuilder compression(String type) {
+switch (type) {
+case "gzip":
+this.fs = Utils.packException(() -> new GZIPOutputStream(this.fs));
+break;
+case "none":
+// do nothing.
+break;
+default:
+throw new IllegalArgumentException("unsupported compression type: " + type);
+}
return this;
}

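As a quick illustration of what the new compression("gzip") branch does, the sketch below uses plain JDK classes only (the class name and payload are arbitrary, nothing here is code from this PR): it wraps an in-memory stream in a GZIPOutputStream the same way, prints the gzip magic header 0x1f 0x8b that the new test asserts on, and round-trips the payload through GZIPInputStream.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipWrapSketch {
  public static void main(String[] args) throws IOException {
    var raw = new ByteArrayOutputStream();
    // same idea as compression("gzip"): swap the sink for a gzip-wrapping stream
    OutputStream sink = new GZIPOutputStream(raw);
    sink.write("test value".getBytes(StandardCharsets.UTF_8));
    sink.close(); // closing writes the gzip trailer

    var bytes = raw.toByteArray();
    // gzip output always starts with the magic header 0x1f 0x8b
    System.out.printf("magic: %02x %02x%n", bytes[0] & 0xff, bytes[1] & 0xff);

    // round trip: GZIPInputStream recovers the original payload
    try (var in = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
      System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
    }
  }
}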
@@ -134,6 +134,15 @@ public class Exporter extends SinkConnector {
+ "backups of a particular topic or topic partition. it can be used in 2 ways: "
+ "'<topic>.offset.from' or '<topic>.<partition>.offset.from'.")
.build();

static String COMPRESSION_TYPE_DEFAULT = "none";
static Definition COMPRESSION_TYPE_KEY =
Definition.builder()
.name("compression.type")
.type(Definition.Type.STRING)
.documentation("a value that can specify the compression type.")
.defaultValue(COMPRESSION_TYPE_DEFAULT)
.build();
private Configuration configs;

@Override
@@ -162,7 +171,8 @@ protected List<Definition> definitions() {
PATH_KEY,
SIZE_KEY,
OVERRIDE_KEY,
-BUFFER_SIZE_KEY);
+BUFFER_SIZE_KEY,
+COMPRESSION_TYPE_KEY);
}

public static class Task extends SinkTask {
@@ -183,6 +193,7 @@ public static class Task extends SinkTask {
String path;
DataSize size;
long interval;
String compressionType;

// a map of <Topic, <Partition, Offset>>
private final Map<String, Map<String, Long>> offsetForTopicPartition = new HashMap<>();
@@ -199,6 +210,7 @@ RecordWriter createRecordWriter(TopicPartition tp, long offset) {
return RecordWriter.builder(
fs.write(
String.join("/", path, tp.topic(), String.valueOf(tp.partition()), fileName)))
.compression(this.compressionType)
.build();
}

@@ -299,6 +311,11 @@ protected void init(Configuration configuration, SinkTaskContext context) {
.orElse(BUFFER_SIZE_DEFAULT)
.bytes();
this.taskContext = context;
this.compressionType =
configuration
.string(COMPRESSION_TYPE_KEY.name())
.orElse(COMPRESSION_TYPE_DEFAULT)
.toLowerCase();

// fetches key-value pairs from the configuration's variable matching the regular expression
// '.*offset.from', updates the values of 'offsetForTopic' or 'offsetForTopicPartition' based
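From a user's perspective, the only new knob is compression.type (default "none"). A hypothetical task configuration in the same Map-of-strings style the tests use might look like the sketch below; the topic, path, and HDFS hostname/port/user values are placeholders, not values from this PR.

// hypothetical configuration; only "compression.type" is introduced by this PR
var configs =
    Map.of(
        "fs.schema", "hdfs",
        "topics", "my-topic",
        "fs.hdfs.hostname", "localhost",
        "fs.hdfs.port", "9000",
        "fs.hdfs.user", "exporter",
        "path", "/backup",
        "size", "100MB",
        "compression.type", "gzip");
var task = new Exporter.Task();
task.start(configs); // Task#init lowercases the value, so "GZIP" would work as well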
@@ -777,6 +777,7 @@ void testCreateRecordWriter() {
var task = new Exporter.Task();
task.fs = FileSystem.of("hdfs", new Configuration(configs));
task.interval = 1000;
task.compressionType = "none";

RecordWriter recordWriter = task.createRecordWriter(tp, offset);

@@ -841,6 +842,7 @@ void testWriteRecords() {

var task = new Exporter.Task();
task.fs = FileSystem.of("hdfs", new Configuration(configs));
task.compressionType = "none";
task.size = DataSize.of("100MB");
task.bufferSize.reset();
task.recordsQueue.add(
@@ -950,4 +952,67 @@ void testIsValid() {
Assertions.assertEquals(1, task.seekOffset.size());
}
}

@Test
void testCompression() {
try (var server = HdfsServer.local()) {
var fileSize = "500Byte";
var topicName = Utils.randomString(10);

var task = new Exporter.Task();
var configs =
Map.of(
"fs.schema",
"hdfs",
"topics",
topicName,
"fs.hdfs.hostname",
String.valueOf(server.hostname()),
"fs.hdfs.port",
String.valueOf(server.port()),
"fs.hdfs.user",
String.valueOf(server.user()),
"path",
"/" + fileSize,
"size",
fileSize,
"fs.hdfs.override.dfs.client.use.datanode.hostname",
"true",
"compression.type",
"gzip");

task.start(configs);

var records =
List.of(
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test value".getBytes())
.partition(0)
.offset(0)
.timestamp(System.currentTimeMillis())
.build(),
Record.builder()
.topic(topicName)
.key("test".getBytes())
.value("test value".getBytes())
.partition(0)
.offset(1)
.timestamp(System.currentTimeMillis())
.build());

task.put(records);

Utils.sleep(Duration.ofMillis(1000));

task.close();
var fs = FileSystem.of("hdfs", new Configuration(configs));

var input = fs.read("/" + String.join("/", fileSize, topicName, "0/0"));

Assertions.assertArrayEquals(
new byte[] {(byte) 0x1f, (byte) 0x8b}, Utils.packException(() -> input.readNBytes(2)));
}
}
}
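Nothing in this diff covers reading the compressed files back. Purely as a hedged illustration (not code from this PR; the MaybeGzip helper is hypothetical), a consumer of the exported files could sniff the same 0x1f 0x8b magic bytes the test checks before deciding whether to wrap the stream in a GZIPInputStream:

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

final class MaybeGzip {
  // wraps the input in a GZIPInputStream only when the gzip magic header is present
  static InputStream open(InputStream raw) throws IOException {
    var in = new BufferedInputStream(raw);
    in.mark(2);
    int b0 = in.read();
    int b1 = in.read();
    in.reset();
    if (b0 == 0x1f && b1 == 0x8b) return new GZIPInputStream(in);
    return in;
  }
}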