diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java index d8d712924a..b7b16d637d 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java @@ -25,8 +25,6 @@ import java.util.Optional; import java.util.Set; import java.util.function.BinaryOperator; -import java.util.function.Function; -import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -39,54 +37,6 @@ static ClusterInfo empty() { // ---------------------[helpers]---------------------// - /** Mask specific topics from a {@link ClusterInfo}. */ - static ClusterInfo masked(ClusterInfo clusterInfo, Predicate topicFilter) { - final var nodes = List.copyOf(clusterInfo.nodes()); - final var topics = - clusterInfo.topics().entrySet().stream() - .filter(e -> topicFilter.test(e.getKey())) - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); - final var replicas = - clusterInfo - .replicaStream() - .filter(replica -> topicFilter.test(replica.topic())) - .collect(Collectors.toUnmodifiableList()); - return of(clusterInfo.clusterId(), nodes, topics, replicas); - } - - /** - * Update the replicas of ClusterInfo according to the given ClusterLogAllocation. The returned - * {@link ClusterInfo} will have some of its replicas replaced by the replicas inside the given - * {@link ClusterInfo}. Since {@link ClusterInfo} might only cover a subset of topic/partition in - * the associated cluster. Only the replicas related to the covered topic/partition get updated. - * - *

This method intended to offer a way to describe a cluster with some of its state modified - * manually. - * - * @param clusterInfo to get updated - * @param replacement offers new host and data folder - * @return new cluster info - */ - static ClusterInfo update( - ClusterInfo clusterInfo, Function> replacement) { - var newReplicas = - clusterInfo.replicas().stream() - .collect(Collectors.groupingBy(r -> TopicPartition.of(r.topic(), r.partition()))) - .entrySet() - .stream() - .map( - entry -> { - var replaced = replacement.apply(entry.getKey()); - if (replaced.isEmpty()) return entry.getValue(); - return replaced; - }) - .flatMap(Collection::stream) - .collect(Collectors.toUnmodifiableList()); - - return ClusterInfo.of( - clusterInfo.clusterId(), clusterInfo.nodes(), clusterInfo.topics(), newReplicas); - } - /** * Find a subset of topic/partitions in the source allocation, that has any non-fulfilled log * placement in the given target allocation. Note that the given two allocations must have the @@ -603,6 +553,26 @@ public Map topics() { return topics.get(); } + @Override + public List replicas() { + return all; + } + + @Override + public List replicas(String topic) { + return byTopic.get().getOrDefault(topic, List.of()); + } + + @Override + public List replicas(TopicPartition topicPartition) { + return byPartition.get().getOrDefault(topicPartition, List.of()); + } + + @Override + public List replicas(TopicPartitionReplica replica) { + return byReplica.get().getOrDefault(replica, List.of()); + } + @Override public List replicaLeaders(String topic) { return byTopicForLeader.get().getOrDefault(topic, List.of()); diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index 2fdc8208fa..f64f8d65b6 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -19,6 +19,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.DoubleAccumulator; import java.util.concurrent.atomic.LongAdder; @@ -135,7 +136,9 @@ public GreedyBalancer(Configuration config) { public Plan offer(AlgorithmConfig config) { final var currentClusterInfo = config.clusterInfo(); final var clusterBean = config.clusterBean(); - final var allocationTweaker = new ShuffleTweaker(minStep, maxStep); + final var allocationTweaker = + new ShuffleTweaker( + () -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter()); final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); final var initialCost = clusterCostFunction.clusterCost(currentClusterInfo, clusterBean); @@ -151,22 +154,19 @@ public Plan offer(AlgorithmConfig config) { .generate(currentAllocation) .takeWhile(ignored -> moreRoom.get()) .map( - newAllocation -> { - var newClusterInfo = - ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Solution( - clusterCostFunction.clusterCost(newClusterInfo, clusterBean), - moveCostFunction.moveCost( - currentClusterInfo, newClusterInfo, clusterBean), - newAllocation); - }) + newAllocation -> + new Solution( + clusterCostFunction.clusterCost(newAllocation, clusterBean), + moveCostFunction.moveCost( + currentClusterInfo, newAllocation, clusterBean), + newAllocation)) .filter( plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) .filter(plan -> config.movementConstraint().test(plan.moveCost())) .findFirst(); var currentCost = initialCost; - var currentAllocation = ClusterInfo.masked(currentClusterInfo, config.topicFilter()); + var currentAllocation = currentClusterInfo; var currentSolution = Optional.empty(); // register JMX diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index f9c19510ba..591e7f40b7 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -19,9 +19,9 @@ import java.util.Comparator; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; import org.astraea.common.Configuration; import org.astraea.common.Utils; -import org.astraea.common.admin.ClusterInfo; import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; import org.astraea.common.balancer.tweakers.ShuffleTweaker; @@ -64,27 +64,26 @@ public SingleStepBalancer(Configuration config) { public Plan offer(AlgorithmConfig config) { final var currentClusterInfo = config.clusterInfo(); final var clusterBean = config.clusterBean(); - final var allocationTweaker = new ShuffleTweaker(minStep, maxStep); + final var allocationTweaker = + new ShuffleTweaker( + () -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter()); final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); final var currentCost = config.clusterCostFunction().clusterCost(currentClusterInfo, clusterBean); - final var generatorClusterInfo = ClusterInfo.masked(currentClusterInfo, config.topicFilter()); var start = System.currentTimeMillis(); return allocationTweaker - .generate(generatorClusterInfo) + .generate(currentClusterInfo) .parallel() .limit(iteration) .takeWhile(ignored -> System.currentTimeMillis() - start <= config.timeout().toMillis()) .map( - newAllocation -> { - var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas); - return new Solution( - clusterCostFunction.clusterCost(newClusterInfo, clusterBean), - moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, clusterBean), - newAllocation); - }) + newAllocation -> + new Solution( + clusterCostFunction.clusterCost(newAllocation, clusterBean), + moveCostFunction.moveCost(currentClusterInfo, newAllocation, clusterBean), + newAllocation)) .filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost())) .filter(plan -> config.movementConstraint().test(plan.moveCost())) .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())) diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/AllocationTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/AllocationTweaker.java deleted file mode 100644 index fd289c5ce8..0000000000 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/AllocationTweaker.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.balancer.tweakers; - -import java.util.stream.Stream; -import org.astraea.common.admin.ClusterInfo; - -@FunctionalInterface -public interface AllocationTweaker { - - static AllocationTweaker random(int numberOfShuffle) { - return new ShuffleTweaker(() -> numberOfShuffle); - } - - /** - * Given a {@link ClusterInfo}, tweak it by certain implementation specific logic. - * - *

In a nutshell. This function takes a {@link ClusterInfo} and return another modified version - * of the given {@link ClusterInfo}. The caller can use this method for browsing the space of - * possible {@link ClusterInfo}. - * - *

If the implementation find no alternative feasible {@link ClusterInfo}. Then an empty {@link - * Stream} will be returned. We don't encourage the implementation to return the original {@link - * ClusterInfo} as part of the Stream result. Since there is no tweaking occurred. - * - * @param baseAllocation the {@link ClusterInfo} as the base being tweaked. - * @return a {@link Stream} of possible tweaked {@link ClusterInfo}. - */ - Stream generate(ClusterInfo baseAllocation); -} diff --git a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java index 33f3b4b625..9bfe965747 100644 --- a/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java +++ b/common/src/main/java/org/astraea/common/balancer/tweakers/ShuffleTweaker.java @@ -16,16 +16,16 @@ */ package org.astraea.common.balancer.tweakers; +import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; +import org.astraea.common.EnumInfo; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.ClusterInfoBuilder; import org.astraea.common.admin.NodeInfo; @@ -44,19 +44,16 @@ * replica set before this action) into the replica set. * */ -public class ShuffleTweaker implements AllocationTweaker { +public class ShuffleTweaker { private final Supplier numberOfShuffle; + private final Predicate topicFilter; - public ShuffleTweaker(int origin, int bound) { - this(() -> ThreadLocalRandom.current().nextInt(origin, bound)); - } - - public ShuffleTweaker(Supplier numberOfShuffle) { + public ShuffleTweaker(Supplier numberOfShuffle, Predicate topicFilter) { this.numberOfShuffle = numberOfShuffle; + this.topicFilter = topicFilter; } - @Override public Stream generate(ClusterInfo baseAllocation) { // There is no broker if (baseAllocation.nodes().isEmpty()) return Stream.of(); @@ -72,75 +69,69 @@ public Stream generate(ClusterInfo baseAllocation) { return Stream.generate( () -> { final var shuffleCount = numberOfShuffle.get(); - - var candidates = - IntStream.range(0, shuffleCount) - .mapToObj(i -> allocationGenerator(baseAllocation.brokerFolders())) + final var partitionOrder = + baseAllocation.topicPartitions().stream() + .filter(tp -> topicFilter.test(tp.topic())) + .map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt())) + .sorted(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) .collect(Collectors.toUnmodifiableList()); - var currentAllocation = baseAllocation; - for (var candidate : candidates) currentAllocation = candidate.apply(currentAllocation); - - return currentAllocation; + final var finalCluster = ClusterInfoBuilder.builder(baseAllocation); + for (int i = 0, shuffled = 0; i < partitionOrder.size() && shuffled < shuffleCount; i++) { + final var tp = partitionOrder.get(i); + if (!eligiblePartition(baseAllocation.replicas(tp))) continue; + switch (Operation.random()) { + case LEADERSHIP_CHANGE: + { + // change leader/follower identity + var replica = + baseAllocation + .replicaStream(tp) + .filter(Replica::isFollower) + .map(r -> Map.entry(r, ThreadLocalRandom.current().nextInt())) + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey); + if (replica.isPresent()) { + finalCluster.setPreferredLeader(replica.get().topicPartitionReplica()); + shuffled++; + } + break; + } + case REPLICA_LIST_CHANGE: + { + // change replica list + var replicaList = baseAllocation.replicas(tp); + var currentIds = + replicaList.stream() + .map(Replica::nodeInfo) + .map(NodeInfo::id) + .collect(Collectors.toUnmodifiableSet()); + var broker = + baseAllocation.brokers().stream() + .filter(b -> !currentIds.contains(b.id())) + .map(b -> Map.entry(b, ThreadLocalRandom.current().nextInt())) + .min(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey); + if (broker.isPresent()) { + var replica = randomElement(replicaList); + finalCluster.reassignReplica( + replica.topicPartitionReplica(), + broker.get().id(), + randomElement(baseAllocation.brokerFolders().get(broker.get().id()))); + shuffled++; + } + break; + } + default: + throw new RuntimeException("Unexpected Condition"); + } + } + + return finalCluster.build(); }); } - private static Function allocationGenerator( - Map> brokerFolders) { - return currentAllocation -> { - final var selectedPartition = - currentAllocation.topicPartitions().stream() - .filter(tp -> eligiblePartition((currentAllocation.replicas(tp)))) - .map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt())) - .min(Map.Entry.comparingByValue()) - .map(Map.Entry::getKey) - .orElseThrow(); - - // [valid operation 1] change leader/follower identity - final var currentReplicas = currentAllocation.replicas(selectedPartition); - final var candidates0 = - currentReplicas.stream() - .skip(1) - .map( - follower -> - (Supplier) - () -> - ClusterInfoBuilder.builder(currentAllocation) - .setPreferredLeader(follower.topicPartitionReplica()) - .build()); - - // [valid operation 2] change replica list - final var currentIds = - currentReplicas.stream() - .map(Replica::nodeInfo) - .map(NodeInfo::id) - .collect(Collectors.toUnmodifiableSet()); - final var candidates1 = - brokerFolders.keySet().stream() - .filter(brokerId -> !currentIds.contains(brokerId)) - .flatMap( - toThisBroker -> - currentReplicas.stream() - .map( - replica -> - (Supplier) - () -> { - var toThisDir = - randomElement(brokerFolders.get(toThisBroker)); - return ClusterInfoBuilder.builder(currentAllocation) - .reassignReplica( - replica.topicPartitionReplica(), - toThisBroker, - toThisDir) - .build(); - })); - - return randomElement( - Stream.concat(candidates0, candidates1).collect(Collectors.toUnmodifiableSet())) - .get(); - }; - } - private static T randomElement(Collection collection) { return collection.stream() .skip(ThreadLocalRandom.current().nextInt(0, collection.size())) @@ -156,4 +147,30 @@ private static boolean eligiblePartition(Collection replicas) { r -> r.stream().noneMatch(Replica::isLeader)) .noneMatch(p -> p.test(replicas)); } + + enum Operation implements EnumInfo { + LEADERSHIP_CHANGE, + REPLICA_LIST_CHANGE; + + private static final List OPERATIONS = + Arrays.stream(Operation.values()).collect(Collectors.toUnmodifiableList()); + + public static Operation random() { + return OPERATIONS.get(ThreadLocalRandom.current().nextInt(OPERATIONS.size())); + } + + public static Operation ofAlias(String alias) { + return EnumInfo.ignoreCaseEnum(Operation.class, alias); + } + + @Override + public String alias() { + return name(); + } + + @Override + public String toString() { + return alias(); + } + } } diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java deleted file mode 100644 index cc0bc9e3e8..0000000000 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoIntegratedTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.astraea.common.admin; - -import java.time.Duration; -import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.IntStream; -import org.astraea.common.Utils; -import org.astraea.common.producer.Producer; -import org.astraea.common.producer.Record; -import org.astraea.it.Service; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -public class ClusterInfoIntegratedTest { - - private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); - - @AfterAll - static void closeService() { - SERVICE.close(); - } - - @Test - void testUpdate() { - var topicName = Utils.randomString(5); - try (var admin = Admin.of(SERVICE.bootstrapServers())) { - admin.creator().topic(topicName).run().toCompletableFuture().join(); - Utils.sleep(Duration.ofSeconds(3)); - - try (var producer = Producer.of(SERVICE.bootstrapServers())) { - IntStream.range(0, 100) - .forEach( - ignored -> - producer.send(Record.builder().topic(topicName).key(new byte[10]).build())); - } - - var clusterInfo = admin.clusterInfo(Set.of(topicName)).toCompletableFuture().join(); - clusterInfo.replicas().forEach(r -> Assertions.assertTrue(r.size() > 0)); - - var replica = clusterInfo.replicas().iterator().next(); - var newBrokerId = - SERVICE.dataFolders().keySet().stream() - .filter(id -> id != replica.nodeInfo().id()) - .findFirst() - .get(); - - var randomSizeValue = ThreadLocalRandom.current().nextInt(); - var merged = - ClusterInfo.update( - clusterInfo, - tp -> - tp.equals(TopicPartition.of(topicName, 0)) - ? Set.of( - Replica.builder() - .topic(topicName) - .partition(0) - .nodeInfo(NodeInfo.of(newBrokerId, "", -1)) - .lag(0) - .size(randomSizeValue) - .isLeader(true) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(true) - .path(replica.path()) - .build()) - : Set.of()); - - Assertions.assertEquals(clusterInfo.replicas().size(), merged.replicas().size()); - Assertions.assertEquals(clusterInfo.topicNames().size(), merged.topicNames().size()); - merged.replicas().forEach(r -> Assertions.assertEquals(randomSizeValue, r.size())); - Assertions.assertEquals(1, merged.replicas(topicName).size()); - Assertions.assertEquals(newBrokerId, merged.replicas(topicName).get(0).nodeInfo().id()); - } - } -} diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java index 23d1d1d908..38429fc88e 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoTest.java @@ -58,62 +58,6 @@ public static ClusterInfo of(List replicas) { replicas); } - @Test - void testReplicaLeadersAndMaskedCluster() { - var replicas = - List.of( - Replica.builder() - .topic("test-1") - .partition(0) - .nodeInfo(NodeInfo.of(0, "", -1)) - .lag(-1) - .size(-1) - .isLeader(true) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(false) - .path("/data-folder-01") - .build(), - Replica.builder() - .topic("test-1") - .partition(1) - .nodeInfo(NodeInfo.of(1, "", -1)) - .lag(-1) - .size(-1) - .isLeader(false) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(false) - .path("/data-folder-02") - .build(), - Replica.builder() - .topic("test-1") - .partition(2) - .nodeInfo(NodeInfo.of(0, "", -1)) - .lag(-1) - .size(-1) - .isLeader(false) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(false) - .path("/data-folder-01") - .build()); - - var clusterInfo = ClusterInfoTest.of(replicas); - var maskedClusterInfoHasReplicas = ClusterInfo.masked(clusterInfo, t -> t.equals("test-1")); - var maskedClusterInfoNoReplicas = - ClusterInfo.masked(clusterInfo, t -> t.equals("No topic name the same.")); - - Assertions.assertNotEquals(0, maskedClusterInfoHasReplicas.nodes().size()); - Assertions.assertNotEquals(0, maskedClusterInfoHasReplicas.replicas().size()); - Assertions.assertEquals(0, maskedClusterInfoNoReplicas.replicas().size()); - - Assertions.assertNotEquals(0, clusterInfo.replicaLeaders(BrokerTopic.of(0, "test-1")).size()); - } - @Test void testEmptyCluster() { var emptyCluster = ClusterInfo.empty(); @@ -148,4 +92,22 @@ void testTopics() { Assertions.assertEquals(topics, cluster.topics().keySet()); Assertions.assertEquals(topics, cluster.topicNames()); } + + @Test + void testReturnCollectionUnmodifiable() { + var cluster = ClusterInfo.empty(); + var replica = + Replica.builder() + .topic("topic") + .partition(0) + .nodeInfo(NodeInfo.of(0, "", -1)) + .path("f") + .buildLeader(); + Assertions.assertThrows(Exception.class, () -> cluster.replicas().add(replica)); + Assertions.assertThrows(Exception.class, () -> cluster.replicas("t").add(replica)); + Assertions.assertThrows( + Exception.class, () -> cluster.replicas(TopicPartition.of("t", 0)).add(replica)); + Assertions.assertThrows( + Exception.class, () -> cluster.replicas(TopicPartitionReplica.of("t", 0, 10)).add(replica)); + } } diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java index 892f6d7aaf..a490d23738 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerTest.java @@ -187,13 +187,12 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) .clusterInfo(admin.topicNames(false).toCompletableFuture().join()) .toCompletableFuture() .join(); - var newCluster = ClusterInfo.update(currentCluster, newAllocation::replicas); Assertions.assertEquals( currentCluster.replicas(topic1).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), - newCluster.replicas(topic1).stream() + newAllocation.replicas(topic1).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), "With filter, only specific topic has been balanced"); @@ -201,7 +200,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) currentCluster.replicas(topic2).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), - newCluster.replicas(topic2).stream() + newAllocation.replicas(topic2).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), "With filter, only specific topic has been balanced"); @@ -209,7 +208,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) currentCluster.replicas(topic3).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), - newCluster.replicas(topic3).stream() + newAllocation.replicas(topic3).stream() .map(Replica::topicPartitionReplica) .collect(Collectors.toSet()), "With filter, only specific topic has been balanced"); diff --git a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java index 07d4112929..f13e26517b 100644 --- a/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java +++ b/common/src/test/java/org/astraea/common/balancer/tweakers/ShuffleTweakerTest.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import org.astraea.common.Utils; @@ -41,7 +42,8 @@ class ShuffleTweakerTest { @Test void testRun() { - final var shuffleTweaker = new ShuffleTweaker(5, 10); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(100, 10, 10, 3); final var stream = shuffleTweaker.generate(fakeCluster); final var iterator = stream.iterator(); @@ -55,7 +57,7 @@ void testRun() { @ValueSource(ints = {3, 5, 7, 11, 13, 17, 19, 23, 29, 31}) void testMovement(int shuffle) { final var fakeCluster = FakeClusterInfo.of(30, 30, 20, 5); - final var shuffleTweaker = new ShuffleTweaker(() -> shuffle); + final var shuffleTweaker = new ShuffleTweaker(() -> shuffle, (x) -> true); shuffleTweaker .generate(fakeCluster) @@ -77,7 +79,7 @@ void testMovement(int shuffle) { @Test void testNoNodes() { final var fakeCluster = FakeClusterInfo.of(0, 0, 0, 0); - final var shuffleTweaker = new ShuffleTweaker(() -> 3); + final var shuffleTweaker = new ShuffleTweaker(() -> 3, (x) -> true); Assertions.assertEquals( 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); @@ -86,7 +88,7 @@ void testNoNodes() { @Test void testOneNode() { final var fakeCluster = FakeClusterInfo.of(1, 1, 1, 1, 1); - final var shuffleTweaker = new ShuffleTweaker(() -> 3); + final var shuffleTweaker = new ShuffleTweaker(() -> 3, (x) -> true); Assertions.assertEquals( 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); @@ -95,7 +97,7 @@ void testOneNode() { @Test void testNoTopic() { final var fakeCluster = FakeClusterInfo.of(3, 0, 0, 0); - final var shuffleTweaker = new ShuffleTweaker(() -> 3); + final var shuffleTweaker = new ShuffleTweaker(() -> 3, (x) -> true); Assertions.assertEquals( 0, (int) shuffleTweaker.generate(fakeCluster).count(), "No possible tweak"); @@ -118,7 +120,8 @@ void performanceTest( // log. // Notice: Stream#limit() will hurt performance. the number here might not reflect the actual // performance. - final var shuffleTweaker = new ShuffleTweaker(0, 10); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(nodeCount, topicCount, partitionCount, replicaCount); final var size = 1000; @@ -136,7 +139,8 @@ void performanceTest( @Test void parallelStreamWorks() { - final var shuffleTweaker = new ShuffleTweaker(0, 10); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(10, 20, 10, 3); // generator can do parallel without error. @@ -147,7 +151,8 @@ void parallelStreamWorks() { @Test @Disabled void parallelPerformanceTests() throws InterruptedException { - final var shuffleTweaker = new ShuffleTweaker(0, 10); + final var shuffleTweaker = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 10), (x) -> true); final var fakeCluster = FakeClusterInfo.of(50, 500, 30, 2); final var counter = new LongAdder(); final var forkJoinPool = new ForkJoinPool(ForkJoinPool.getCommonPoolParallelism()); @@ -178,7 +183,7 @@ void parallelPerformanceTests() throws InterruptedException { @Test void testEligiblePartition() { - var shuffleTweaker = new ShuffleTweaker(() -> 100); + var shuffleTweaker = new ShuffleTweaker(() -> 100, (x) -> true); var dataDir = Map.of( 0, Set.of("/a", "/b", "c"), diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 8cdd81d492..68c373f94e 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -324,8 +325,10 @@ void testExpectedImprovement(int seed) { var testCase = new LargeTestCase(6, 100, seed); var clusterInfo = testCase.clusterInfo(); var clusterBean = testCase.clusterBean(); - var smallShuffle = new ShuffleTweaker(1, 6); - var largeShuffle = new ShuffleTweaker(1, 31); + var smallShuffle = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 6), (x) -> true); + var largeShuffle = + new ShuffleTweaker(() -> ThreadLocalRandom.current().nextInt(1, 31), (x) -> true); var costFunction = HasClusterCost.of(Map.of(new NetworkIngressCost(), 1.0, new NetworkEgressCost(), 1.0)); var originalCost = costFunction.clusterCost(clusterInfo, clusterBean);