From bbd2878aff9c2bf5bf311fdea282834a26bff52e Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Sun, 3 Nov 2024 01:48:16 +0800 Subject: [PATCH] [PRODUCER] cleanup interdependent --- .../astraea/app/performance/Performance.java | 28 +------ .../app/performance/ProducerThread.java | 15 +--- .../app/performance/PerformanceTest.java | 6 +- .../common/partitioner/Partitioner.java | 82 ------------------- 4 files changed, 5 insertions(+), 126 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index d5620c024a..cc3a128468 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -17,7 +17,6 @@ package org.astraea.app.performance; import com.beust.jcommander.Parameter; -import com.beust.jcommander.ParameterException; import com.beust.jcommander.converters.LongConverter; import java.nio.file.Path; import java.time.Duration; @@ -44,7 +43,6 @@ import org.astraea.app.argument.NonNegativeShortField; import org.astraea.app.argument.PathField; import org.astraea.app.argument.PatternField; -import org.astraea.app.argument.PositiveIntegerField; import org.astraea.app.argument.PositiveIntegerListField; import org.astraea.app.argument.PositiveLongField; import org.astraea.app.argument.StringListField; @@ -63,7 +61,6 @@ import org.astraea.common.admin.TopicPartitionPath; import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; -import org.astraea.common.partitioner.Partitioner; import org.astraea.common.producer.Producer; import org.astraea.common.producer.ProducerConfigs; import org.astraea.common.producer.Record; @@ -90,8 +87,7 @@ public static List execute(final Argument param) { var latestOffsets = param.lastOffsets(); System.out.println("creating threads"); - var producerThreads = - ProducerThread.create(blockingQueues, param::createProducer, param.interdependent); + var producerThreads = ProducerThread.create(blockingQueues, param::createProducer); var consumerThreads = param.monkeys != null ? Collections.synchronizedList(new ArrayList<>(consumers(param, latestOffsets))) @@ -269,21 +265,6 @@ Map lastOffsets() { String partitioner = null; String partitioner() { - // The given partitioner should be Astraea Partitioner when interdependent is set - if (this.interdependent > 1) { - try { - if (this.partitioner == null - || !Partitioner.class.isAssignableFrom(Class.forName(this.partitioner))) { - throw new ParameterException( - "The given partitioner \"" - + this.partitioner - + "\" is not a subclass of Astraea Partitioner"); - } - } catch (ClassNotFoundException e) { - throw new ParameterException( - "The given partitioner \"" + this.partitioner + "\" was not found."); - } - } if (this.partitioner != null) { if (!this.specifyBrokers.isEmpty()) throw new IllegalArgumentException( @@ -534,13 +515,6 @@ else if (specifiedByBroker) { converter = DurationField.class) Duration readIdle = Duration.ofSeconds(2); - @Parameter( - names = {"--interdependent.size"}, - description = - "Integer: the number of records sending to the same partition (Note: this parameter only works for Astraea partitioner)", - validateWith = PositiveIntegerField.class) - int interdependent = 1; - @Parameter( names = {"--throttle"}, description = "Map: Set the topic-partitions and its' throttle data rate", diff --git a/app/src/main/java/org/astraea/app/performance/ProducerThread.java b/app/src/main/java/org/astraea/app/performance/ProducerThread.java index f9ac90af83..34b9f96eca 100644 --- a/app/src/main/java/org/astraea/app/performance/ProducerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ProducerThread.java @@ -30,7 +30,6 @@ import org.astraea.common.metrics.MBeanRegister; import org.astraea.common.metrics.Sensor; import org.astraea.common.metrics.stats.Avg; -import org.astraea.common.partitioner.Partitioner; import org.astraea.common.producer.Producer; import org.astraea.common.producer.Record; @@ -46,8 +45,7 @@ public interface ProducerThread extends AbstractThread { static List create( List>>> queues, - Function> producerSupplier, - int interdependent) { + Function> producerSupplier) { var producers = queues.size(); if (producers <= 0) return List.of(); var closeLatches = @@ -88,11 +86,6 @@ static List create( var data = queue.poll(3, TimeUnit.SECONDS); - // Using interdependent - if (interdependent > 1 && data != null) { - Partitioner.beginInterdependent(producer); - interdependentCounter += data.size(); - } var now = System.currentTimeMillis(); if (data != null) producer @@ -105,12 +98,6 @@ static List create( sensor.record( (double) (System.currentTimeMillis() - now)); })); - - // End interdependent - if (interdependent > 1 && interdependentCounter >= interdependent) { - Partitioner.endInterdependent(producer); - interdependentCounter = 0; - } } } catch (InterruptedException e) { if (!queue.isEmpty()) diff --git a/app/src/test/java/org/astraea/app/performance/PerformanceTest.java b/app/src/test/java/org/astraea/app/performance/PerformanceTest.java index d5e1579c8c..6288b3b8c5 100644 --- a/app/src/test/java/org/astraea/app/performance/PerformanceTest.java +++ b/app/src/test/java/org/astraea/app/performance/PerformanceTest.java @@ -56,7 +56,7 @@ void testTransactionalProducer() { "2" }; var argument = Argument.parse(new Performance.Argument(), arguments1); - try (var producer = argument.createProducer()) { + try (var producer = argument.createProducer("test-id")) { Assertions.assertTrue(producer.transactional()); } } @@ -66,7 +66,7 @@ void testProducerExecutor() { var topic = "testProducerExecutor"; String[] arguments1 = {"--bootstrap.servers", SERVICE.bootstrapServers(), "--topics", topic}; var argument = Argument.parse(new Performance.Argument(), arguments1); - try (var producer = argument.createProducer()) { + try (var producer = argument.createProducer("test-id")) { Assertions.assertFalse(producer.transactional()); } } @@ -444,7 +444,7 @@ void testLastOffsets() { new String[] { "--bootstrap.servers", SERVICE.bootstrapServers(), "--topics", topicName }); - try (var producer = args.createProducer()) { + try (var producer = args.createProducer("test-id")) { IntStream.range(0, 250) .forEach( i -> diff --git a/common/src/main/java/org/astraea/common/partitioner/Partitioner.java b/common/src/main/java/org/astraea/common/partitioner/Partitioner.java index 57d8581e69..e6be56b6a2 100644 --- a/common/src/main/java/org/astraea/common/partitioner/Partitioner.java +++ b/common/src/main/java/org/astraea/common/partitioner/Partitioner.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; import org.astraea.common.Configuration; import org.astraea.common.Utils; @@ -32,9 +31,6 @@ public abstract class Partitioner implements org.apache.kafka.clients.producer.P public static final String COST_PREFIX = "partitioner.cost"; private static final Duration CLUSTER_INFO_LEASE = Duration.ofSeconds(15); - static final ThreadLocal THREAD_LOCAL = - ThreadLocal.withInitial(Interdependent::new); - private final AtomicLong lastUpdated = new AtomicLong(-1); volatile ClusterInfo clusterInfo = ClusterInfo.empty(); Admin admin = null; @@ -63,80 +59,6 @@ public void close() { Utils.close(admin); } - // -----------------------[interdependent]-----------------------// - - /** - * Use the producer to get the scheduler, allowing you to control it for interdependent - * messages.Interdependent message will be sent to the same partition. The system will - * automatically select the node with the best current condition as the target node. - * Action:Partitioner states can interfere with each other when multiple producers are in the same - * thread. Each Thread can only support one producer. For example: - * - *
{
-   * @Code
-   * Dispatch.startInterdependent(producer);
-   * producer.send();
-   * Dispatch.endInterdependent(producer);
-   * }
- * - * Begin interdependence function.Let the next messages be interdependent. - * - * @param producer Kafka producer - */ - // TODO One thread supports multiple producers. - public static void beginInterdependent( - org.apache.kafka.clients.producer.Producer producer) { - THREAD_LOCAL.get().isInterdependent = true; - } - - /** - * Use the producer to get the scheduler, allowing you to control it for interdependent - * messages.Interdependent message will be sent to the same partition. The system will - * automatically select the node with the best current condition as the target node. - * Action:Partitioner states can interfere with each other when multiple producers are in the same - * thread. Each Thread can only support one producer. For example: - * - *
{
-   * @Code
-   * Dispatch.startInterdependent(producer);
-   * producer.send();
-   * Dispatch.endInterdependent(producer);
-   * }
- * - * Begin interdependence function.Let the next messages be interdependent. - * - * @param producer Astraea producer - */ - // TODO One thread supports multiple producers. - // TODO: https://github.com/opensource4you/astraea/pull/721#discussion_r973677891 - public static void beginInterdependent(org.astraea.common.producer.Producer producer) { - beginInterdependent((Producer) Utils.member(producer, "kafkaProducer")); - } - - /** - * Close interdependence function.Send data using the original partitioner logic. - * - * @param producer Kafka producer - */ - public static void endInterdependent(org.apache.kafka.clients.producer.Producer producer) { - THREAD_LOCAL.remove(); - } - - /** - * Close interdependence function.Send data using the original partitioner logic. - * - * @param producer Kafka producer - */ - // TODO: https://github.com/opensource4you/astraea/pull/721#discussion_r973677891 - public static void endInterdependent(org.astraea.common.producer.Producer producer) { - endInterdependent((Producer) Utils.member(producer, "kafkaProducer")); - } - - private static class Interdependent { - boolean isInterdependent = false; - private int targetPartitions = -1; - } - // -----------------------[kafka method]-----------------------// @Override @@ -153,9 +75,6 @@ public final void configure(Map configs) { @Override public final int partition( String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { - var interdependent = THREAD_LOCAL.get(); - if (interdependent.isInterdependent && interdependent.targetPartitions >= 0) - return interdependent.targetPartitions; tryToUpdate(); final int target; if (!clusterInfo.topicNames().contains(topic)) { @@ -163,7 +82,6 @@ public final int partition( var ps = cluster.availablePartitionsForTopic(topic); target = ps.isEmpty() ? 0 : ps.get((int) (Math.random() * ps.size())).partition(); } else target = partition(topic, keyBytes, valueBytes, clusterInfo); - interdependent.targetPartitions = target; return target; }