From f4f781f624d78a8199518e41c6beeb6cc9338b5c Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Mar 2023 01:06:33 +0800 Subject: [PATCH 01/12] Revise `ShuffleTweaker` --- .../balancer/tweakers/ShuffleTweaker.java | 120 ++++++++---------- 1 file changed, 53 insertions(+), 67 deletions(-) diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 33f3b4b625..517f4f37a2 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -18,13 +18,10 @@ import java.util.Collection; import java.util.Map; -import java.util.Set; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.ClusterInfoBuilder; @@ -72,75 +69,64 @@ public Stream generate(ClusterInfo baseAllocation) { return Stream.generate( () -> { final var shuffleCount = numberOfShuffle.get(); - - var candidates = - IntStream.range(0, shuffleCount) - .mapToObj(i -> allocationGenerator(baseAllocation.brokerFolders())) + final var eligiblePartitions = + baseAllocation.topicPartitions().stream() + .filter(tp -> eligiblePartition((baseAllocation.replicas(tp)))) + .map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt())) + .sorted(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .limit(shuffleCount) .collect(Collectors.toUnmodifiableList()); - var currentAllocation = baseAllocation; - for (var candidate : candidates) currentAllocation = candidate.apply(currentAllocation); - - return currentAllocation; + final var finalCluster = ClusterInfoBuilder.builder(baseAllocation); + for (int i = 0; i < shuffleCount && i < eligiblePartitions.size(); i++) { + final var tp = eligiblePartitions.get(i); + switch (ThreadLocalRandom.current().nextInt(0, 2)) { + case 0: + { + // change leader/follower identity + baseAllocation + .replicaStream(tp) + .filter(Replica::isFollower) + .map(r -> Map.entry(r, ThreadLocalRandom.current().nextInt())) + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .ifPresent(r -> finalCluster.setPreferredLeader(r.topicPartitionReplica())); + break; + } + case 1: + { + // change replica list + var replicaList = baseAllocation.replicas(tp); + var currentIds = + replicaList.stream() + .map(Replica::nodeInfo) + .map(NodeInfo::id) + .collect(Collectors.toUnmodifiableSet()); + baseAllocation.brokers().stream() + .filter(b -> !currentIds.contains(b.id())) + .map(b -> Map.entry(b, ThreadLocalRandom.current().nextInt())) + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .ifPresent( + broker -> { + var replica = randomElement(replicaList); + finalCluster.reassignReplica( + replica.topicPartitionReplica(), + broker.id(), + randomElement(baseAllocation.brokerFolders().get(broker.id()))); + }); + break; + } + default: + throw new RuntimeException("Unexpected Condition"); + } + } + + return finalCluster.build(); }); } - private static Function allocationGenerator( - Map> brokerFolders) { - return currentAllocation -> { - final var selectedPartition = - currentAllocation.topicPartitions().stream() - .filter(tp -> eligiblePartition((currentAllocation.replicas(tp)))) - .map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt())) - .min(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey) - .orElseThrow(); - - // [valid operation 1] change leader/follower identity - final var currentReplicas = currentAllocation.replicas(selectedPartition); - final var candidates0 = - currentReplicas.stream() - .skip(1) - .map( - follower -> - (Supplier) - () -> - ClusterInfoBuilder.builder(currentAllocation) - .setPreferredLeader(follower.topicPartitionReplica()) - .build()); - - // [valid operation 2] change replica list - final var currentIds = - currentReplicas.stream() - .map(Replica::nodeInfo) - .map(NodeInfo::id) - .collect(Collectors.toUnmodifiableSet()); - final var candidates1 = - brokerFolders.keySet().stream() - .filter(brokerId -> !currentIds.contains(brokerId)) - .flatMap( - toThisBroker -> - currentReplicas.stream() - .map( - replica -> - (Supplier) - () -> { - var toThisDir = - randomElement(brokerFolders.get(toThisBroker)); - return ClusterInfoBuilder.builder(currentAllocation) - .reassignReplica( - replica.topicPartitionReplica(), - toThisBroker, - toThisDir) - .build(); - })); - - return randomElement( - Stream.concat(candidates0, candidates1).collect(Collectors.toUnmodifiableSet())) - .get(); - }; - } - private static T randomElement(Collection collection) { return collection.stream() .skip(ThreadLocalRandom.current().nextInt(0, collection.size())) From fe69fae092fdf2469f48652dab2970cae64d3f0c Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Mar 2023 15:50:48 +0800 Subject: [PATCH 02/12] Avoid excessive replica list collection --- .../astraea/common/balancer/tweakers/ShuffleTweaker.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 517f4f37a2..5ba433ea79 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -69,9 +69,8 @@ public Stream generate(ClusterInfo baseAllocation) { return Stream.generate( () -> { final var shuffleCount = numberOfShuffle.get(); - final var eligiblePartitions = + final var partitionOrder = baseAllocation.topicPartitions().stream() - .filter(tp -> eligiblePartition((baseAllocation.replicas(tp)))) .map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt())) .sorted(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) @@ -79,8 +78,9 @@ public Stream generate(ClusterInfo baseAllocation) { .collect(Collectors.toUnmodifiableList()); final var finalCluster = ClusterInfoBuilder.builder(baseAllocation); - for (int i = 0; i < shuffleCount && i < eligiblePartitions.size(); i++) { - final var tp = eligiblePartitions.get(i); + for (int i = 0, shuffled = 0; i < partitionOrder.size() && shuffled < shuffleCount; i++) { + final var tp = partitionOrder.get(i); + if (!eligiblePartition(baseAllocation.replicas(tp))) continue; switch (ThreadLocalRandom.current().nextInt(0, 2)) { case 0: { @@ -121,6 +121,7 @@ public Stream generate(ClusterInfo baseAllocation) { default: throw new RuntimeException("Unexpected Condition"); } + shuffled++; } return finalCluster.build(); From ba566506a56e338df54c498d753307d7ac8516d7 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Mar 2023 16:53:51 +0800 Subject: [PATCH 03/12] Let `ClusterInfo.Optimized#replicas` use cache result --- .../org/astraea/common/admin/ClusterInfo.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java index d8d712924a..677c5ad7f4 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java @@ -17,6 +17,7 @@ package org.astraea.common.admin; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -603,6 +604,26 @@ public Map topics() { return topics.get(); } + @Override + public List replicas() { + return Collections.unmodifiableList(all); + } + + @Override + public List replicas(String topic) { + return Collections.unmodifiableList(byTopic.get().get(topic)); + } + + @Override + public List replicas(TopicPartition topicPartition) { + return Collections.unmodifiableList(byPartition.get().get(topicPartition)); + } + + @Override + public List replicas(TopicPartitionReplica replica) { + return Collections.unmodifiableList(byReplica.get().get(replica)); + } + @Override public List replicaLeaders(String topic) { return byTopicForLeader.get().getOrDefault(topic, List.of()); From 42e9caf52ee8b65d7bdbcf8c8e894504a71bca9c Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Mar 2023 19:55:01 +0800 Subject: [PATCH 04/12] Revise --- .../org/astraea/common/admin/ClusterInfo.java | 9 ++++----- .../astraea/common/admin/ClusterInfoTest.java | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java index 677c5ad7f4..a7dc6042aa 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java @@ -17,7 +17,6 @@ package org.astraea.common.admin; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -606,22 +605,22 @@ public Map topics() { @Override public List replicas() { - return Collections.unmodifiableList(all); + return all; } @Override public List replicas(String topic) { - return Collections.unmodifiableList(byTopic.get().get(topic)); + return byTopic.get().getOrDefault(topic, List.of()); } @Override public List replicas(TopicPartition topicPartition) { - return Collections.unmodifiableList(byPartition.get().get(topicPartition)); + return byPartition.get().getOrDefault(topicPartition, List.of()); } @Override public List replicas(TopicPartitionReplica replica) { - return Collections.unmodifiableList(byReplica.get().get(replica)); + return byReplica.get().getOrDefault(replica, List.of()); } @Override diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java index 23d1d1d908..5ec3c9866b 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java @@ -148,4 +148,22 @@ void testTopics() { Assertions.assertEquals(topics, cluster.topics().keySet()); Assertions.assertEquals(topics, cluster.topicNames()); } + + @Test + void testReturnCollectionUnmodifiable() { + var cluster = ClusterInfo.empty(); + var replica = + Replica.builder() + .topic("topic") + .partition(0) + .nodeInfo(NodeInfo.of(0, "", -1)) + .path("f") + .buildLeader(); + Assertions.assertThrows(Exception.class, () -> cluster.replicas().add(replica)); + Assertions.assertThrows(Exception.class, () -> cluster.replicas("t").add(replica)); + Assertions.assertThrows( + Exception.class, () -> cluster.replicas(TopicPartition.of("t", 0)).add(replica)); + Assertions.assertThrows( + Exception.class, () -> cluster.replicas(TopicPartitionReplica.of("t", 0, 10)).add(replica)); + } } From 8a70e256894b1219b2e4e898859652d5366d86f1 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Mar 2023 20:26:29 +0800 Subject: [PATCH 05/12] Remove `ClusterInfo#update` This function is designed for `ClusterLogAllocation`, a data structure been deleted for a while. --- .../org/astraea/common/admin/ClusterInfo.java | 34 ---------- .../balancer/algorithms/GreedyBalancer.java | 15 ++--- .../algorithms/SingleStepBalancer.java | 12 ++-- .../admin/ClusterInfoIntegratedTest.java | 63 ------------------- 4 files changed, 11 insertions(+), 113 deletions(-) diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java index a7dc6042aa..5bd935b90e 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java @@ -25,7 +25,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.BinaryOperator; -import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -54,39 +53,6 @@ static ClusterInfo masked(ClusterInfo clusterInfo, Predicate topicFilter return of(clusterInfo.clusterId(), nodes, topics, replicas); } - /** - * Update the replicas of ClusterInfo according to the given ClusterLogAllocation. The returned - * {@link ClusterInfo} will have some of its replicas replaced by the replicas inside the given - * {@link ClusterInfo}. Since {@link ClusterInfo} might only cover a subset of topic/partition in - * the associated cluster. Only the replicas related to the covered topic/partition get updated. - * - *

This method intended to offer a way to describe a cluster with some of its state modified - * manually. - * - * @param clusterInfo to get updated - * @param replacement offers new host and data folder - * @return new cluster info - */ - static ClusterInfo update( - ClusterInfo clusterInfo, Function> replacement) { - var newReplicas = - clusterInfo.replicas().stream() - .collect(Collectors.groupingBy(r -> TopicPartition.of(r.topic(), r.partition()))) - .entrySet() - .stream() - .map( - entry -> { - var replaced = replacement.apply(entry.getKey()); - if (replaced.isEmpty()) return entry.getValue(); - return replaced; - }) - .flatMap(Collection::stream) - .collect(Collectors.toUnmodifiableList()); - - return ClusterInfo.of( - clusterInfo.clusterId(), clusterInfo.nodes(), clusterInfo.topics(), newReplicas); - } - /** * Find a subset of topic/partitions in the source allocation, that has any non-fulfilled log * placement in the given target allocation. Note that the given two allocations must have the diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index 2fdc8208fa..26dcba5fb9 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -151,15 +151,12 @@ public Plan offer(AlgorithmConfig config) { .generate(currentAllocation) .takeWhile(ignored -> moreRoom.get()) .map( - newAllocation -> { - var newClusterInfo = - ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Solution( - clusterCostFunction.clusterCost(newClusterInfo, clusterBean), - moveCostFunction.moveCost( - currentClusterInfo, newClusterInfo, clusterBean), - newAllocation); - }) + newAllocation -> + new Solution( + clusterCostFunction.clusterCost(newAllocation, clusterBean), + moveCostFunction.moveCost( + currentClusterInfo, newAllocation, clusterBean), + newAllocation)) .filter( plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index f9c19510ba..713c691998 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -78,13 +78,11 @@ public Plan offer(AlgorithmConfig config) { .limit(iteration) .takeWhile(ignored -> System.currentTimeMillis() - start <= config.timeout().toMillis()) .map( - newAllocation -> { - var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Solution( - clusterCostFunction.clusterCost(newClusterInfo, clusterBean), - moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, clusterBean), - newAllocation); - }) + newAllocation -> + new Solution( + clusterCostFunction.clusterCost(newAllocation, clusterBean), + moveCostFunction.moveCost(currentClusterInfo, newAllocation, clusterBean), + newAllocation)) .filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) .filter(plan -> config.movementConstraint().test(plan.moveCost())) .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())) diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java index cc0bc9e3e8..3d8d507edf 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java @@ -16,17 +16,8 @@ */ package org.astraea.common.admin; -import java.time.Duration; -import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.IntStream; -import org.astraea.common.Utils; -import org.astraea.common.producer.Producer; -import org.astraea.common.producer.Record; import org.astraea.it.Service; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; public class ClusterInfoIntegratedTest { @@ -36,58 +27,4 @@ public class ClusterInfoIntegratedTest { static void closeService() { SERVICE.close(); } - - @Test - void testUpdate() { - var topicName = Utils.randomString(5); - try (var admin = Admin.of(SERVICE.bootstrapServers())) { - admin.creator().topic(topicName).run().toCompletableFuture().join(); - Utils.sleep(Duration.ofSeconds(3)); - - try (var producer = Producer.of(SERVICE.bootstrapServers())) { - IntStream.range(0, 100) - .forEach( - ignored -> - producer.send(Record.builder().topic(topicName).key(new byte[10]).build())); - } - - var clusterInfo = admin.clusterInfo(Set.of(topicName)).toCompletableFuture().join(); - clusterInfo.replicas().forEach(r -> Assertions.assertTrue(r.size() > 0)); - - var replica = clusterInfo.replicas().iterator().next(); - var newBrokerId = - SERVICE.dataFolders().keySet().stream() - .filter(id -> id != replica.nodeInfo().id()) - .findFirst() - .get(); - - var randomSizeValue = ThreadLocalRandom.current().nextInt(); - var merged = - ClusterInfo.update( - clusterInfo, - tp -> - tp.equals(TopicPartition.of(topicName, 0)) - ? Set.of( - Replica.builder() - .topic(topicName) - .partition(0) - .nodeInfo(NodeInfo.of(newBrokerId, "", -1)) - .lag(0) - .size(randomSizeValue) - .isLeader(true) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(true) - .path(replica.path()) - .build()) - : Set.of()); - - Assertions.assertEquals(clusterInfo.replicas().size(), merged.replicas().size()); - Assertions.assertEquals(clusterInfo.topicNames().size(), merged.topicNames().size()); - merged.replicas().forEach(r -> Assertions.assertEquals(randomSizeValue, r.size())); - Assertions.assertEquals(1, merged.replicas(topicName).size()); - Assertions.assertEquals(newBrokerId, merged.replicas(topicName).get(0).nodeInfo().id()); - } - } } From c435bcdac7c5b3853e209a63385e0905ea2e578d Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Mar 2023 20:45:58 +0800 Subject: [PATCH 06/12] Increase shuffle count only if a movement has been performed --- .../balancer/tweakers/ShuffleTweaker.java | 46 ++++++++++--------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 5ba433ea79..66c545f8b7 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -85,13 +85,17 @@ public Stream generate(ClusterInfo baseAllocation) { case 0: { // change leader/follower identity - baseAllocation - .replicaStream(tp) - .filter(Replica::isFollower) - .map(r -> Map.entry(r, ThreadLocalRandom.current().nextInt())) - .min(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey) - .ifPresent(r -> finalCluster.setPreferredLeader(r.topicPartitionReplica())); + var replica = + baseAllocation + .replicaStream(tp) + .filter(Replica::isFollower) + .map(r -> Map.entry(r, ThreadLocalRandom.current().nextInt())) + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey); + if (replica.isPresent()) { + finalCluster.setPreferredLeader(replica.get().topicPartitionReplica()); + shuffled++; + } break; } case 1: @@ -103,25 +107,25 @@ public Stream generate(ClusterInfo baseAllocation) { .map(Replica::nodeInfo) .map(NodeInfo::id) .collect(Collectors.toUnmodifiableSet()); - baseAllocation.brokers().stream() - .filter(b -> !currentIds.contains(b.id())) - .map(b -> Map.entry(b, ThreadLocalRandom.current().nextInt())) - .min(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey) - .ifPresent( - broker -> { - var replica = randomElement(replicaList); - finalCluster.reassignReplica( - replica.topicPartitionReplica(), - broker.id(), - randomElement(baseAllocation.brokerFolders().get(broker.id()))); - }); + var broker = + baseAllocation.brokers().stream() + .filter(b -> !currentIds.contains(b.id())) + .map(b -> Map.entry(b, ThreadLocalRandom.current().nextInt())) + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey); + if (broker.isPresent()) { + var replica = randomElement(replicaList); + finalCluster.reassignReplica( + replica.topicPartitionReplica(), + broker.get().id(), + randomElement(baseAllocation.brokerFolders().get(broker.get().id()))); + shuffled++; + } break; } default: throw new RuntimeException("Unexpected Condition"); } - shuffled++; } return finalCluster.build(); From d88924c0a99579dcd0aad697c6c9f79841ee9d5c Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 6 Mar 2023 21:17:39 +0800 Subject: [PATCH 07/12] Revert "Remove `ClusterInfo#update`" This reverts commit 8a70e256894b1219b2e4e898859652d5366d86f1. --- .../org/astraea/common/admin/ClusterInfo.java | 34 ++++++++++ .../balancer/algorithms/GreedyBalancer.java | 15 +++-- .../algorithms/SingleStepBalancer.java | 12 ++-- .../admin/ClusterInfoIntegratedTest.java | 63 +++++++++++++++++++ 4 files changed, 113 insertions(+), 11 deletions(-) diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java index 5bd935b90e..a7dc6042aa 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.Set; import java.util.function.BinaryOperator; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -53,6 +54,39 @@ static ClusterInfo masked(ClusterInfo clusterInfo, Predicate topicFilter return of(clusterInfo.clusterId(), nodes, topics, replicas); } + /** + * Update the replicas of ClusterInfo according to the given ClusterLogAllocation. The returned + * {@link ClusterInfo} will have some of its replicas replaced by the replicas inside the given + * {@link ClusterInfo}. Since {@link ClusterInfo} might only cover a subset of topic/partition in + * the associated cluster. Only the replicas related to the covered topic/partition get updated. + * + *

This method intended to offer a way to describe a cluster with some of its state modified + * manually. + * + * @param clusterInfo to get updated + * @param replacement offers new host and data folder + * @return new cluster info + */ + static ClusterInfo update( + ClusterInfo clusterInfo, Function> replacement) { + var newReplicas = + clusterInfo.replicas().stream() + .collect(Collectors.groupingBy(r -> TopicPartition.of(r.topic(), r.partition()))) + .entrySet() + .stream() + .map( + entry -> { + var replaced = replacement.apply(entry.getKey()); + if (replaced.isEmpty()) return entry.getValue(); + return replaced; + }) + .flatMap(Collection::stream) + .collect(Collectors.toUnmodifiableList()); + + return ClusterInfo.of( + clusterInfo.clusterId(), clusterInfo.nodes(), clusterInfo.topics(), newReplicas); + } + /** * Find a subset of topic/partitions in the source allocation, that has any non-fulfilled log * placement in the given target allocation. Note that the given two allocations must have the diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index 26dcba5fb9..2fdc8208fa 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -151,12 +151,15 @@ public Plan offer(AlgorithmConfig config) { .generate(currentAllocation) .takeWhile(ignored -> moreRoom.get()) .map( - newAllocation -> - new Solution( - clusterCostFunction.clusterCost(newAllocation, clusterBean), - moveCostFunction.moveCost( - currentClusterInfo, newAllocation, clusterBean), - newAllocation)) + newAllocation -> { + var newClusterInfo = + ClusterInfo.update(currentClusterInfo, newAllocation::replicas); + return new Solution( + clusterCostFunction.clusterCost(newClusterInfo, clusterBean), + moveCostFunction.moveCost( + currentClusterInfo, newClusterInfo, clusterBean), + newAllocation); + }) .filter( plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index 713c691998..f9c19510ba 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -78,11 +78,13 @@ public Plan offer(AlgorithmConfig config) { .limit(iteration) .takeWhile(ignored -> System.currentTimeMillis() - start <= config.timeout().toMillis()) .map( - newAllocation -> - new Solution( - clusterCostFunction.clusterCost(newAllocation, clusterBean), - moveCostFunction.moveCost(currentClusterInfo, newAllocation, clusterBean), - newAllocation)) + newAllocation -> { + var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas); + return new Solution( + clusterCostFunction.clusterCost(newClusterInfo, clusterBean), + moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, clusterBean), + newAllocation); + }) .filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) .filter(plan -> config.movementConstraint().test(plan.moveCost())) .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())) diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java index 3d8d507edf..cc0bc9e3e8 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java @@ -16,8 +16,17 @@ */ package org.astraea.common.admin; +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; +import org.astraea.common.Utils; +import org.astraea.common.producer.Producer; +import org.astraea.common.producer.Record; import org.astraea.it.Service; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; public class ClusterInfoIntegratedTest { @@ -27,4 +36,58 @@ public class ClusterInfoIntegratedTest { static void closeService() { SERVICE.close(); } + + @Test + void testUpdate() { + var topicName = Utils.randomString(5); + try (var admin = Admin.of(SERVICE.bootstrapServers())) { + admin.creator().topic(topicName).run().toCompletableFuture().join(); + Utils.sleep(Duration.ofSeconds(3)); + + try (var producer = Producer.of(SERVICE.bootstrapServers())) { + IntStream.range(0, 100) + .forEach( + ignored -> + producer.send(Record.builder().topic(topicName).key(new byte[10]).build())); + } + + var clusterInfo = admin.clusterInfo(Set.of(topicName)).toCompletableFuture().join(); + clusterInfo.replicas().forEach(r -> Assertions.assertTrue(r.size() > 0)); + + var replica = clusterInfo.replicas().iterator().next(); + var newBrokerId = + SERVICE.dataFolders().keySet().stream() + .filter(id -> id != replica.nodeInfo().id()) + .findFirst() + .get(); + + var randomSizeValue = ThreadLocalRandom.current().nextInt(); + var merged = + ClusterInfo.update( + clusterInfo, + tp -> + tp.equals(TopicPartition.of(topicName, 0)) + ? Set.of( + Replica.builder() + .topic(topicName) + .partition(0) + .nodeInfo(NodeInfo.of(newBrokerId, "", -1)) + .lag(0) + .size(randomSizeValue) + .isLeader(true) + .isSync(true) + .isFuture(false) + .isOffline(false) + .isPreferredLeader(true) + .path(replica.path()) + .build()) + : Set.of()); + + Assertions.assertEquals(clusterInfo.replicas().size(), merged.replicas().size()); + Assertions.assertEquals(clusterInfo.topicNames().size(), merged.topicNames().size()); + merged.replicas().forEach(r -> Assertions.assertEquals(randomSizeValue, r.size())); + Assertions.assertEquals(1, merged.replicas(topicName).size()); + Assertions.assertEquals(newBrokerId, merged.replicas(topicName).get(0).nodeInfo().id()); + } + } } From 9d3893334b664564259553eaf8e1eda02fbcf9ea Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 7 Mar 2023 15:07:41 +0800 Subject: [PATCH 08/12] Move topic filter into `ShuffleTweaker` --- .../balancer/algorithms/GreedyBalancer.java | 22 +++++++++---------- .../algorithms/SingleStepBalancer.java | 21 +++++++++--------- .../balancer/tweakers/ShuffleTweaker.java | 10 +++++---- .../balancer/tweakers/ShuffleTweakerTest.java | 9 ++++---- .../astraea/common/cost/NetworkCostTest.java | 5 +++-- 5 files changed, 35 insertions(+), 32 deletions(-) diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index 2fdc8208fa..f64f8d65b6 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -19,6 +19,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.DoubleAccumulator; import java.util.concurrent.atomic.LongAdder; @@ -135,7 +136,9 @@ public GreedyBalancer(Configuration config) { public Plan offer(AlgorithmConfig config) { final var currentClusterInfo = config.clusterInfo(); final var clusterBean = config.clusterBean(); - final var allocationTweaker = new ShuffleTweaker(minStep, maxStep); + final var allocationTweaker = + new ShuffleTweaker( + () -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter()); final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); final var initialCost = clusterCostFunction.clusterCost(currentClusterInfo, clusterBean); @@ -151,22 +154,19 @@ public Plan offer(AlgorithmConfig config) { .generate(currentAllocation) .takeWhile(ignored -> moreRoom.get()) .map( - newAllocation -> { - var newClusterInfo = - ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Solution( - clusterCostFunction.clusterCost(newClusterInfo, clusterBean), - moveCostFunction.moveCost( - currentClusterInfo, newClusterInfo, clusterBean), - newAllocation); - }) + newAllocation -> + new Solution( + clusterCostFunction.clusterCost(newAllocation, clusterBean), + moveCostFunction.moveCost( + currentClusterInfo, newAllocation, clusterBean), + newAllocation)) .filter( plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) .filter(plan -> config.movementConstraint().test(plan.moveCost())) .findFirst(); var currentCost = initialCost; - var currentAllocation = ClusterInfo.masked(currentClusterInfo, config.topicFilter()); + var currentAllocation = currentClusterInfo; var currentSolution = Optional.empty(); // register JMX diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index f9c19510ba..591e7f40b7 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -19,9 +19,9 @@ import java.util.Comparator; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; import org.astraea.common.Configuration; import org.astraea.common.Utils; -import org.astraea.common.admin.ClusterInfo; import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; import org.astraea.common.balancer.tweakers.ShuffleTweaker; @@ -64,27 +64,26 @@ public SingleStepBalancer(Configuration config) { public Plan offer(AlgorithmConfig config) { final var currentClusterInfo = config.clusterInfo(); final var clusterBean = config.clusterBean(); - final var allocationTweaker = new ShuffleTweaker(minStep, maxStep); + final var allocationTweaker = + new ShuffleTweaker( + () -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter()); final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); final var currentCost = config.clusterCostFunction().clusterCost(currentClusterInfo, clusterBean); - final var generatorClusterInfo = ClusterInfo.masked(currentClusterInfo, config.topicFilter()); var start = System.currentTimeMillis(); return allocationTweaker - .generate(generatorClusterInfo) + .generate(currentClusterInfo) .parallel() .limit(iteration) .takeWhile(ignored -> System.currentTimeMillis() - start <= config.timeout().toMillis()) .map( - newAllocation -> { - var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Solution( - clusterCostFunction.clusterCost(newClusterInfo, clusterBean), - moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, clusterBean), - newAllocation); - }) + newAllocation -> + new Solution( + clusterCostFunction.clusterCost(newAllocation, clusterBean), + moveCostFunction.moveCost(currentClusterInfo, newAllocation, clusterBean), + newAllocation)) .filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) .filter(plan -> config.movementConstraint().test(plan.moveCost())) .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())) diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 66c545f8b7..1f7a511c94 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -44,13 +44,15 @@ public class ShuffleTweaker implements AllocationTweaker { private final Supplier numberOfShuffle; + private final Predicate topicFilter; - public ShuffleTweaker(int origin, int bound) { - this(() -> ThreadLocalRandom.current().nextInt(origin, bound)); + public ShuffleTweaker(Supplier numberOfShuffle) { + this(numberOfShuffle, (x) -> true); } - public ShuffleTweaker(Supplier numberOfShuffle) { + public ShuffleTweaker(Supplier numberOfShuffle, Predicate topicFilter) { this.numberOfShuffle = numberOfShuffle; + this.topicFilter = topicFilter; } @Override @@ -71,10 +73,10 @@ public Stream generate(ClusterInfo baseAllocation) { final var shuffleCount = numberOfShuffle.get(); final var partitionOrder = baseAllocation.topicPartitions().stream() + .filter(tp -> topicFilter.test(tp.topic())) .map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt())) .sorted(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) - .limit(shuffleCount) .collect(Collectors.toUnmodifiableList()); final var finalCluster = ClusterInfoBuilder.builder(baseAllocation); diff --git a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java index 07d4112929..2f635572a6 100644 --- a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import org.astraea.common.Utils; @@ -41,7 +42,7 @@ class ShuffleTweakerTest { @Test void testRun() { - final var shuffleTweaker = new ShuffleTweaker(5, 10); + final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); final var fakeCluster = FakeClusterInfo.of(100, 10, 10, 3); final var stream = shuffleTweaker.generate(fakeCluster); final var iterator = stream.iterator(); @@ -118,7 +119,7 @@ void performanceTest( // log. // Notice: Stream#limit() will hurt performance. the number here might not reflect the actual // performance. - final var shuffleTweaker = new ShuffleTweaker(0, 10); + final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); final var fakeCluster = FakeClusterInfo.of(nodeCount, topicCount, partitionCount, replicaCount); final var size = 1000; @@ -136,7 +137,7 @@ void performanceTest( @Test void parallelStreamWorks() { - final var shuffleTweaker = new ShuffleTweaker(0, 10); + final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); final var fakeCluster = FakeClusterInfo.of(10, 20, 10, 3); // generator can do parallel without error. @@ -147,7 +148,7 @@ void parallelStreamWorks() { @Test @Disabled void parallelPerformanceTests() throws InterruptedException { - final var shuffleTweaker = new ShuffleTweaker(0, 10); + final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); final var fakeCluster = FakeClusterInfo.of(50, 500, 30, 2); final var counter = new LongAdder(); final var forkJoinPool = new ForkJoinPool(ForkJoinPool.getCommonPoolParallelism()); diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 8cdd81d492..14757e2f58 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -324,8 +325,8 @@ void testExpectedImprovement(int seed) { var testCase = new LargeTestCase(6, 100, seed); var clusterInfo = testCase.clusterInfo(); var clusterBean = testCase.clusterBean(); - var smallShuffle = new ShuffleTweaker(1, 6); - var largeShuffle = new ShuffleTweaker(1, 31); + var smallShuffle = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 6)); + var largeShuffle = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 31)); var costFunction = HasClusterCost.of(Map.of(new NetworkIngressCost(), 1.0, new NetworkEgressCost(), 1.0)); var originalCost = costFunction.clusterCost(clusterInfo, clusterBean); From f79f7fb38296227960f54c7f0b92a78a56e7858e Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 7 Mar 2023 17:10:03 +0800 Subject: [PATCH 09/12] Remove `ClusterInfo#update` and `ClusterInfo#masked` --- .../org/astraea/common/admin/ClusterInfo.java | 50 ---------- .../admin/ClusterInfoIntegratedTest.java | 93 ------------------- .../astraea/common/admin/ClusterInfoTest.java | 56 ----------- .../astraea/common/balancer/BalancerTest.java | 7 +- 4 files changed, 3 insertions(+), 203 deletions(-) delete mode 100644 common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java index a7dc6042aa..b7b16d637d 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java @@ -25,8 +25,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -39,54 +37,6 @@ static ClusterInfo empty() { // ---------------------[helpers]---------------------// - /** Mask specific topics from a {@link ClusterInfo}. */ - static ClusterInfo masked(ClusterInfo clusterInfo, Predicate topicFilter) { - final var nodes = List.copyOf(clusterInfo.nodes()); - final var topics = - clusterInfo.topics().entrySet().stream() - .filter(e -> topicFilter.test(e.getKey())) - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); - final var replicas = - clusterInfo - .replicaStream() - .filter(replica -> topicFilter.test(replica.topic())) - .collect(Collectors.toUnmodifiableList()); - return of(clusterInfo.clusterId(), nodes, topics, replicas); - } - - /** - * Update the replicas of ClusterInfo according to the given ClusterLogAllocation. The returned - * {@link ClusterInfo} will have some of its replicas replaced by the replicas inside the given - * {@link ClusterInfo}. Since {@link ClusterInfo} might only cover a subset of topic/partition in - * the associated cluster. Only the replicas related to the covered topic/partition get updated. - * - *

This method intended to offer a way to describe a cluster with some of its state modified - * manually. - * - * @param clusterInfo to get updated - * @param replacement offers new host and data folder - * @return new cluster info - */ - static ClusterInfo update( - ClusterInfo clusterInfo, Function> replacement) { - var newReplicas = - clusterInfo.replicas().stream() - .collect(Collectors.groupingBy(r -> TopicPartition.of(r.topic(), r.partition()))) - .entrySet() - .stream() - .map( - entry -> { - var replaced = replacement.apply(entry.getKey()); - if (replaced.isEmpty()) return entry.getValue(); - return replaced; - }) - .flatMap(Collection::stream) - .collect(Collectors.toUnmodifiableList()); - - return ClusterInfo.of( - clusterInfo.clusterId(), clusterInfo.nodes(), clusterInfo.topics(), newReplicas); - } - /** * Find a subset of topic/partitions in the source allocation, that has any non-fulfilled log * placement in the given target allocation. Note that the given two allocations must have the diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java deleted file mode 100644 index cc0bc9e3e8..0000000000 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.admin; - -import java.time.Duration; -import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.IntStream; -import org.astraea.common.Utils; -import org.astraea.common.producer.Producer; -import org.astraea.common.producer.Record; -import org.astraea.it.Service; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class ClusterInfoIntegratedTest { - - private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); - - @AfterAll - static void closeService() { - SERVICE.close(); - } - - @Test - void testUpdate() { - var topicName = Utils.randomString(5); - try (var admin = Admin.of(SERVICE.bootstrapServers())) { - admin.creator().topic(topicName).run().toCompletableFuture().join(); - Utils.sleep(Duration.ofSeconds(3)); - - try (var producer = Producer.of(SERVICE.bootstrapServers())) { - IntStream.range(0, 100) - .forEach( - ignored -> - producer.send(Record.builder().topic(topicName).key(new byte[10]).build())); - } - - var clusterInfo = admin.clusterInfo(Set.of(topicName)).toCompletableFuture().join(); - clusterInfo.replicas().forEach(r -> Assertions.assertTrue(r.size() > 0)); - - var replica = clusterInfo.replicas().iterator().next(); - var newBrokerId = - SERVICE.dataFolders().keySet().stream() - .filter(id -> id != replica.nodeInfo().id()) - .findFirst() - .get(); - - var randomSizeValue = ThreadLocalRandom.current().nextInt(); - var merged = - ClusterInfo.update( - clusterInfo, - tp -> - tp.equals(TopicPartition.of(topicName, 0)) - ? Set.of( - Replica.builder() - .topic(topicName) - .partition(0) - .nodeInfo(NodeInfo.of(newBrokerId, "", -1)) - .lag(0) - .size(randomSizeValue) - .isLeader(true) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(true) - .path(replica.path()) - .build()) - : Set.of()); - - Assertions.assertEquals(clusterInfo.replicas().size(), merged.replicas().size()); - Assertions.assertEquals(clusterInfo.topicNames().size(), merged.topicNames().size()); - merged.replicas().forEach(r -> Assertions.assertEquals(randomSizeValue, r.size())); - Assertions.assertEquals(1, merged.replicas(topicName).size()); - Assertions.assertEquals(newBrokerId, merged.replicas(topicName).get(0).nodeInfo().id()); - } - } -} diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java index 5ec3c9866b..38429fc88e 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java @@ -58,62 +58,6 @@ public static ClusterInfo of(List replicas) { replicas); } - @Test - void testReplicaLeadersAndMaskedCluster() { - var replicas = - List.of( - Replica.builder() - .topic("test-1") - .partition(0) - .nodeInfo(NodeInfo.of(0, "", -1)) - .lag(-1) - .size(-1) - .isLeader(true) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(false) - .path("/data-folder-01") - .build(), - Replica.builder() - .topic("test-1") - .partition(1) - .nodeInfo(NodeInfo.of(1, "", -1)) - .lag(-1) - .size(-1) - .isLeader(false) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(false) - .path("/data-folder-02") - .build(), - Replica.builder() - .topic("test-1") - .partition(2) - .nodeInfo(NodeInfo.of(0, "", -1)) - .lag(-1) - .size(-1) - .isLeader(false) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(false) - .path("/data-folder-01") - .build()); - - var clusterInfo = ClusterInfoTest.of(replicas); - var maskedClusterInfoHasReplicas = ClusterInfo.masked(clusterInfo, t -> t.equals("test-1")); - var maskedClusterInfoNoReplicas = - ClusterInfo.masked(clusterInfo, t -> t.equals("No topic name the same.")); - - Assertions.assertNotEquals(0, maskedClusterInfoHasReplicas.nodes().size()); - Assertions.assertNotEquals(0, maskedClusterInfoHasReplicas.replicas().size()); - Assertions.assertEquals(0, maskedClusterInfoNoReplicas.replicas().size()); - - Assertions.assertNotEquals(0, clusterInfo.replicaLeaders(BrokerTopic.of(0, "test-1")).size()); - } - @Test void testEmptyCluster() { var emptyCluster = ClusterInfo.empty(); diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java index 892f6d7aaf..a490d23738 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java @@ -187,13 +187,12 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) .clusterInfo(admin.topicNames(false).toCompletableFuture().join()) .toCompletableFuture() .join(); - var newCluster = ClusterInfo.update(currentCluster, newAllocation::replicas); Assertions.assertEquals( currentCluster.replicas(topic1).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), - newCluster.replicas(topic1).stream() + newAllocation.replicas(topic1).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), "With filter, only specific topic has been balanced"); @@ -201,7 +200,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) currentCluster.replicas(topic2).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), - newCluster.replicas(topic2).stream() + newAllocation.replicas(topic2).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), "With filter, only specific topic has been balanced"); @@ -209,7 +208,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) currentCluster.replicas(topic3).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), - newCluster.replicas(topic3).stream() + newAllocation.replicas(topic3).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), "With filter, only specific topic has been balanced"); From 7ff60ed8be62f27b590030ee65279b454f48a0b7 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 7 Mar 2023 18:11:30 +0800 Subject: [PATCH 10/12] Replace magic number by enum constant --- .../balancer/tweakers/ShuffleTweaker.java | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 1f7a511c94..e78bd7f9a3 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -23,6 +23,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.astraea.common.EnumInfo; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.ClusterInfoBuilder; import org.astraea.common.admin.NodeInfo; @@ -83,8 +84,8 @@ public Stream generate(ClusterInfo baseAllocation) { for (int i = 0, shuffled = 0; i < partitionOrder.size() && shuffled < shuffleCount; i++) { final var tp = partitionOrder.get(i); if (!eligiblePartition(baseAllocation.replicas(tp))) continue; - switch (ThreadLocalRandom.current().nextInt(0, 2)) { - case 0: + switch (Operation.random()) { + case LEADERSHIP_CHANGE: { // change leader/follower identity var replica = @@ -100,7 +101,7 @@ public Stream generate(ClusterInfo baseAllocation) { } break; } - case 1: + case REPLICA_LIST_CHANGE: { // change replica list var replicaList = baseAllocation.replicas(tp); @@ -149,4 +150,35 @@ private static boolean eligiblePartition(Collection replicas) { r -> r.stream().noneMatch(Replica::isLeader)) .noneMatch(p -> p.test(replicas)); } + + enum Operation implements EnumInfo { + LEADERSHIP_CHANGE, + REPLICA_LIST_CHANGE; + + public static Operation random() { + int random = ThreadLocalRandom.current().nextInt(0, 2); + switch (random) { + case 0: + return LEADERSHIP_CHANGE; + case 1: + return REPLICA_LIST_CHANGE; + default: + throw new RuntimeException("Unexpected value: " + random); + } + } + + public static Operation ofAlias(String alias) { + return EnumInfo.ignoreCaseEnum(Operation.class, alias); + } + + @Override + public String alias() { + return name(); + } + + @Override + public String toString() { + return alias(); + } + } } From 2a340de75d9de309f10aadc7165cd365ed1bfb84 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 7 Mar 2023 21:29:55 +0800 Subject: [PATCH 11/12] Revise --- .../common/balancer/tweakers/ShuffleTweaker.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index e78bd7f9a3..68e37302bf 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -16,7 +16,9 @@ */ package org.astraea.common.balancer.tweakers; +import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; @@ -155,16 +157,11 @@ enum Operation implements EnumInfo { LEADERSHIP_CHANGE, REPLICA_LIST_CHANGE; + private static final List OPERATIONS = + Arrays.stream(Operation.values()).collect(Collectors.toUnmodifiableList()); + public static Operation random() { - int random = ThreadLocalRandom.current().nextInt(0, 2); - switch (random) { - case 0: - return LEADERSHIP_CHANGE; - case 1: - return REPLICA_LIST_CHANGE; - default: - throw new RuntimeException("Unexpected value: " + random); - } + return OPERATIONS.get(ThreadLocalRandom.current().nextInt(OPERATIONS.size())); } public static Operation ofAlias(String alias) { From 5fb0240fdb6a3e628e9ae631f5140fd69fda68ec Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Wed, 8 Mar 2023 00:44:11 +0800 Subject: [PATCH 12/12] Address comments --- .../balancer/tweakers/AllocationTweaker.java | 44 ------------------- .../balancer/tweakers/ShuffleTweaker.java | 7 +-- .../balancer/tweakers/ShuffleTweakerTest.java | 22 ++++++---- .../astraea/common/cost/NetworkCostTest.java | 6 ++- 4 files changed, 18 insertions(+), 61 deletions(-) delete mode 100644 common/src/main/java/org/astraea/common/balancer/tweakers/AllocationTweaker.java diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/AllocationTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/AllocationTweaker.java deleted file mode 100644 index fd289c5ce8..0000000000 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/AllocationTweaker.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.balancer.tweakers; - -import java.util.stream.Stream; -import org.astraea.common.admin.ClusterInfo; - -@FunctionalInterface -public interface AllocationTweaker { - - static AllocationTweaker random(int numberOfShuffle) { - return new ShuffleTweaker(() -> numberOfShuffle); - } - - /** - * Given a {@link ClusterInfo}, tweak it by certain implementation specific logic. - * - *

In a nutshell. This function takes a {@link ClusterInfo} and return another modified version - * of the given {@link ClusterInfo}. The caller can use this method for browsing the space of - * possible {@link ClusterInfo}. - * - *

If the implementation find no alternative feasible {@link ClusterInfo}. Then an empty {@link - * Stream} will be returned. We don't encourage the implementation to return the original {@link - * ClusterInfo} as part of the Stream result. Since there is no tweaking occurred. - * - * @param baseAllocation the {@link ClusterInfo} as the base being tweaked. - * @return a {@link Stream} of possible tweaked {@link ClusterInfo}. - */ - Stream generate(ClusterInfo baseAllocation); -} diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 68e37302bf..9bfe965747 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -44,21 +44,16 @@ * replica set before this action) into the replica set. * */ -public class ShuffleTweaker implements AllocationTweaker { +public class ShuffleTweaker { private final Supplier numberOfShuffle; private final Predicate topicFilter; - public ShuffleTweaker(Supplier numberOfShuffle) { - this(numberOfShuffle, (x) -> true); - } - public ShuffleTweaker(Supplier numberOfShuffle, Predicate topicFilter) { this.numberOfShuffle = numberOfShuffle; this.topicFilter = topicFilter; } - @Override public Stream generate(ClusterInfo baseAllocation) { // There is no broker if (baseAllocation.nodes().isEmpty()) return Stream.of(); diff --git a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java index 2f635572a6..f13e26517b 100644 --- a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java @@ -42,7 +42,8 @@ class ShuffleTweakerTest { @Test void testRun() { - final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(100, 10, 10, 3); final var stream = shuffleTweaker.generate(fakeCluster); final var iterator = stream.iterator(); @@ -56,7 +57,7 @@ void testRun() { @ValueSource(ints = {3, 5, 7, 11, 13, 17, 19, 23, 29, 31}) void testMovement(int shuffle) { final var fakeCluster = FakeClusterInfo.of(30, 30, 20, 5); - final var shuffleTweaker = new ShuffleTweaker(() -> shuffle); + final var shuffleTweaker = new ShuffleTweaker(() -> shuffle, (x) -> true); shuffleTweaker .generate(fakeCluster) @@ -78,7 +79,7 @@ void testMovement(int shuffle) { @Test void testNoNodes() { final var fakeCluster = FakeClusterInfo.of(0, 0, 0, 0); - final var shuffleTweaker = new ShuffleTweaker(() -> 3); + final var shuffleTweaker = new ShuffleTweaker(() -> 3, (x) -> true); Assertions.assertEquals( 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); @@ -87,7 +88,7 @@ void testNoNodes() { @Test void testOneNode() { final var fakeCluster = FakeClusterInfo.of(1, 1, 1, 1, 1); - final var shuffleTweaker = new ShuffleTweaker(() -> 3); + final var shuffleTweaker = new ShuffleTweaker(() -> 3, (x) -> true); Assertions.assertEquals( 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); @@ -96,7 +97,7 @@ void testOneNode() { @Test void testNoTopic() { final var fakeCluster = FakeClusterInfo.of(3, 0, 0, 0); - final var shuffleTweaker = new ShuffleTweaker(() -> 3); + final var shuffleTweaker = new ShuffleTweaker(() -> 3, (x) -> true); Assertions.assertEquals( 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); @@ -119,7 +120,8 @@ void performanceTest( // log. // Notice: Stream#limit() will hurt performance. the number here might not reflect the actual // performance. - final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(nodeCount, topicCount, partitionCount, replicaCount); final var size = 1000; @@ -137,7 +139,8 @@ void performanceTest( @Test void parallelStreamWorks() { - final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(10, 20, 10, 3); // generator can do parallel without error. @@ -148,7 +151,8 @@ void parallelStreamWorks() { @Test @Disabled void parallelPerformanceTests() throws InterruptedException { - final var shuffleTweaker = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10)); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(50, 500, 30, 2); final var counter = new LongAdder(); final var forkJoinPool = new ForkJoinPool(ForkJoinPool.getCommonPoolParallelism()); @@ -179,7 +183,7 @@ void parallelPerformanceTests() throws InterruptedException { @Test void testEligiblePartition() { - var shuffleTweaker = new ShuffleTweaker(() -> 100); + var shuffleTweaker = new ShuffleTweaker(() -> 100, (x) -> true); var dataDir = Map.of( 0, Set.of("/a", "/b", "c"), diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 14757e2f58..68c373f94e 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -325,8 +325,10 @@ void testExpectedImprovement(int seed) { var testCase = new LargeTestCase(6, 100, seed); var clusterInfo = testCase.clusterInfo(); var clusterBean = testCase.clusterBean(); - var smallShuffle = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 6)); - var largeShuffle = new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 31)); + var smallShuffle = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 6), (x) -> true); + var largeShuffle = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 31), (x) -> true); var costFunction = HasClusterCost.of(Map.of(new NetworkIngressCost(), 1.0, new NetworkEgressCost(), 1.0)); var originalCost = costFunction.clusterCost(clusterInfo, clusterBean);