Skip to content

Commit

Permalink
[PRODUCER] cleanup interdependent
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Nov 2, 2024
1 parent dbb0f77 commit bbd2878
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 126 deletions.
28 changes: 1 addition & 27 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.astraea.app.performance;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.converters.LongConverter;
import java.nio.file.Path;
import java.time.Duration;
Expand All @@ -44,7 +43,6 @@
import org.astraea.app.argument.NonNegativeShortField;
import org.astraea.app.argument.PathField;
import org.astraea.app.argument.PatternField;
import org.astraea.app.argument.PositiveIntegerField;
import org.astraea.app.argument.PositiveIntegerListField;
import org.astraea.app.argument.PositiveLongField;
import org.astraea.app.argument.StringListField;
Expand All @@ -63,7 +61,6 @@
import org.astraea.common.admin.TopicPartitionPath;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.partitioner.Partitioner;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.ProducerConfigs;
import org.astraea.common.producer.Record;
Expand All @@ -90,8 +87,7 @@ public static List<String> execute(final Argument param) {
var latestOffsets = param.lastOffsets();

System.out.println("creating threads");
var producerThreads =
ProducerThread.create(blockingQueues, param::createProducer, param.interdependent);
var producerThreads = ProducerThread.create(blockingQueues, param::createProducer);
var consumerThreads =
param.monkeys != null
? Collections.synchronizedList(new ArrayList<>(consumers(param, latestOffsets)))
Expand Down Expand Up @@ -269,21 +265,6 @@ Map<TopicPartition, Long> lastOffsets() {
String partitioner = null;

String partitioner() {
// The given partitioner should be Astraea Partitioner when interdependent is set
if (this.interdependent > 1) {
try {
if (this.partitioner == null
|| !Partitioner.class.isAssignableFrom(Class.forName(this.partitioner))) {
throw new ParameterException(
"The given partitioner \""
+ this.partitioner
+ "\" is not a subclass of Astraea Partitioner");
}
} catch (ClassNotFoundException e) {
throw new ParameterException(
"The given partitioner \"" + this.partitioner + "\" was not found.");
}
}
if (this.partitioner != null) {
if (!this.specifyBrokers.isEmpty())
throw new IllegalArgumentException(
Expand Down Expand Up @@ -534,13 +515,6 @@ else if (specifiedByBroker) {
converter = DurationField.class)
Duration readIdle = Duration.ofSeconds(2);

@Parameter(
names = {"--interdependent.size"},
description =
"Integer: the number of records sending to the same partition (Note: this parameter only works for Astraea partitioner)",
validateWith = PositiveIntegerField.class)
int interdependent = 1;

@Parameter(
names = {"--throttle"},
description = "Map<String, DataRate>: Set the topic-partitions and its' throttle data rate",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.astraea.common.metrics.MBeanRegister;
import org.astraea.common.metrics.Sensor;
import org.astraea.common.metrics.stats.Avg;
import org.astraea.common.partitioner.Partitioner;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.Record;

Expand All @@ -46,8 +45,7 @@ public interface ProducerThread extends AbstractThread {

static List<ProducerThread> create(
List<ArrayBlockingQueue<List<Record<byte[], byte[]>>>> queues,
Function<String, Producer<byte[], byte[]>> producerSupplier,
int interdependent) {
Function<String, Producer<byte[], byte[]>> producerSupplier) {
var producers = queues.size();
if (producers <= 0) return List.of();
var closeLatches =
Expand Down Expand Up @@ -88,11 +86,6 @@ static List<ProducerThread> create(

var data = queue.poll(3, TimeUnit.SECONDS);

// Using interdependent
if (interdependent > 1 && data != null) {
Partitioner.beginInterdependent(producer);
interdependentCounter += data.size();
}
var now = System.currentTimeMillis();
if (data != null)
producer
Expand All @@ -105,12 +98,6 @@ static List<ProducerThread> create(
sensor.record(
(double) (System.currentTimeMillis() - now));
}));

// End interdependent
if (interdependent > 1 && interdependentCounter >= interdependent) {
Partitioner.endInterdependent(producer);
interdependentCounter = 0;
}
}
} catch (InterruptedException e) {
if (!queue.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void testTransactionalProducer() {
"2"
};
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
try (var producer = argument.createProducer("test-id")) {
Assertions.assertTrue(producer.transactional());
}
}
Expand All @@ -66,7 +66,7 @@ void testProducerExecutor() {
var topic = "testProducerExecutor";
String[] arguments1 = {"--bootstrap.servers", SERVICE.bootstrapServers(), "--topics", topic};
var argument = Argument.parse(new Performance.Argument(), arguments1);
try (var producer = argument.createProducer()) {
try (var producer = argument.createProducer("test-id")) {
Assertions.assertFalse(producer.transactional());
}
}
Expand Down Expand Up @@ -444,7 +444,7 @@ void testLastOffsets() {
new String[] {
"--bootstrap.servers", SERVICE.bootstrapServers(), "--topics", topicName
});
try (var producer = args.createProducer()) {
try (var producer = args.createProducer("test-id")) {
IntStream.range(0, 250)
.forEach(
i ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
Expand All @@ -32,9 +31,6 @@ public abstract class Partitioner implements org.apache.kafka.clients.producer.P
public static final String COST_PREFIX = "partitioner.cost";
private static final Duration CLUSTER_INFO_LEASE = Duration.ofSeconds(15);

static final ThreadLocal<Interdependent> THREAD_LOCAL =
ThreadLocal.withInitial(Interdependent::new);

private final AtomicLong lastUpdated = new AtomicLong(-1);
volatile ClusterInfo clusterInfo = ClusterInfo.empty();
Admin admin = null;
Expand Down Expand Up @@ -63,80 +59,6 @@ public void close() {
Utils.close(admin);
}

// -----------------------[interdependent]-----------------------//

/**
* Use the producer to get the scheduler, allowing you to control it for interdependent
* messages.Interdependent message will be sent to the same partition. The system will
* automatically select the node with the best current condition as the target node.
* Action:Partitioner states can interfere with each other when multiple producers are in the same
* thread. Each Thread can only support one producer. For example:
*
* <pre>{
* @Code
* Dispatch.startInterdependent(producer);
* producer.send();
* Dispatch.endInterdependent(producer);
* }</pre>
*
* Begin interdependence function.Let the next messages be interdependent.
*
* @param producer Kafka producer
*/
// TODO One thread supports multiple producers.
public static void beginInterdependent(
org.apache.kafka.clients.producer.Producer<?, ?> producer) {
THREAD_LOCAL.get().isInterdependent = true;
}

/**
* Use the producer to get the scheduler, allowing you to control it for interdependent
* messages.Interdependent message will be sent to the same partition. The system will
* automatically select the node with the best current condition as the target node.
* Action:Partitioner states can interfere with each other when multiple producers are in the same
* thread. Each Thread can only support one producer. For example:
*
* <pre>{
* @Code
* Dispatch.startInterdependent(producer);
* producer.send();
* Dispatch.endInterdependent(producer);
* }</pre>
*
* Begin interdependence function.Let the next messages be interdependent.
*
* @param producer Astraea producer
*/
// TODO One thread supports multiple producers.
// TODO: https://github.com/opensource4you/astraea/pull/721#discussion_r973677891
public static void beginInterdependent(org.astraea.common.producer.Producer<?, ?> producer) {
beginInterdependent((Producer<?, ?>) Utils.member(producer, "kafkaProducer"));
}

/**
* Close interdependence function.Send data using the original partitioner logic.
*
* @param producer Kafka producer
*/
public static void endInterdependent(org.apache.kafka.clients.producer.Producer<?, ?> producer) {
THREAD_LOCAL.remove();
}

/**
* Close interdependence function.Send data using the original partitioner logic.
*
* @param producer Kafka producer
*/
// TODO: https://github.com/opensource4you/astraea/pull/721#discussion_r973677891
public static void endInterdependent(org.astraea.common.producer.Producer<?, ?> producer) {
endInterdependent((Producer<?, ?>) Utils.member(producer, "kafkaProducer"));
}

private static class Interdependent {
boolean isInterdependent = false;
private int targetPartitions = -1;
}

// -----------------------[kafka method]-----------------------//

@Override
Expand All @@ -153,17 +75,13 @@ public final void configure(Map<String, ?> configs) {
@Override
public final int partition(
String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
var interdependent = THREAD_LOCAL.get();
if (interdependent.isInterdependent && interdependent.targetPartitions >= 0)
return interdependent.targetPartitions;
tryToUpdate();
final int target;
if (!clusterInfo.topicNames().contains(topic)) {
// the cached cluster info is not updated, so we just return a random partition
var ps = cluster.availablePartitionsForTopic(topic);
target = ps.isEmpty() ? 0 : ps.get((int) (Math.random() * ps.size())).partition();
} else target = partition(topic, keyBytes, valueBytes, clusterInfo);
interdependent.targetPartitions = target;
return target;
}

Expand Down

0 comments on commit bbd2878

Please sign in to comment.