Skip to content

Commit

Permalink
[BALANCER] Improve balancer-related code execution speed (#1544)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyparrot authored Mar 8, 2023
1 parent 10d5424 commit 98e6ec0
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 354 deletions.
70 changes: 20 additions & 50 deletions common/src/main/java/org/astraea/common/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -39,54 +37,6 @@ static ClusterInfo empty() {

// ---------------------[helpers]---------------------//

/** Mask specific topics from a {@link ClusterInfo}. */
static ClusterInfo masked(ClusterInfo clusterInfo, Predicate<String> topicFilter) {
final var nodes = List.copyOf(clusterInfo.nodes());
final var topics =
clusterInfo.topics().entrySet().stream()
.filter(e -> topicFilter.test(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
final var replicas =
clusterInfo
.replicaStream()
.filter(replica -> topicFilter.test(replica.topic()))
.collect(Collectors.toUnmodifiableList());
return of(clusterInfo.clusterId(), nodes, topics, replicas);
}

/**
* Update the replicas of ClusterInfo according to the given ClusterLogAllocation. The returned
* {@link ClusterInfo} will have some of its replicas replaced by the replicas inside the given
* {@link ClusterInfo}. Since {@link ClusterInfo} might only cover a subset of topic/partition in
* the associated cluster. Only the replicas related to the covered topic/partition get updated.
*
* <p>This method intended to offer a way to describe a cluster with some of its state modified
* manually.
*
* @param clusterInfo to get updated
* @param replacement offers new host and data folder
* @return new cluster info
*/
static ClusterInfo update(
ClusterInfo clusterInfo, Function<TopicPartition, Collection<Replica>> replacement) {
var newReplicas =
clusterInfo.replicas().stream()
.collect(Collectors.groupingBy(r -> TopicPartition.of(r.topic(), r.partition())))
.entrySet()
.stream()
.map(
entry -> {
var replaced = replacement.apply(entry.getKey());
if (replaced.isEmpty()) return entry.getValue();
return replaced;
})
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableList());

return ClusterInfo.of(
clusterInfo.clusterId(), clusterInfo.nodes(), clusterInfo.topics(), newReplicas);
}

/**
* Find a subset of topic/partitions in the source allocation, that has any non-fulfilled log
* placement in the given target allocation. Note that the given two allocations must have the
Expand Down Expand Up @@ -603,6 +553,26 @@ public Map<String, Topic> topics() {
return topics.get();
}

@Override
public List<Replica> replicas() {
return all;
}

@Override
public List<Replica> replicas(String topic) {
return byTopic.get().getOrDefault(topic, List.of());
}

@Override
public List<Replica> replicas(TopicPartition topicPartition) {
return byPartition.get().getOrDefault(topicPartition, List.of());
}

@Override
public List<Replica> replicas(TopicPartitionReplica replica) {
return byReplica.get().getOrDefault(replica, List.of());
}

@Override
public List<Replica> replicaLeaders(String topic) {
return byTopicForLeader.get().getOrDefault(topic, List.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.DoubleAccumulator;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -135,7 +136,9 @@ public GreedyBalancer(Configuration config) {
public Plan offer(AlgorithmConfig config) {
final var currentClusterInfo = config.clusterInfo();
final var clusterBean = config.clusterBean();
final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var allocationTweaker =
new ShuffleTweaker(
() -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter());
final var clusterCostFunction = config.clusterCostFunction();
final var moveCostFunction = config.moveCostFunction();
final var initialCost = clusterCostFunction.clusterCost(currentClusterInfo, clusterBean);
Expand All @@ -151,22 +154,19 @@ public Plan offer(AlgorithmConfig config) {
.generate(currentAllocation)
.takeWhile(ignored -> moreRoom.get())
.map(
newAllocation -> {
var newClusterInfo =
ClusterInfo.update(currentClusterInfo, newAllocation::replicas);
return new Solution(
clusterCostFunction.clusterCost(newClusterInfo, clusterBean),
moveCostFunction.moveCost(
currentClusterInfo, newClusterInfo, clusterBean),
newAllocation);
})
newAllocation ->
new Solution(
clusterCostFunction.clusterCost(newAllocation, clusterBean),
moveCostFunction.moveCost(
currentClusterInfo, newAllocation, clusterBean),
newAllocation))
.filter(
plan ->
config.clusterConstraint().test(currentCost, plan.proposalClusterCost()))
.filter(plan -> config.movementConstraint().test(plan.moveCost()))
.findFirst();
var currentCost = initialCost;
var currentAllocation = ClusterInfo.masked(currentClusterInfo, config.topicFilter());
var currentAllocation = currentClusterInfo;
var currentSolution = Optional.<Solution>empty();

// register JMX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.balancer.AlgorithmConfig;
import org.astraea.common.balancer.Balancer;
import org.astraea.common.balancer.tweakers.ShuffleTweaker;
Expand Down Expand Up @@ -64,27 +64,26 @@ public SingleStepBalancer(Configuration config) {
public Plan offer(AlgorithmConfig config) {
final var currentClusterInfo = config.clusterInfo();
final var clusterBean = config.clusterBean();
final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var allocationTweaker =
new ShuffleTweaker(
() -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter());
final var clusterCostFunction = config.clusterCostFunction();
final var moveCostFunction = config.moveCostFunction();
final var currentCost =
config.clusterCostFunction().clusterCost(currentClusterInfo, clusterBean);
final var generatorClusterInfo = ClusterInfo.masked(currentClusterInfo, config.topicFilter());

var start = System.currentTimeMillis();
return allocationTweaker
.generate(generatorClusterInfo)
.generate(currentClusterInfo)
.parallel()
.limit(iteration)
.takeWhile(ignored -> System.currentTimeMillis() - start <= config.timeout().toMillis())
.map(
newAllocation -> {
var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas);
return new Solution(
clusterCostFunction.clusterCost(newClusterInfo, clusterBean),
moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, clusterBean),
newAllocation);
})
newAllocation ->
new Solution(
clusterCostFunction.clusterCost(newAllocation, clusterBean),
moveCostFunction.moveCost(currentClusterInfo, newAllocation, clusterBean),
newAllocation))
.filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost()))
.filter(plan -> config.movementConstraint().test(plan.moveCost()))
.min(Comparator.comparing(plan -> plan.proposalClusterCost().value()))
Expand Down

This file was deleted.

Loading

0 comments on commit 98e6ec0

Please sign in to comment.