From b210fe9363c0bbaac28ab99744aa7c081ba3b73a Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Thu, 2 Feb 2023 21:10:11 +0800 Subject: [PATCH 01/18] Implementation of `BackboneImbalanceScenario` --- .../app/web/BackboneImbalanceScenario.java | 447 ++++++++++++++++++ .../java/org/astraea/app/web/Scenario.java | 50 +- .../app/web/SkewedPartitionScenario.java | 47 +- .../org/astraea/app/web/TopicHandler.java | 3 +- .../web/BackboneImbalanceScenarioTest.java | 70 +++ .../app/web/SkewedPartitionScenarioTest.java | 3 +- 6 files changed, 570 insertions(+), 50 deletions(-) create mode 100644 app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java create mode 100644 app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java new file mode 100644 index 0000000000..a683d1a2d4 --- /dev/null +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -0,0 +1,447 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.app.web; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.apache.commons.math3.distribution.EnumeratedDistribution; +import org.apache.commons.math3.distribution.ParetoDistribution; +import org.apache.commons.math3.distribution.UniformIntegerDistribution; +import org.apache.commons.math3.distribution.UniformRealDistribution; +import org.apache.commons.math3.random.Well19937c; +import org.apache.commons.math3.util.Pair; +import org.astraea.common.Configuration; +import org.astraea.common.DataRate; +import org.astraea.common.DataSize; +import org.astraea.common.admin.Admin; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; + +/** + * This class build up an imbalance scenario that one of the topic has significant more produce load + * than the others. 
+ */ +public class BackboneImbalanceScenario implements Scenario { + + public static final String CONFIG_RANDOM_SEED = "seed"; + public static final String CONFIG_TOPIC_COUNT = "topicCount"; + public static final String CONFIG_TOPIC_DATA_RATE_PARETO_SCALE = "topicRateParetoScale"; + public static final String CONFIG_TOPIC_DATA_RATE_PARETO_SHAPE = "topicRateParetoShape"; + public static final String CONFIG_TOPIC_CONSUMER_FANOUT_SERIES = "consumerFanoutSeries"; + public static final String CONFIG_PARTITION_COUNT_MIN = "partitionCountMin"; + public static final String CONFIG_PARTITION_COUNT_MAX = "partitionCountMax"; + public static final String CONFIG_BACKBONE_DATA_RATE = "backboneDataRate"; + public static final String CONFIG_PERF_CLIENT_COUNT = "performanceClientCount"; + public static final String CONFIG_PERF_EXTRA_ARGS = "performanceClientExtraArgs"; + + private static final String backboneTopicName = "backbone"; + + @Override + public CompletionStage apply(Admin admin, Configuration scenarioConfig) { + final var config = new Config(scenarioConfig); + final var rng = new Well19937c(config.seed()); + final var topicDataRateDistribution = + new ParetoDistribution(rng, config.topicRateParetoScale(), config.topicRateParetoShape()); + final var backboneDataRateDistribution = + new UniformRealDistribution( + rng, config.backboneDataRate() * 0.9, config.backboneDataRate() * 1.1); + final var topicPartitionCountDistribution = + new UniformIntegerDistribution(rng, config.partitionMin(), config.partitionMax()); + final var topicConsumerFanoutDistribution = + new EnumeratedDistribution<>( + rng, + config.consumerFanoutSeries().stream() + .map(x -> Pair.create(x, 1.0)) + .collect(Collectors.toUnmodifiableList())); + + return CompletableFuture.supplyAsync( + () -> { + final var topicNames = + IntStream.range(0, config.topicCount()) + .mapToObj(index -> "topic_" + index) + .collect(Collectors.toUnmodifiableSet()); + + // create topics + var normalTopics = + topicNames.stream() 
+ .map( + name -> + admin + .creator() + .topic(name) + .numberOfPartitions(topicPartitionCountDistribution.sample()) + .numberOfReplicas((short) 1) + .run()); + var backboneTopic = + Stream.generate( + () -> + admin + .creator() + .topic(backboneTopicName) + .numberOfPartitions(1) + .numberOfReplicas((short) 1) + .run()) + .limit(1); + + Stream.concat(normalTopics, backboneTopic) + .map(CompletionStage::toCompletableFuture) + .peek( + stage -> + stage.whenComplete( + (done, err) -> { + if (err != null) err.printStackTrace(); + })) + .forEach(CompletableFuture::join); + + // gather info and generate necessary variables + var allTopics = + Stream.concat(topicNames.stream(), Stream.of(backboneTopicName)) + .collect(Collectors.toUnmodifiableSet()); + var clusterInfo = admin.clusterInfo(allTopics).toCompletableFuture().join(); + var topicDataRate = + allTopics.stream() + .collect( + Collectors.toUnmodifiableMap( + x -> x, + x -> + DataRate.Byte.of( + (long) + (x.equals(backboneTopicName) + ? backboneDataRateDistribution.sample() + : topicDataRateDistribution.sample())) + .perSecond())); + var consumerFanoutMap = + allTopics.stream() + .collect( + Collectors.toUnmodifiableMap( + x -> x, + x -> + x.equals(backboneTopicName) + ? 
1 + : topicConsumerFanoutDistribution.sample())); + + return new Result(config, clusterInfo, allTopics, topicDataRate, consumerFanoutMap); + }); + } + + public static class Result { + + @JsonIgnore private final Config config; + @JsonIgnore private final ClusterInfo clusterInfo; + @JsonIgnore private final Set topics; + @JsonIgnore private final Map topicDataRates; + @JsonIgnore private final Map topicConsumerFanout; + + public Result( + Config config, + ClusterInfo clusterInfo, + Set topics, + Map topicDataRates, + Map topicConsumerFanout) { + this.config = config; + this.clusterInfo = clusterInfo; + this.topics = topics; + this.topicDataRates = topicDataRates; + this.topicConsumerFanout = topicConsumerFanout; + } + + @JsonProperty + public long totalTopics() { + return topics.size(); + } + + @JsonProperty + public long totalPartitions() { + return clusterInfo.replicaStream().filter(r -> topics.contains(r.topic())).count(); + } + + @JsonProperty + public String totalProduceRate() { + var sum = topicDataRates.values().stream().mapToDouble(DataRate::byteRate).sum(); + return DataRate.Byte.of((long) sum).perSecond().toString(); + } + + @JsonProperty + public String totalConsumeRate() { + var sum = + topicDataRates.entrySet().stream() + .mapToDouble(e -> e.getValue().byteRate() * topicConsumerFanout.get(e.getKey())) + .sum(); + return DataRate.Byte.of((long) sum).perSecond().toString(); + } + + @JsonProperty + public double consumerFanoutAverage() { + return config.consumerFanoutSeries().stream().mapToInt(x -> x).average().orElse(0); + } + + @JsonProperty + public Map topicDataRate() { + return topicDataRates.entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, x -> x.getValue().toString())); + } + + @JsonProperty + public Map topicDataRateHistogram() { + var byteRates = + topicDataRates.values().stream() + .map(DataRate::byteRate) + .sorted(Double::compareTo) + .collect(Collectors.toUnmodifiableList()); + var totalRates = byteRates.size(); + 
// For all the data rates, we use 1/4 portion of the data rates as one histogram bin. And the + // rest of the 3/4 will be used for the rest of the other bins. This process continues + // recursively until no more rate for a single bin. + var histogramBins = + Stream.iterate( + Map.entry(totalRates, totalRates / 4), + e -> e.getKey() > 0, + (e) -> + Map.entry( + (e.getKey() - e.getValue()), + Math.max(1, (e.getKey() - e.getValue()) / 4))) + .map( + e -> { + var taken = totalRates - e.getKey(); + var takes = e.getValue(); + return byteRates.subList(taken, taken + takes); + }) + .collect(Collectors.toUnmodifiableList()); + var rendered = + histogramBins.stream() + .map( + binContent -> { + var first = DataRate.Byte.of(binContent.get(0).longValue()).perSecond(); + var last = + DataRate.Byte.of(binContent.get(binContent.size() - 1).longValue()) + .perSecond(); + var key = String.format("[%s, %s]", first, last); + var value = Integer.toString(binContent.size()); + return Map.entry(key, value); + }) + .collect(Collectors.toUnmodifiableList()); + var orderMap = + IntStream.range(0, rendered.size()) + .boxed() + .collect(Collectors.toUnmodifiableMap(x -> rendered.get(x).getKey(), x -> x)); + var sortedMap = new TreeMap(Comparator.comparingInt(orderMap::get)); + rendered.forEach(e -> sortedMap.put(e.getKey(), e.getValue())); + return sortedMap; + } + + @JsonProperty + public Map topicConsumerFanout() { + return topicConsumerFanout; + } + + @JsonProperty + public Map brokerIngressAvg() { + // Currently we don't have a reliable way to estimate the skew distribution of performance + // tool command output. So we use an average value here. This broker throughput value might + // not reflect from the performance tool command result. 
+ return clusterInfo + .replicaStream() + .collect( + Collectors.groupingBy( + x -> x.nodeInfo().id(), + Collectors.mapping( + x -> + topicDataRates.get(x.topic()).byteRate() + / clusterInfo.replicas(x.topic()).size(), + Collectors.summingDouble(x -> x)))) + .entrySet() + .stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + x -> DataRate.Byte.of(x.getValue().longValue()).perSecond().toString())); + } + + @JsonProperty + public Map brokerEgressAvg() { + // Currently we don't have a reliable way to estimate the skew distribution of performance + // tool command output. So we use an average value here. This broker throughput value might + // not reflect from the performance tool command result. + return clusterInfo + .replicaStream() + .filter(Replica::isLeader) + .collect( + Collectors.groupingBy( + x -> x.nodeInfo().id(), + Collectors.mapping( + x -> + topicDataRates.get(x.topic()).byteRate() + / clusterInfo.replicas(x.topic()).size() + * topicConsumerFanout.get(x.topic()), + Collectors.summingDouble(x -> x)))) + .entrySet() + .stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + x -> DataRate.Byte.of(x.getValue().longValue()).perSecond().toString())); + } + + @JsonProperty + public List> perfCommands() { + class PerfClient { + long ingress = 0; + long egress = 0; + Set topics = new HashSet<>(); + } + var clients = + IntStream.range(0, config.performanceClientCount()) + .mapToObj(i -> new PerfClient()) + .collect(Collectors.toUnmodifiableList()); + + // allocate topics to all the performance clients evenly + for (var topic : topics) { + var dataRate = (long) topicDataRates.get(topic).byteRate(); + var fanout = (int) topicConsumerFanout.get(topic); + for (int i = 0; i < fanout; i++) { + var nextClient = + clients.stream() + .filter(x -> !x.topics.contains(topic)) + .min(Comparator.comparing(x -> x.ingress)) + .orElseThrow(); + nextClient.ingress += dataRate / fanout; + nextClient.egress += dataRate; + 
nextClient.topics.add(topic); + } + } + + // render the argument + return clients.stream() + .map( + client -> { + var ingress = DataRate.Byte.of(client.ingress).perSecond(); + var egress = DataRate.Byte.of(client.egress).perSecond(); + var args = + String.format( + "--topics %s --throttle %s %s", + String.join(",", client.topics), + client.topics.stream() + .map(topic -> topic + "=" + topicDataRates.get(topic).toString()) + .collect(Collectors.joining(",")), + config.performanceExtraArgs()); + return Map.ofEntries( + Map.entry("args", args), + Map.entry("ingress", ingress.toString()), + Map.entry("egress", egress.toString())); + }) + .collect(Collectors.toUnmodifiableList()); + } + } + + public static class Config { + + private final Configuration scenarioConfig; + private final int defaultRandomSeed = ThreadLocalRandom.current().nextInt(); + + public Config(Configuration scenarioConfig) { + this.scenarioConfig = scenarioConfig; + + int maxFanout = consumerFanoutSeries().stream().mapToInt(x -> x).max().orElseThrow(); + if (maxFanout > performanceClientCount()) + throw new IllegalArgumentException( + "The number of client is less than the max topic fanout: " + + maxFanout + + " <= " + + performanceClientCount()); + } + + int seed() { + return scenarioConfig + .string(CONFIG_RANDOM_SEED) + .map(Integer::parseInt) + .orElse(defaultRandomSeed); + } + + int topicCount() { + return scenarioConfig.string(CONFIG_TOPIC_COUNT).map(Integer::parseInt).orElse(1000); + } + + int partitionMin() { + return scenarioConfig.string(CONFIG_PARTITION_COUNT_MIN).map(Integer::parseInt).orElse(5); + } + + int partitionMax() { + return scenarioConfig.string(CONFIG_PARTITION_COUNT_MAX).map(Integer::parseInt).orElse(15); + } + + List consumerFanoutSeries() { + return scenarioConfig + .string(CONFIG_TOPIC_CONSUMER_FANOUT_SERIES) + .filter(String::isEmpty) + .map( + seriesString -> + Arrays.stream(seriesString.split(",")) + .map(Integer::parseInt) + .collect(Collectors.toUnmodifiableList())) + 
.orElse(List.of(0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3)); + } + + double topicRateParetoScale() { + return scenarioConfig + .string(CONFIG_TOPIC_DATA_RATE_PARETO_SCALE) + .map(Double::parseDouble) + .orElse(DataRate.MB.of(2).perSecond().byteRate()); + } + + double topicRateParetoShape() { + return scenarioConfig + .string(CONFIG_TOPIC_DATA_RATE_PARETO_SHAPE) + .map(Double::parseDouble) + .orElse(3.0); + } + + long backboneDataRate() { + return scenarioConfig + .string(CONFIG_BACKBONE_DATA_RATE) + .map(Long::parseLong) + .orElse(DataSize.MB.of(500).bytes()); + } + + int performanceClientCount() { + return scenarioConfig.string(CONFIG_PERF_CLIENT_COUNT).map(Integer::parseInt).orElse(7); + } + + String performanceExtraArgs() { + return scenarioConfig + .string(CONFIG_PERF_EXTRA_ARGS) + .orElse( + "--producers 16 " + + "--consumers 24 " + + "--run.until 1day " + + "--key.size 10KiB " + + "--key.distribution zipfian"); + } + } +} diff --git a/app/src/main/java/org/astraea/app/web/Scenario.java b/app/src/main/java/org/astraea/app/web/Scenario.java index 6949db3574..76cec403e0 100644 --- a/app/src/main/java/org/astraea/app/web/Scenario.java +++ b/app/src/main/java/org/astraea/app/web/Scenario.java @@ -16,13 +16,13 @@ */ package org.astraea.app.web; -import java.util.Map; import java.util.concurrent.CompletionStage; +import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; /** The subclass of this class should contain the logic to fulfill a scenario. 
*/ -public interface Scenario { +public interface Scenario { static Builder builder() { return new Builder(); @@ -56,54 +56,12 @@ public Builder binomialProbability(double binomialProbability) { return this; } - public Scenario build() { + public Scenario build() { return new SkewedPartitionScenario( topicName, numberOfPartitions, numberOfReplicas, binomialProbability); } } /** Apply this scenario to the Kafka cluster */ - CompletionStage apply(Admin admin); - - class Result { - - private final String topicName; - private final int numberOfPartitions; - private final short numberOfReplicas; - private final Map leaderSum; - private final Map logSum; - - public Result( - String topicName, - int numberOfPartitions, - short numberOfReplicas, - Map leaderSum, - Map logSum) { - this.topicName = topicName; - this.numberOfPartitions = numberOfPartitions; - this.numberOfReplicas = numberOfReplicas; - this.leaderSum = leaderSum; - this.logSum = logSum; - } - - public String topicName() { - return topicName; - } - - public int numberOfPartitions() { - return numberOfPartitions; - } - - public short numberOfReplicas() { - return numberOfReplicas; - } - - public Map leaderSum() { - return leaderSum; - } - - public Map logSum() { - return logSum; - } - } + CompletionStage apply(Admin admin, Configuration scenarioConfig); } diff --git a/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java b/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java index 7e45b7700a..87f690ce7d 100644 --- a/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java +++ b/app/src/main/java/org/astraea/app/web/SkewedPartitionScenario.java @@ -29,12 +29,13 @@ import org.apache.commons.math3.distribution.EnumeratedDistribution; import org.apache.commons.math3.distribution.IntegerDistribution; import org.apache.commons.math3.util.Pair; +import org.astraea.common.Configuration; import org.astraea.common.admin.Admin; import org.astraea.common.admin.NodeInfo; import 
org.astraea.common.admin.TopicPartition; import org.astraea.common.admin.TopicPartitionReplica; -public class SkewedPartitionScenario implements Scenario { +public class SkewedPartitionScenario implements Scenario { final String topicName; final int partitions; @@ -50,7 +51,7 @@ public SkewedPartitionScenario( } @Override - public CompletionStage apply(Admin admin) { + public CompletionStage apply(Admin admin, Configuration scenarioConfig) { return admin .creator() .topic(topicName) @@ -138,4 +139,46 @@ public static List sampledReplicaList( } return result; } + + public static class Result { + + private final String topicName; + private final int numberOfPartitions; + private final short numberOfReplicas; + private final Map leaderSum; + private final Map logSum; + + public Result( + String topicName, + int numberOfPartitions, + short numberOfReplicas, + Map leaderSum, + Map logSum) { + this.topicName = topicName; + this.numberOfPartitions = numberOfPartitions; + this.numberOfReplicas = numberOfReplicas; + this.leaderSum = leaderSum; + this.logSum = logSum; + } + + public String topicName() { + return topicName; + } + + public int numberOfPartitions() { + return numberOfPartitions; + } + + public short numberOfReplicas() { + return numberOfReplicas; + } + + public Map leaderSum() { + return leaderSum; + } + + public Map logSum() { + return logSum; + } + } } diff --git a/app/src/main/java/org/astraea/app/web/TopicHandler.java b/app/src/main/java/org/astraea/app/web/TopicHandler.java index d4320a3033..23c9371226 100644 --- a/app/src/main/java/org/astraea/app/web/TopicHandler.java +++ b/app/src/main/java/org/astraea/app/web/TopicHandler.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.astraea.common.Configuration; import org.astraea.common.FutureUtils; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; @@ -175,7 +176,7 @@ public CompletionStage 
post(Channel channel) { .numberOfReplicas(numberOfReplicas) .binomialProbability(topic.probability.get()) .build() - .apply(admin) + .apply(admin, Configuration.EMPTY) .thenApply(ignored -> null) .toCompletableFuture(); } diff --git a/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java new file mode 100644 index 0000000000..5243714238 --- /dev/null +++ b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.app.web; + +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import org.astraea.common.Configuration; +import org.astraea.common.admin.Admin; +import org.astraea.it.Service; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BackboneImbalanceScenarioTest { + + @Test + void testApply() { + try (var service = Service.builder().numberOfBrokers(3).build()) { + try (Admin admin = Admin.of(service.bootstrapServers())) { + var scenario = new BackboneImbalanceScenario(); + var result = + scenario + .apply( + admin, + Configuration.of( + Map.ofEntries( + Map.entry(BackboneImbalanceScenario.CONFIG_TOPIC_COUNT, "100")))) + .toCompletableFuture() + .join(); + + Assertions.assertEquals(101, result.totalTopics()); + } + } + } + + @Test + void testSeedWorks() { + var scenario = new BackboneImbalanceScenario(); + var seed = ThreadLocalRandom.current().nextInt(); + var config = + Configuration.of( + Map.ofEntries( + Map.entry(BackboneImbalanceScenario.CONFIG_RANDOM_SEED, Integer.toString(seed)), + Map.entry(BackboneImbalanceScenario.CONFIG_TOPIC_COUNT, "50"))); + try (var service0 = Service.builder().numberOfBrokers(3).build(); + var service1 = Service.builder().numberOfBrokers(3).build(); ) { + try (Admin admin0 = Admin.of(service0.bootstrapServers()); + Admin admin1 = Admin.of(service1.bootstrapServers())) { + var result0 = scenario.apply(admin0, config).toCompletableFuture().join(); + var result1 = scenario.apply(admin1, config).toCompletableFuture().join(); + + Assertions.assertEquals(result0.topicDataRate(), result1.topicDataRate()); + Assertions.assertEquals(result0.topicDataRateHistogram(), result1.topicDataRateHistogram()); + } + } + } +} diff --git a/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java b/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java index 9e3477b9d6..ea718758dc 100644 --- 
a/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java +++ b/app/src/test/java/org/astraea/app/web/SkewedPartitionScenarioTest.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Set; import org.apache.commons.math3.distribution.BinomialDistribution; +import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.it.Service; @@ -58,7 +59,7 @@ void test(int partitions, short replicas) { var topicName = Utils.randomString(); var scenario = new SkewedPartitionScenario(topicName, partitions, replicas, 0.5); try (var admin = Admin.of(SERVICE.bootstrapServers())) { - var result = scenario.apply(admin).toCompletableFuture().join(); + var result = scenario.apply(admin, Configuration.EMPTY).toCompletableFuture().join(); Assertions.assertEquals(topicName, result.topicName()); Assertions.assertEquals(partitions, result.numberOfPartitions()); Assertions.assertEquals(replicas, result.numberOfReplicas()); From b2449f3aecd758c7faaca9c5c94d6a596a086ab3 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Sun, 5 Feb 2023 16:32:44 +0800 Subject: [PATCH 02/18] Revise --- .../app/web/BackboneImbalanceScenario.java | 89 ++++++++++++++----- .../web/BackboneImbalanceScenarioTest.java | 9 ++ 2 files changed, 75 insertions(+), 23 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index a683d1a2d4..994cd47ac8 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -43,6 +43,7 @@ import org.astraea.common.admin.Admin; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartition; /** * This class build up an imbalance scenario that one of the topic has significant more produce load @@ -137,6 +138,30 @@ public 
CompletionStage apply(Admin admin, Configuration scenarioConfig) ? backboneDataRateDistribution.sample() : topicDataRateDistribution.sample())) .perSecond())); + var topicPartitionDataRate = + clusterInfo.topics().stream() + .flatMap( + topic -> { + var partitionWeight = + clusterInfo.replicas(topic).stream() + .map(Replica::topicPartition) + .distinct() + .collect( + Collectors.toUnmodifiableMap(tp -> tp, tp -> rng.nextDouble())); + var totalDataRate = topicDataRate.get(topic).byteRate(); + var totalWeight = + partitionWeight.values().stream().mapToDouble(x -> x).sum(); + + return partitionWeight.entrySet().stream() + .map( + e -> + Map.entry( + e.getKey(), + DataRate.Byte.of( + (long) (totalDataRate * e.getValue() / totalWeight)) + .perSecond())); + }) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); var consumerFanoutMap = allTopics.stream() .collect( @@ -147,7 +172,13 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) ? 1 : topicConsumerFanoutDistribution.sample())); - return new Result(config, clusterInfo, allTopics, topicDataRate, consumerFanoutMap); + return new Result( + config, + clusterInfo, + allTopics, + topicDataRate, + topicPartitionDataRate, + consumerFanoutMap); }); } @@ -157,6 +188,7 @@ public static class Result { @JsonIgnore private final ClusterInfo clusterInfo; @JsonIgnore private final Set topics; @JsonIgnore private final Map topicDataRates; + @JsonIgnore private final Map topicPartitionDataRates; @JsonIgnore private final Map topicConsumerFanout; public Result( @@ -164,11 +196,13 @@ public Result( ClusterInfo clusterInfo, Set topics, Map topicDataRates, + Map topicPartitionDataRates, Map topicConsumerFanout) { this.config = config; this.clusterInfo = clusterInfo; this.topics = topics; this.topicDataRates = topicDataRates; + this.topicPartitionDataRates = topicPartitionDataRates; this.topicConsumerFanout = topicConsumerFanout; } @@ -262,19 +296,14 @@ public Map topicConsumerFanout() { } 
@JsonProperty - public Map brokerIngressAvg() { - // Currently we don't have a reliable way to estimate the skew distribution of performance - // tool command output. So we use an average value here. This broker throughput value might - // not reflect from the performance tool command result. + public Map brokerIngress() { return clusterInfo .replicaStream() .collect( Collectors.groupingBy( x -> x.nodeInfo().id(), Collectors.mapping( - x -> - topicDataRates.get(x.topic()).byteRate() - / clusterInfo.replicas(x.topic()).size(), + x -> topicPartitionDataRates.get(x.topicPartition()).byteRate(), Collectors.summingDouble(x -> x)))) .entrySet() .stream() @@ -285,10 +314,7 @@ public Map brokerIngressAvg() { } @JsonProperty - public Map brokerEgressAvg() { - // Currently we don't have a reliable way to estimate the skew distribution of performance - // tool command output. So we use an average value here. This broker throughput value might - // not reflect from the performance tool command result. + public Map brokerEgress() { return clusterInfo .replicaStream() .filter(Replica::isLeader) @@ -297,8 +323,7 @@ public Map brokerEgressAvg() { x -> x.nodeInfo().id(), Collectors.mapping( x -> - topicDataRates.get(x.topic()).byteRate() - / clusterInfo.replicas(x.topic()).size() + topicPartitionDataRates.get(x.topicPartition()).byteRate() * topicConsumerFanout.get(x.topic()), Collectors.summingDouble(x -> x)))) .entrySet() @@ -348,7 +373,25 @@ class PerfClient { "--topics %s --throttle %s %s", String.join(",", client.topics), client.topics.stream() - .map(topic -> topic + "=" + topicDataRates.get(topic).toString()) + .flatMap( + topic -> + clusterInfo + .replicaStream(topic) + .map(Replica::topicPartition) + .distinct()) + .map( + tp -> { + var bytes = + topicPartitionDataRates + .get(tp) + .dataSize() + .divide(topicConsumerFanout.get(tp.topic())) + .bytes(); + // TopicPartitionDataRateMapField support only integer measurement + // and no space allowed. 
So we can't just toString the DataRate + // object :( + return String.format("%s:%sByte/second", tp, bytes); + }) .collect(Collectors.joining(",")), config.performanceExtraArgs()); return Map.ofEntries( @@ -362,6 +405,13 @@ class PerfClient { public static class Config { + public static final String DEFAULT_PERF_ARGS = + "--producers 16 " + + "--consumers 24 " + + "--run.until 1day " + + "--key.size 10KiB " + + "--key.distribution zipfian"; + private final Configuration scenarioConfig; private final int defaultRandomSeed = ThreadLocalRandom.current().nextInt(); @@ -434,14 +484,7 @@ int performanceClientCount() { } String performanceExtraArgs() { - return scenarioConfig - .string(CONFIG_PERF_EXTRA_ARGS) - .orElse( - "--producers 16 " - + "--consumers 24 " - + "--run.until 1day " - + "--key.size 10KiB " - + "--key.distribution zipfian"); + return scenarioConfig.string(CONFIG_PERF_EXTRA_ARGS).orElse(DEFAULT_PERF_ARGS); } } } diff --git a/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java index 5243714238..16791f8571 100644 --- a/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java +++ b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java @@ -18,6 +18,8 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import org.astraea.app.argument.Argument; +import org.astraea.app.performance.Performance; import org.astraea.common.Configuration; import org.astraea.common.admin.Admin; import org.astraea.it.Service; @@ -42,6 +44,13 @@ void testApply() { .join(); Assertions.assertEquals(101, result.totalTopics()); + Assertions.assertDoesNotThrow( + () -> { + var perfArgs = + result.perfCommands().get(0).get("args") + " --bootstrap.servers localhost:5566"; + var args = perfArgs.split(" "); + var parsed = Argument.parse(new Performance.Argument(), args); + }); } } } From e901380c1ca3858c6993ab1173e0ae17b9280016 Mon Sep 17 
00:00:00 2001 From: Zheng-Xian Li Date: Sun, 5 Feb 2023 16:47:46 +0800 Subject: [PATCH 03/18] fix naming --- .../java/org/astraea/app/web/BackboneImbalanceScenario.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 994cd47ac8..08f7f3a607 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -396,8 +396,8 @@ class PerfClient { config.performanceExtraArgs()); return Map.ofEntries( Map.entry("args", args), - Map.entry("ingress", ingress.toString()), - Map.entry("egress", egress.toString())); + Map.entry("perfEgress", ingress.toString()), + Map.entry("perfIngress", egress.toString())); }) .collect(Collectors.toUnmodifiableList()); } From fcc9bb5573056f015815214068ddc1da00d08dfe Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Feb 2023 11:34:54 +0800 Subject: [PATCH 04/18] less threads --- .../java/org/astraea/app/web/BackboneImbalanceScenario.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 08f7f3a607..5b528250d2 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -406,8 +406,8 @@ class PerfClient { public static class Config { public static final String DEFAULT_PERF_ARGS = - "--producers 16 " - + "--consumers 24 " + "--producers 8 " + + "--consumers 8 " + "--run.until 1day " + "--key.size 10KiB " + "--key.distribution zipfian"; From 52c53ff17c10a7dcdb3748008622814f6821fc05 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 7 Feb 2023 20:50:37 +0800 Subject: [PATCH 05/18] remove `key.distribution` from default 
perf arg --- .../java/org/astraea/app/web/BackboneImbalanceScenario.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 5b528250d2..94a6b02d3a 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -406,11 +406,7 @@ class PerfClient { public static class Config { public static final String DEFAULT_PERF_ARGS = - "--producers 8 " - + "--consumers 8 " - + "--run.until 1day " - + "--key.size 10KiB " - + "--key.distribution zipfian"; + "--producers 8 --consumers 8 --run.until 1day --key.size 10KiB"; private final Configuration scenarioConfig; private final int defaultRandomSeed = ThreadLocalRandom.current().nextInt(); From 93695822cab12ae05d20c513af6f957746c5a78f Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Wed, 8 Feb 2023 16:43:07 +0800 Subject: [PATCH 06/18] Revise * export used seed * output individual arguments instead of the full text --- .../app/web/BackboneImbalanceScenario.java | 64 +++++++++---------- 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 94a6b02d3a..af64c933be 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -60,7 +60,6 @@ public class BackboneImbalanceScenario implements Scenario { var ingress = DataRate.Byte.of(client.ingress).perSecond(); var egress = DataRate.Byte.of(client.egress).perSecond(); - var args = - String.format( - "--topics %s --throttle %s %s", - String.join(",", client.topics), - client.topics.stream() - .flatMap( - topic -> - clusterInfo - .replicaStream(topic) - 
.map(Replica::topicPartition) - .distinct()) - .map( - tp -> { - var bytes = - topicPartitionDataRates - .get(tp) - .dataSize() - .divide(topicConsumerFanout.get(tp.topic())) - .bytes(); - // TopicPartitionDataRateMapField support only integer measurement - // and no space allowed. So we can't just toString the DataRate - // object :( - return String.format("%s:%sByte/second", tp, bytes); - }) - .collect(Collectors.joining(",")), - config.performanceExtraArgs()); + var throttle = + client.topics.stream() + .flatMap( + topic -> + clusterInfo + .replicaStream(topic) + .map(Replica::topicPartition) + .distinct()) + .map( + tp -> { + var bytes = + topicPartitionDataRates + .get(tp) + .dataSize() + .divide(topicConsumerFanout.get(tp.topic())) + .bytes(); + // TopicPartitionDataRateMapField support only integer measurement + // and no space allowed. So we can't just toString the DataRate + // object :( + return String.format("%s:%sByte/second", tp, bytes); + }) + .collect(Collectors.joining(",")); return Map.ofEntries( - Map.entry("args", args), + Map.entry("topics", String.join(",", client.topics)), + Map.entry("throttle", throttle), Map.entry("perfEgress", ingress.toString()), Map.entry("perfIngress", egress.toString())); }) .collect(Collectors.toUnmodifiableList()); } + + @JsonProperty + public int seed() { + return config.seed(); + } } public static class Config { - public static final String DEFAULT_PERF_ARGS = - "--producers 8 --consumers 8 --run.until 1day --key.size 10KiB"; - private final Configuration scenarioConfig; private final int defaultRandomSeed = ThreadLocalRandom.current().nextInt(); @@ -478,9 +476,5 @@ long backboneDataRate() { int performanceClientCount() { return scenarioConfig.string(CONFIG_PERF_CLIENT_COUNT).map(Integer::parseInt).orElse(7); } - - String performanceExtraArgs() { - return scenarioConfig.string(CONFIG_PERF_EXTRA_ARGS).orElse(DEFAULT_PERF_ARGS); - } } } From ff69e1b9ce7ff54de59ff125373f4112aca9b6d1 Mon Sep 17 00:00:00 2001 From: 
Zheng-Xian Li Date: Wed, 8 Feb 2023 18:50:42 +0800 Subject: [PATCH 07/18] allocate one dedicated perf client for backbone topic --- .../app/web/BackboneImbalanceScenario.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index af64c933be..a2d1fb0b9c 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -340,8 +340,10 @@ class PerfClient { long egress = 0; Set topics = new HashSet<>(); } + var clientCount = config.performanceClientCount(); + if (clientCount < 2) throw new IllegalArgumentException("At least two clients are required"); var clients = - IntStream.range(0, config.performanceClientCount()) + IntStream.range(0, clientCount) .mapToObj(i -> new PerfClient()) .collect(Collectors.toUnmodifiableList()); @@ -351,10 +353,13 @@ class PerfClient { var fanout = (int) topicConsumerFanout.get(topic); for (int i = 0; i < fanout; i++) { var nextClient = - clients.stream() - .filter(x -> !x.topics.contains(topic)) - .min(Comparator.comparing(x -> x.ingress)) - .orElseThrow(); + topic.equals(BackboneImbalanceScenario.backboneTopicName) + ? clients.get(0) + : clients.stream() + .skip(1) + .filter(x -> !x.topics.contains(topic)) + .min(Comparator.comparing(x -> x.ingress)) + .orElseThrow(); nextClient.ingress += dataRate / fanout; nextClient.egress += dataRate; nextClient.topics.add(topic); From 96b1ed4950bc152e3c3192adb593ab1f56d77013 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Fri, 10 Feb 2023 01:06:59 +0800 Subject: [PATCH 08/18] Revise 1. Increase the number of backbone partition to 24. 2. Disable zero fanout since performance tool doesn't support it.
--- .../app/web/BackboneImbalanceScenario.java | 53 ++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index a2d1fb0b9c..812ab2b048 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import java.time.Duration; import java.util.Arrays; import java.util.Comparator; import java.util.HashSet; @@ -40,6 +41,7 @@ import org.astraea.common.Configuration; import org.astraea.common.DataRate; import org.astraea.common.DataSize; +import org.astraea.common.Utils; import org.astraea.common.admin.Admin; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.Replica; @@ -105,7 +107,7 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) admin .creator() .topic(backboneTopicName) - .numberOfPartitions(1) + .numberOfPartitions(24) .numberOfReplicas((short) 1) .run()) .limit(1); @@ -119,6 +121,7 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) if (err != null) err.printStackTrace(); })) .forEach(CompletableFuture::join); + Utils.sleep(Duration.ofSeconds(1)); // gather info and generate necessary variables var allTopics = @@ -139,6 +142,7 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) .perSecond())); var topicPartitionDataRate = clusterInfo.topics().stream() + .filter(topic -> !topic.equals(backboneTopicName)) .flatMap( topic -> { var partitionWeight = @@ -160,7 +164,32 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) (long) (totalDataRate * e.getValue() / totalWeight)) .perSecond())); }) - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); 
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + var backboneTopicBandwidth = topicDataRate.get(backboneTopicName); + var nodeWeight = + IntStream.range(1, clusterInfo.nodes().size()) + .boxed() + .collect( + Collectors.toMap( + index -> clusterInfo.nodes().get(index).id(), index -> rng.nextInt(100))); + nodeWeight.put( + clusterInfo.nodes().get(0).id(), nodeWeight.values().stream().mapToInt(x -> x).sum()); + + clusterInfo.replicas(backboneTopicName).stream() + .collect(Collectors.groupingBy(x -> x.nodeInfo().id())) + .forEach( + (nodeId, replicas) -> { + var weight = nodeWeight.get(nodeId); + var weightSum = nodeWeight.values().stream().mapToInt(x -> x).sum(); + var nodeDataRate = backboneTopicBandwidth.byteRate() * weight / weightSum; + var replicaDataRate = nodeDataRate / replicas.size(); + replicas.forEach( + replica -> + topicPartitionDataRate.put( + replica.topicPartition(), + DataRate.Byte.of((long) replicaDataRate).perSecond())); + }); + var consumerFanoutMap = allTopics.stream() .collect( @@ -336,8 +365,8 @@ public Map brokerEgress() { @JsonProperty public List> perfCommands() { class PerfClient { - long ingress = 0; - long egress = 0; + long consumeRate = 0; + long produceRate = 0; Set topics = new HashSet<>(); } var clientCount = config.performanceClientCount(); @@ -358,10 +387,10 @@ class PerfClient { : clients.stream() .skip(1) .filter(x -> !x.topics.contains(topic)) - .min(Comparator.comparing(x -> x.ingress)) + .min(Comparator.comparing(x -> x.consumeRate)) .orElseThrow(); - nextClient.ingress += dataRate / fanout; - nextClient.egress += dataRate; + nextClient.consumeRate += dataRate; + nextClient.produceRate += dataRate / fanout; nextClient.topics.add(topic); } } @@ -370,8 +399,8 @@ class PerfClient { return clients.stream() .map( client -> { - var ingress = DataRate.Byte.of(client.ingress).perSecond(); - var egress = DataRate.Byte.of(client.egress).perSecond(); + var consumeRate = 
DataRate.Byte.of(client.consumeRate).perSecond(); + var produceRate = DataRate.Byte.of(client.produceRate).perSecond(); var throttle = client.topics.stream() .flatMap( @@ -397,8 +426,8 @@ class PerfClient { return Map.ofEntries( Map.entry("topics", String.join(",", client.topics)), Map.entry("throttle", throttle), - Map.entry("perfEgress", ingress.toString()), - Map.entry("perfIngress", egress.toString())); + Map.entry("consumeRate", consumeRate.toString()), + Map.entry("produceRate", produceRate.toString())); }) .collect(Collectors.toUnmodifiableList()); } @@ -454,7 +483,7 @@ List consumerFanoutSeries() { Arrays.stream(seriesString.split(",")) .map(Integer::parseInt) .collect(Collectors.toUnmodifiableList())) - .orElse(List.of(0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3)); + .orElse(List.of(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3)); } double topicRateParetoScale() { From c6b7f80da20168a7cfa2148b20477b88cb5631a8 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Fri, 10 Feb 2023 01:45:57 +0800 Subject: [PATCH 09/18] fix test --- .../astraea/app/web/BackboneImbalanceScenarioTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java index 16791f8571..5243714238 100644 --- a/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java +++ b/app/src/test/java/org/astraea/app/web/BackboneImbalanceScenarioTest.java @@ -18,8 +18,6 @@ import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import org.astraea.app.argument.Argument; -import org.astraea.app.performance.Performance; import org.astraea.common.Configuration; import org.astraea.common.admin.Admin; import org.astraea.it.Service; @@ -44,13 +42,6 @@ void testApply() { .join(); Assertions.assertEquals(101, result.totalTopics()); - Assertions.assertDoesNotThrow( - () -> { - var perfArgs = - 
result.perfCommands().get(0).get("args") + " --bootstrap.servers localhost:5566"; - var args = perfArgs.split(" "); - var parsed = Argument.parse(new Performance.Argument(), args); - }); } } } From 59b66d0b06de0de60efa2720a3fc6bf0684c6705 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Sat, 11 Feb 2023 20:16:55 +0800 Subject: [PATCH 10/18] Change scenario config --- .../astraea/app/web/BackboneImbalanceScenario.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 812ab2b048..9a5aca291d 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -73,7 +73,7 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) new ParetoDistribution(rng, config.topicRateParetoScale(), config.topicRateParetoShape()); final var backboneDataRateDistribution = new UniformRealDistribution( - rng, config.backboneDataRate() * 0.9, config.backboneDataRate() * 1.1); + rng, config.backboneDataRate() * 0.8, config.backboneDataRate() * 1.2); final var topicPartitionCountDistribution = new UniformIntegerDistribution(rng, config.partitionMin(), config.partitionMax()); final var topicConsumerFanoutDistribution = @@ -399,6 +399,7 @@ class PerfClient { return clients.stream() .map( client -> { + var isBackbone = client.topics.equals(Set.of(backboneTopicName)); var consumeRate = DataRate.Byte.of(client.consumeRate).perSecond(); var produceRate = DataRate.Byte.of(client.produceRate).perSecond(); var throttle = @@ -423,9 +424,13 @@ class PerfClient { return String.format("%s:%sByte/second", tp, bytes); }) .collect(Collectors.joining(",")); + var throughput = String.format("%dByte/second", (long) produceRate.byteRate()); return Map.ofEntries( + Map.entry("backbone", Boolean.toString(isBackbone)), 
Map.entry("topics", String.join(",", client.topics)), + Map.entry("throughput", throughput), Map.entry("throttle", throttle), + Map.entry("key_distribution", isBackbone ? "zipfian" : "uniform"), Map.entry("consumeRate", consumeRate.toString()), Map.entry("produceRate", produceRate.toString())); }) @@ -483,14 +488,14 @@ List consumerFanoutSeries() { Arrays.stream(seriesString.split(",")) .map(Integer::parseInt) .collect(Collectors.toUnmodifiableList())) - .orElse(List.of(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 3)); + .orElse(List.of(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 6)); } double topicRateParetoScale() { return scenarioConfig .string(CONFIG_TOPIC_DATA_RATE_PARETO_SCALE) .map(Double::parseDouble) - .orElse(DataRate.MB.of(2).perSecond().byteRate()); + .orElse(DataRate.MB.of(1).perSecond().byteRate()); } double topicRateParetoShape() { @@ -504,7 +509,7 @@ long backboneDataRate() { return scenarioConfig .string(CONFIG_BACKBONE_DATA_RATE) .map(Long::parseLong) - .orElse(DataSize.MB.of(500).bytes()); + .orElse(DataSize.MB.of(950).bytes()); } int performanceClientCount() { From 6bbab450b09d571141358d0e8830a53a4267c1ac Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Wed, 22 Feb 2023 13:58:27 +0800 Subject: [PATCH 11/18] fix merge --- .../java/org/astraea/app/web/BackboneImbalanceScenario.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 9a5aca291d..467dab662c 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -141,7 +141,7 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) : topicDataRateDistribution.sample())) .perSecond())); var topicPartitionDataRate = - clusterInfo.topics().stream() + clusterInfo.topicNames().stream() .filter(topic -> 
!topic.equals(backboneTopicName)) .flatMap( topic -> { From dd36d6b36a30123d053e442744955a7923654a52 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Wed, 22 Feb 2023 14:41:08 +0800 Subject: [PATCH 12/18] fix key conflict --- .../java/org/astraea/app/web/BackboneImbalanceScenario.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 467dab662c..ab817dd7c2 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -312,7 +312,9 @@ public Map topicDataRateHistogram() { var orderMap = IntStream.range(0, rendered.size()) .boxed() - .collect(Collectors.toUnmodifiableMap(x -> rendered.get(x).getKey(), x -> x)); + .collect( + Collectors.toUnmodifiableMap( + x -> rendered.get(x).getKey(), x -> x, Integer::sum)); var sortedMap = new TreeMap(Comparator.comparingInt(orderMap::get)); rendered.forEach(e -> sortedMap.put(e.getKey(), e.getValue())); return sortedMap; From c236968f262bd860d027427d94a2f7d761b65941 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Wed, 8 Mar 2023 13:25:19 +0800 Subject: [PATCH 13/18] Support newest performance config --- .../app/web/BackboneImbalanceScenario.java | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index ab817dd7c2..c737e05473 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -21,9 +21,11 @@ import java.time.Duration; import java.util.Arrays; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import 
java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; @@ -62,6 +64,8 @@ public class BackboneImbalanceScenario implements Scenario> perfCommands() { class PerfClient { long consumeRate = 0; long produceRate = 0; - Set topics = new HashSet<>(); + final Set topics = new HashSet<>(); + String keyDistribution; + final Map keyDistributionConfig = new HashMap<>(); } var clientCount = config.performanceClientCount(); if (clientCount < 2) throw new IllegalArgumentException("At least two clients are required"); @@ -396,6 +402,15 @@ class PerfClient { nextClient.topics.add(topic); } } + for(var client: clients) { + var zipfian = + client.topics.equals(Set.of(BackboneImbalanceScenario.backboneTopicName)); + client.keyDistribution = zipfian ? "zipfian" : "uniform"; + if (zipfian) { + client.keyDistributionConfig.put( + "exponent", Double.toString(config.zipfianExponent())); + } + } // render the argument return clients.stream() @@ -427,14 +442,19 @@ class PerfClient { }) .collect(Collectors.joining(",")); var throughput = String.format("%dByte/second", (long) produceRate.byteRate()); + var keyDistConfigString = client.keyDistributionConfig.entrySet().stream() + .map(e -> String.format("%s=%s", e.getKey(), e.getValue())) + .collect(Collectors.joining()); return Map.ofEntries( Map.entry("backbone", Boolean.toString(isBackbone)), Map.entry("topics", String.join(",", client.topics)), Map.entry("throughput", throughput), Map.entry("throttle", throttle), - Map.entry("key_distribution", isBackbone ? 
"zipfian" : "uniform"), - Map.entry("consumeRate", consumeRate.toString()), - Map.entry("produceRate", produceRate.toString())); + Map.entry("key_distribution", client.keyDistribution), + Map.entry("key_distribution_config", keyDistConfigString), + Map.entry("key_table_seed", Integer.toString(config.keyTableSeed())), + Map.entry("consume_rate", consumeRate.toString()), + Map.entry("produce_rate", produceRate.toString())); }) .collect(Collectors.toUnmodifiableList()); } @@ -449,6 +469,7 @@ public static class Config { private final Configuration scenarioConfig; private final int defaultRandomSeed = ThreadLocalRandom.current().nextInt(); + private final int defaultPerfKeyTableSeed = new Random(defaultRandomSeed).nextInt(); public Config(Configuration scenarioConfig) { this.scenarioConfig = scenarioConfig; @@ -517,5 +538,19 @@ long backboneDataRate() { int performanceClientCount() { return scenarioConfig.string(CONFIG_PERF_CLIENT_COUNT).map(Integer::parseInt).orElse(7); } + + int keyTableSeed() { + return scenarioConfig + .string(CONFIG_PERF_KEY_TABLE_SEED) + .map(Integer::parseInt) + .orElse(defaultPerfKeyTableSeed); + } + + double zipfianExponent() { + return scenarioConfig + .string(CONFIG_PERF_ZIPFIAN_EXPONENT) + .map(Double::parseDouble) + .orElse(1.0); + } } } From af3f54ea79958e88e156273ec18bdfc999d4df73 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Thu, 9 Mar 2023 11:01:07 +0800 Subject: [PATCH 14/18] Fix fanout series config --- .../java/org/astraea/app/web/BackboneImbalanceScenario.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index c737e05473..0950ef433f 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -505,7 +505,7 @@ int partitionMax() { List consumerFanoutSeries() { return 
scenarioConfig .string(CONFIG_TOPIC_CONSUMER_FANOUT_SERIES) - .filter(String::isEmpty) + .filter(s -> !s.isEmpty()) .map( seriesString -> Arrays.stream(seriesString.split(",")) From 10851a2d88e229eb3c792287a8c089720da19842 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 13 Mar 2023 14:55:26 +0800 Subject: [PATCH 15/18] Fixed bandwidth --- .../java/org/astraea/app/web/BackboneImbalanceScenario.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 0950ef433f..e3e1f54acf 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -77,7 +77,7 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) new ParetoDistribution(rng, config.topicRateParetoScale(), config.topicRateParetoShape()); final var backboneDataRateDistribution = new UniformRealDistribution( - rng, config.backboneDataRate() * 0.8, config.backboneDataRate() * 1.2); + rng, config.backboneDataRate() * 0.999, config.backboneDataRate() * 1.001); final var topicPartitionCountDistribution = new UniformIntegerDistribution(rng, config.partitionMin(), config.partitionMax()); final var topicConsumerFanoutDistribution = @@ -532,7 +532,7 @@ long backboneDataRate() { return scenarioConfig .string(CONFIG_BACKBONE_DATA_RATE) .map(Long::parseLong) - .orElse(DataSize.MB.of(950).bytes()); + .orElse(DataSize.MB.of(800).bytes()); } int performanceClientCount() { From b17d3398c61d72b321c7fddcd8679d110e53d176 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Sat, 25 Mar 2023 12:14:22 +0800 Subject: [PATCH 16/18] Update --- .../app/web/BackboneImbalanceScenario.java | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java 
b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index e3e1f54acf..571c103b6a 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -378,7 +378,7 @@ class PerfClient { final Map keyDistributionConfig = new HashMap<>(); } var clientCount = config.performanceClientCount(); - if (clientCount < 2) throw new IllegalArgumentException("At least two clients are required"); + if (clientCount < 3) throw new IllegalArgumentException("At least three perf clients required"); var clients = IntStream.range(0, clientCount) .mapToObj(i -> new PerfClient()) @@ -389,17 +389,26 @@ class PerfClient { var dataRate = (long) topicDataRates.get(topic).byteRate(); var fanout = (int) topicConsumerFanout.get(topic); for (int i = 0; i < fanout; i++) { - var nextClient = - topic.equals(BackboneImbalanceScenario.backboneTopicName) - ? clients.get(0) - : clients.stream() - .skip(1) - .filter(x -> !x.topics.contains(topic)) - .min(Comparator.comparing(x -> x.consumeRate)) - .orElseThrow(); - nextClient.consumeRate += dataRate; - nextClient.produceRate += dataRate / fanout; - nextClient.topics.add(topic); + if (topic.equals(BackboneImbalanceScenario.backboneTopicName)) { + // separate the processing of produce/consume to two individual clients. 
+ // see https://github.com/skiptests/astraea/issues/1567 + var produceClient = clients.get(0); + produceClient.produceRate += dataRate; + produceClient.topics.add(topic); + var consumeClient = clients.get(1); + consumeClient.consumeRate += dataRate; + consumeClient.topics.add(topic); + } else { + var nextClient = + clients.stream() + .skip(2) + .filter(x -> !x.topics.contains(topic)) + .min(Comparator.comparing(x -> x.consumeRate)) + .orElseThrow(); + nextClient.consumeRate += dataRate; + nextClient.produceRate += dataRate / fanout; + nextClient.topics.add(topic); + } } } for(var client: clients) { @@ -453,6 +462,8 @@ class PerfClient { Map.entry("key_distribution", client.keyDistribution), Map.entry("key_distribution_config", keyDistConfigString), Map.entry("key_table_seed", Integer.toString(config.keyTableSeed())), + Map.entry("no_consumer", Boolean.toString(consumeRate.byteRate() == 0)), + Map.entry("no_producer", Boolean.toString(produceRate.byteRate() == 0)), Map.entry("consume_rate", consumeRate.toString()), Map.entry("produce_rate", produceRate.toString())); }) From ec22f170a441e76ed65874b575029ddc4d35f007 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 27 Mar 2023 16:35:47 +0800 Subject: [PATCH 17/18] Update --- .../app/web/BackboneImbalanceScenario.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index 571c103b6a..b6823f9e09 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -77,7 +77,7 @@ public CompletionStage apply(Admin admin, Configuration scenarioConfig) new ParetoDistribution(rng, config.topicRateParetoScale(), config.topicRateParetoShape()); final var backboneDataRateDistribution = new UniformRealDistribution( - rng, config.backboneDataRate() * 0.999, 
config.backboneDataRate() * 1.001); + rng, config.backboneDataRate() * 0.8, config.backboneDataRate() * 1.2); final var topicPartitionCountDistribution = new UniformIntegerDistribution(rng, config.partitionMin(), config.partitionMax()); final var topicConsumerFanoutDistribution = @@ -378,7 +378,8 @@ class PerfClient { final Map keyDistributionConfig = new HashMap<>(); } var clientCount = config.performanceClientCount(); - if (clientCount < 3) throw new IllegalArgumentException("At least three perf clients required"); + if (clientCount < 3) + throw new IllegalArgumentException("At least three perf clients required"); var clients = IntStream.range(0, clientCount) .mapToObj(i -> new PerfClient()) @@ -400,7 +401,7 @@ class PerfClient { consumeClient.topics.add(topic); } else { var nextClient = - clients.stream() + clients.stream() .skip(2) .filter(x -> !x.topics.contains(topic)) .min(Comparator.comparing(x -> x.consumeRate)) @@ -411,13 +412,11 @@ class PerfClient { } } } - for(var client: clients) { - var zipfian = - client.topics.equals(Set.of(BackboneImbalanceScenario.backboneTopicName)); + for (var client : clients) { + var zipfian = client.topics.equals(Set.of(BackboneImbalanceScenario.backboneTopicName)); client.keyDistribution = zipfian ? 
"zipfian" : "uniform"; if (zipfian) { - client.keyDistributionConfig.put( - "exponent", Double.toString(config.zipfianExponent())); + client.keyDistributionConfig.put("exponent", Double.toString(config.zipfianExponent())); } } @@ -451,9 +450,10 @@ class PerfClient { }) .collect(Collectors.joining(",")); var throughput = String.format("%dByte/second", (long) produceRate.byteRate()); - var keyDistConfigString = client.keyDistributionConfig.entrySet().stream() - .map(e -> String.format("%s=%s", e.getKey(), e.getValue())) - .collect(Collectors.joining()); + var keyDistConfigString = + client.keyDistributionConfig.entrySet().stream() + .map(e -> String.format("%s=%s", e.getKey(), e.getValue())) + .collect(Collectors.joining()); return Map.ofEntries( Map.entry("backbone", Boolean.toString(isBackbone)), Map.entry("topics", String.join(",", client.topics)), @@ -522,7 +522,7 @@ List consumerFanoutSeries() { Arrays.stream(seriesString.split(",")) .map(Integer::parseInt) .collect(Collectors.toUnmodifiableList())) - .orElse(List.of(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 6)); + .orElse(List.of(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 3, 5)); } double topicRateParetoScale() { @@ -543,7 +543,7 @@ long backboneDataRate() { return scenarioConfig .string(CONFIG_BACKBONE_DATA_RATE) .map(Long::parseLong) - .orElse(DataSize.MB.of(800).bytes()); + .orElse(DataSize.MB.of(950).bytes()); } int performanceClientCount() { From eb4593992ee8cb9ee1865ac8444965193dd59410 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Sat, 1 Apr 2023 14:12:13 +0800 Subject: [PATCH 18/18] Remove unnecessary output fields --- .../app/web/BackboneImbalanceScenario.java | 45 ++----------------- 1 file changed, 4 insertions(+), 41 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java index b6823f9e09..e9d3a89db6 100644 --- a/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java +++ 
b/app/src/main/java/org/astraea/app/web/BackboneImbalanceScenario.java @@ -329,45 +329,6 @@ public Map topicConsumerFanout() { return topicConsumerFanout; } - @JsonProperty - public Map brokerIngress() { - return clusterInfo - .replicaStream() - .collect( - Collectors.groupingBy( - x -> x.nodeInfo().id(), - Collectors.mapping( - x -> topicPartitionDataRates.get(x.topicPartition()).byteRate(), - Collectors.summingDouble(x -> x)))) - .entrySet() - .stream() - .collect( - Collectors.toUnmodifiableMap( - Map.Entry::getKey, - x -> DataRate.Byte.of(x.getValue().longValue()).perSecond().toString())); - } - - @JsonProperty - public Map brokerEgress() { - return clusterInfo - .replicaStream() - .filter(Replica::isLeader) - .collect( - Collectors.groupingBy( - x -> x.nodeInfo().id(), - Collectors.mapping( - x -> - topicPartitionDataRates.get(x.topicPartition()).byteRate() - * topicConsumerFanout.get(x.topic()), - Collectors.summingDouble(x -> x)))) - .entrySet() - .stream() - .collect( - Collectors.toUnmodifiableMap( - Map.Entry::getKey, - x -> DataRate.Byte.of(x.getValue().longValue()).perSecond().toString())); - } - @JsonProperty public List> perfCommands() { class PerfClient { @@ -435,8 +396,10 @@ class PerfClient { .replicaStream(topic) .map(Replica::topicPartition) .distinct()) - .map( + .flatMap( tp -> { + // The backbone partition bandwidth is unknown before the performance run starts. + if (!topicPartitionDataRates.containsKey(tp)) return Stream.of(); var bytes = topicPartitionDataRates .get(tp) @@ -446,7 +409,7 @@ class PerfClient { // TopicPartitionDataRateMapField support only integer measurement // and no space allowed. So we can't just toString the DataRate // object :( - return String.format("%s:%sByte/second", tp, bytes); + return Stream.of(String.format("%s:%sByte/second", tp, bytes)); }) .collect(Collectors.joining(",")); var throughput = String.format("%dByte/second", (long) produceRate.byteRate());