From 246551dd5b793f8e66767fae2bde66a0b3c4feaf Mon Sep 17 00:00:00 2001 From: Jia-Sheng Chen Date: Wed, 17 May 2023 19:50:08 +0800 Subject: [PATCH] rewrite sink connector by java 17 toList --- .../src/main/java/org/astraea/connector/SinkConnector.java | 5 +---- connector/src/main/java/org/astraea/connector/SinkTask.java | 3 +-- .../src/main/java/org/astraea/connector/backup/Exporter.java | 3 +-- .../src/main/java/org/astraea/connector/perf/PerfSink.java | 3 +-- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/connector/src/main/java/org/astraea/connector/SinkConnector.java b/connector/src/main/java/org/astraea/connector/SinkConnector.java index e5c30be7c8..15b76dbe9d 100644 --- a/connector/src/main/java/org/astraea/connector/SinkConnector.java +++ b/connector/src/main/java/org/astraea/connector/SinkConnector.java @@ -18,7 +18,6 @@ import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; import org.astraea.common.Configuration; import org.astraea.common.VersionUtils; @@ -53,9 +52,7 @@ public final Class taskClass() @Override public List> taskConfigs(int maxTasks) { - return takeConfiguration(maxTasks).stream() - .map(Configuration::raw) - .collect(Collectors.toList()); + return takeConfiguration(maxTasks).stream().map(Configuration::raw).toList(); } @Override diff --git a/connector/src/main/java/org/astraea/connector/SinkTask.java b/connector/src/main/java/org/astraea/connector/SinkTask.java index 9d83dc4578..711d7d9a1c 100644 --- a/connector/src/main/java/org/astraea/connector/SinkTask.java +++ b/connector/src/main/java/org/astraea/connector/SinkTask.java @@ -21,7 +21,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.header.Headers; @@ -57,7 +56,7 @@ public final void start(Map props) { @Override public final void put(Collection records) { if (records != null && !records.isEmpty()) - put(records.stream().map(SinkTask::toRecord).collect(Collectors.toList())); + put(records.stream().map(SinkTask::toRecord).toList()); } private static byte[] toBytes(Schema schema, Object value) { diff --git a/connector/src/main/java/org/astraea/connector/backup/Exporter.java b/connector/src/main/java/org/astraea/connector/backup/Exporter.java index 776dd15a1a..6d4128d633 100644 --- a/connector/src/main/java/org/astraea/connector/backup/Exporter.java +++ b/connector/src/main/java/org/astraea/connector/backup/Exporter.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.astraea.common.Configuration; @@ -128,7 +127,7 @@ protected Class task() { @Override protected List takeConfiguration(int maxTasks) { - return IntStream.range(0, maxTasks).mapToObj(ignored -> configs).collect(Collectors.toList()); + return IntStream.range(0, maxTasks).mapToObj(ignored -> configs).toList(); } @Override diff --git a/connector/src/main/java/org/astraea/connector/perf/PerfSink.java b/connector/src/main/java/org/astraea/connector/perf/PerfSink.java index 88603f25af..6ad2760ed6 100644 --- a/connector/src/main/java/org/astraea/connector/perf/PerfSink.java +++ b/connector/src/main/java/org/astraea/connector/perf/PerfSink.java @@ -18,7 +18,6 @@ import java.time.Duration; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.common.Configuration; import org.astraea.common.Utils; @@ -50,7 +49,7 @@ protected Class task() { @Override protected List takeConfiguration(int maxTasks) { - return IntStream.range(0, maxTasks).mapToObj(i -> config).collect(Collectors.toList()); + return IntStream.range(0, maxTasks).mapToObj(i -> config).toList(); } @Override