diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index 3cd21143e..d4d655140 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -22,9 +22,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -38,7 +35,7 @@ import org.astraea.common.metrics.Sensor; import org.astraea.common.metrics.stats.Avg; -public interface ConsumerThread extends AbstractThread { +interface ConsumerThread extends AbstractThread { String DOMAIN_NAME = "org.astraea"; String TYPE_PROPERTY = "type"; String TYPE_VALUE = "consumer"; @@ -72,26 +69,12 @@ static List create( BiFunction> consumerSupplier) { if (consumers == 0) return List.of(); - var closeLatches = - IntStream.range(0, consumers).mapToObj(ignored -> new CountDownLatch(1)).toList(); - var executors = Executors.newFixedThreadPool(consumers); - // monitor - CompletableFuture.runAsync( - () -> { - try { - closeLatches.forEach(l -> Utils.swallowException(l::await)); - } finally { - executors.shutdown(); - Utils.swallowException(() -> executors.awaitTermination(30, TimeUnit.SECONDS)); - } - }); return IntStream.range(0, consumers) .mapToObj( index -> { var clientId = Performance.CLIENT_ID_PREFIX + "-consumer-" + index; var consumer = consumerSupplier.apply(clientId, new PartitionRatioListener(clientId)); var closed = new AtomicBoolean(false); - var closeLatch = closeLatches.get(index); var subscribed = new AtomicBoolean(true); var sensor = Sensor.builder() @@ -109,43 +92,43 @@ static List create( Double.class, () -> sensor.measure(EXP_WEIGHT_BY_TIME_PROPERTY)) .register(); - executors.execute( - () -> { - try { - while (!closed.get()) { - if (subscribed.get()) consumer.resubscribe(); - else { - consumer.unsubscribe(); - Utils.sleep(Duration.ofSeconds(1)); - continue; + var future = + CompletableFuture.runAsync( + () -> { + try { + while (!closed.get()) { + if (subscribed.get()) consumer.resubscribe(); + else { + consumer.unsubscribe(); + Utils.sleep(Duration.ofSeconds(1)); + continue; + } + consumer.poll(Duration.ofSeconds(1)).stream() + .mapToLong(r -> System.currentTimeMillis() - r.timestamp()) + .average() + .ifPresent(sensor::record); + } + } catch (WakeupException ignore) { + // Stop polling and being ready to clean up + } finally { + Utils.close(consumer); + closed.set(true); + CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId); + CLIENT_ID_REVOKED_PARTITIONS.remove(clientId); + NON_STICKY_SENSOR.remove(clientId); + DIFFERENCE_SENSOR.remove(clientId); } - consumer.poll(Duration.ofSeconds(1)).stream() - .mapToLong(r -> System.currentTimeMillis() - r.timestamp()) - .average() - .ifPresent(sensor::record); - } - } catch (WakeupException ignore) { - // Stop polling and being ready to clean up - } finally { - Utils.close(consumer); - closeLatch.countDown(); - closed.set(true); - CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId); - CLIENT_ID_REVOKED_PARTITIONS.remove(clientId); - NON_STICKY_SENSOR.remove(clientId); - DIFFERENCE_SENSOR.remove(clientId); - } - }); + }); return new ConsumerThread() { @Override public void waitForDone() { - Utils.swallowException(closeLatch::await); + Utils.swallowException(future::join); } @Override public boolean closed() { - return closeLatch.getCount() == 0; + return future.isDone(); } @Override @@ -161,7 +144,7 @@ public void unsubscribe() { @Override public void close() { closed.set(true); - Utils.swallowException(closeLatch::await); + waitForDone(); } }; }) diff --git a/app/src/main/java/org/astraea/app/performance/DataGenerator.java b/app/src/main/java/org/astraea/app/performance/DataGenerator.java index e5afa45d2..95ac3307d 100644 --- a/app/src/main/java/org/astraea/app/performance/DataGenerator.java +++ b/app/src/main/java/org/astraea/app/performance/DataGenerator.java @@ -17,14 +17,9 @@ package org.astraea.app.performance; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.LongStream; import org.astraea.common.Configuration; @@ -34,13 +29,11 @@ import org.astraea.common.producer.Record; import org.astraea.common.producer.RecordGenerator; -public interface DataGenerator extends AbstractThread { +interface DataGenerator extends AbstractThread { static DataGenerator of( - List>>> queues, + BlockingQueue>> queue, Supplier partitionSelector, Performance.Argument argument) { - if (queues.size() == 0) return terminatedGenerator(); - var keyDistConfig = new Configuration(argument.keyDistributionConfig); var keySizeDistConfig = new Configuration(argument.keySizeDistributionConfig); var valueDistConfig = new Configuration(argument.valueDistributionConfig); @@ -61,61 +54,43 @@ static DataGenerator of( argument.valueSize.measurement(DataUnit.Byte).intValue(), valueDistConfig)) .throughput(tp -> argument.throttles.getOrDefault(tp, argument.throughput)) .build(); - var closeLatch = new CountDownLatch(1); - var executor = Executors.newFixedThreadPool(1); var closed = new AtomicBoolean(false); var start = System.currentTimeMillis(); - var dataCount = new AtomicLong(0); - - // monitor the data generator if close or not - CompletableFuture.runAsync( - () -> { - try { - Utils.swallowException(closeLatch::await); - } finally { - executor.shutdown(); - Utils.swallowException(() -> executor.awaitTermination(30, TimeUnit.SECONDS)); - } - }); // put the data into blocking queue - CompletableFuture.runAsync( - () -> - executor.execute( - () -> { - try { + var future = + CompletableFuture.runAsync( + () -> { + try { + long dataCount = 0; + while (!closed.get()) { + // check the generator is finished or not + if (argument.exeTime.percentage(dataCount, System.currentTimeMillis() - start) + >= 100D) return; + var tp = partitionSelector.get(); + var records = dataSupplier.apply(tp); + dataCount += records.size(); - while (!closed.get()) { - // check the generator is finished or not - if (argument.exeTime.percentage( - dataCount.getAndIncrement(), System.currentTimeMillis() - start) - >= 100D) return; + // throttled data wouldn't put into the queue + if (records.isEmpty()) continue; + queue.put(records); + } + } catch (InterruptedException e) { + throw new RuntimeException("The data generator didn't close properly", e); + } finally { + closed.set(true); + } + }); - var tp = partitionSelector.get(); - var records = dataSupplier.apply(tp); - - // throttled data wouldn't put into the queue - if (records.isEmpty()) continue; - var queue = queues.get(ThreadLocalRandom.current().nextInt(queues.size())); - queue.put(records); - } - } catch (InterruptedException e) { - if (closeLatch.getCount() != 0 || closed.get()) - throw new RuntimeException(e + ", The data generator didn't close properly"); - } finally { - closeLatch.countDown(); - closed.set(true); - } - })); return new DataGenerator() { @Override public void waitForDone() { - Utils.swallowException(closeLatch::await); + Utils.swallowException(future::join); } @Override public boolean closed() { - return closeLatch.getCount() == 0; + return future.isDone(); } @Override @@ -125,19 +100,4 @@ public void close() { } }; } - - static DataGenerator terminatedGenerator() { - return new DataGenerator() { - @Override - public void waitForDone() {} - - @Override - public boolean closed() { - return true; - } - - @Override - public void close() {} - }; - } } diff --git a/app/src/main/java/org/astraea/app/performance/MonkeyThread.java b/app/src/main/java/org/astraea/app/performance/MonkeyThread.java index e4677f5d7..e7a5a8438 100644 --- a/app/src/main/java/org/astraea/app/performance/MonkeyThread.java +++ b/app/src/main/java/org/astraea/app/performance/MonkeyThread.java @@ -26,7 +26,7 @@ import org.astraea.common.consumer.Consumer; import org.astraea.common.consumer.ConsumerConfigs; -public class MonkeyThread implements AbstractThread { +class MonkeyThread implements AbstractThread { private final CountDownLatch closeLatch; private final AtomicBoolean close; diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index cc3a12846..05a58e558 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -21,7 +21,6 @@ import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -33,7 +32,6 @@ import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.astraea.app.argument.DataRateField; import org.astraea.app.argument.DataSizeField; import org.astraea.app.argument.DistributionTypeField; @@ -75,10 +73,7 @@ public static void main(String[] args) { } public static List execute(final Argument param) { - var blockingQueues = - IntStream.range(0, param.producers) - .mapToObj(i -> new ArrayBlockingQueue>>(3000)) - .toList(); + var dataQueue = new ArrayBlockingQueue>>(3000); // ensure topics are existent System.out.println("checking topics: " + String.join(",", param.topics)); param.checkTopics(); @@ -87,14 +82,16 @@ public static List execute(final Argument param) { var latestOffsets = param.lastOffsets(); System.out.println("creating threads"); - var producerThreads = ProducerThread.create(blockingQueues, param::createProducer); + var producerThreads = + ProducerThread.create( + dataQueue, param::createProducer, param.producers, param.producerThreads); var consumerThreads = param.monkeys != null ? Collections.synchronizedList(new ArrayList<>(consumers(param, latestOffsets))) : consumers(param, latestOffsets); System.out.println("creating data generator"); - var dataGenerator = DataGenerator.of(blockingQueues, param.topicPartitionSelector(), param); + var dataGenerator = DataGenerator.of(dataQueue, param.topicPartitionSelector(), param); System.out.println("creating tracker"); var tracker = @@ -150,7 +147,7 @@ public static List execute(final Argument param) { while (true) { var current = Report.recordsConsumedTotal(); - if (blockingQueues.stream().allMatch(Collection::isEmpty)) { + if (dataQueue.isEmpty()) { var unfinishedProducers = producerThreads.stream().filter(p -> !p.closed()).toList(); unfinishedProducers.forEach(AbstractThread::close); @@ -233,6 +230,13 @@ Map lastOffsets() { } } + @Parameter( + names = {"--producerThreads"}, + description = "Integer: number of producer threads", + validateWith = NonNegativeShortField.class, + converter = NonNegativeShortField.class) + int producerThreads = 1; + @Parameter( names = {"--producers"}, description = "Integer: number of producers to produce records", diff --git a/app/src/main/java/org/astraea/app/performance/ProducerThread.java b/app/src/main/java/org/astraea/app/performance/ProducerThread.java index 34b9f96ec..ace014f69 100644 --- a/app/src/main/java/org/astraea/app/performance/ProducerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ProducerThread.java @@ -17,10 +17,10 @@ package org.astraea.app.performance; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -33,7 +33,7 @@ import org.astraea.common.producer.Producer; import org.astraea.common.producer.Record; -public interface ProducerThread extends AbstractThread { +interface ProducerThread extends AbstractThread { String DOMAIN_NAME = "org.astraea"; String TYPE_PROPERTY = "type"; @@ -44,90 +44,94 @@ public interface ProducerThread extends AbstractThread { String ID_PROPERTY = "client-id"; static List create( - List>>> queues, - Function> producerSupplier) { - var producers = queues.size(); - if (producers <= 0) return List.of(); - var closeLatches = - IntStream.range(0, producers).mapToObj(ignored -> new CountDownLatch(1)).toList(); - var executors = Executors.newFixedThreadPool(producers); + BlockingQueue>> dataQueue, + Function> producerSupplier, + int producers, + int threads) { + var producerAndSensors = + IntStream.range(0, producers) + .mapToObj( + index -> { + var sensor = Sensor.builder().addStat(AVG_PROPERTY, Avg.of()).build(); + var producer = + producerSupplier.apply(Performance.CLIENT_ID_PREFIX + "-producer-" + index); + // export the custom jmx for report thread + MBeanRegister.local() + .domainName(DOMAIN_NAME) + .property(TYPE_PROPERTY, TYPE_VALUE) + .property(ID_PROPERTY, producer.clientId()) + .attribute(AVG_PROPERTY, Double.class, () -> sensor.measure(AVG_PROPERTY)) + .register(); + return Map.entry(producer, sensor); + }) + .toList(); + List reports = + IntStream.range(0, threads) + .mapToObj( + __ -> { + var closed = new AtomicBoolean(false); + var future = + CompletableFuture.runAsync( + () -> { + try { + while (!closed.get()) { + var index = + ThreadLocalRandom.current().nextInt(producerAndSensors.size()); + var producerAndSensor = producerAndSensors.get(index); + var data = dataQueue.poll(3, TimeUnit.SECONDS); + if (data == null) continue; + var now = System.currentTimeMillis(); + producerAndSensor + .getKey() + .send(data) + .forEach( + f -> + f.whenComplete( + (r, e) -> { + if (e == null) + producerAndSensor + .getValue() + .record( + (double) + (System.currentTimeMillis() - now)); + })); + } + } catch (InterruptedException e) { + throw new RuntimeException( + "The producer thread was prematurely closed.", e); + } finally { + closed.set(true); + } + }); + return new ProducerThread() { + + @Override + public boolean closed() { + return future.isDone(); + } + + @Override + public void waitForDone() { + Utils.swallowException(future::join); + } + + @Override + public void close() { + closed.set(true); + waitForDone(); + } + }; + }) + .collect(Collectors.toUnmodifiableList()); // monitor CompletableFuture.runAsync( () -> { try { - closeLatches.forEach(l -> Utils.swallowException(l::await)); + reports.forEach(l -> Utils.swallowException(l::waitForDone)); } finally { - executors.shutdown(); - Utils.swallowException(() -> executors.awaitTermination(30, TimeUnit.SECONDS)); + producerAndSensors.forEach(p -> Utils.swallowException(() -> p.getKey().close())); } }); - return IntStream.range(0, producers) - .mapToObj( - index -> { - var clientId = Performance.CLIENT_ID_PREFIX + "-producer-" + index; - var closeLatch = closeLatches.get(index); - var closed = new AtomicBoolean(false); - var producer = producerSupplier.apply(clientId); - var queue = queues.get(index); - var sensor = Sensor.builder().addStat(AVG_PROPERTY, Avg.of()).build(); - // export the custom jmx for report thread - MBeanRegister.local() - .domainName(DOMAIN_NAME) - .property(TYPE_PROPERTY, TYPE_VALUE) - .property(ID_PROPERTY, producer.clientId()) - .attribute(AVG_PROPERTY, Double.class, () -> sensor.measure(AVG_PROPERTY)) - .register(); - executors.execute( - () -> { - try { - int interdependentCounter = 0; - - while (!closed.get()) { - - var data = queue.poll(3, TimeUnit.SECONDS); - - var now = System.currentTimeMillis(); - if (data != null) - producer - .send(data) - .forEach( - f -> - f.whenComplete( - (r, e) -> { - if (e == null) - sensor.record( - (double) (System.currentTimeMillis() - now)); - })); - } - } catch (InterruptedException e) { - if (!queue.isEmpty()) - throw new RuntimeException( - e + ", The producer thread was prematurely closed."); - } finally { - Utils.close(producer); - closeLatch.countDown(); - closed.set(true); - } - }); - return new ProducerThread() { - - @Override - public boolean closed() { - return closeLatch.getCount() == 0; - } - - @Override - public void waitForDone() { - Utils.swallowException(closeLatch::await); - } - - @Override - public void close() { - closed.set(true); - waitForDone(); - } - }; - }) - .collect(Collectors.toUnmodifiableList()); + return reports; } } diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 294c29b2b..c2a38cae4 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.function.ToDoubleFunction; @@ -34,7 +33,7 @@ import org.astraea.common.metrics.client.producer.ProducerMetrics; /** Print out the given metrics. */ -public interface TrackerThread extends AbstractThread { +interface TrackerThread extends AbstractThread { class ProducerPrinter { private final JndiClient mBeanClient = JndiClient.local(); @@ -170,20 +169,18 @@ boolean tryToPrint(Duration duration) { static TrackerThread create(Supplier producersDone, Supplier consumersDone) { var closed = new AtomicBoolean(false); - var latch = new CountDownLatch(1); - CompletableFuture.runAsync(trackerLoop(closed::get, producersDone, consumersDone)) - .whenComplete((m, e) -> latch.countDown()); + var future = CompletableFuture.runAsync(trackerLoop(closed::get, producersDone, consumersDone)); return new TrackerThread() { @Override public void waitForDone() { - Utils.swallowException(latch::await); + Utils.swallowException(future::join); } @Override public boolean closed() { - return latch.getCount() == 0; + return future.isDone(); } @Override