Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BALANCER] Improve balancer-related code execution speed #1544

Merged
70 changes: 20 additions & 50 deletions common/src/main/java/org/astraea/common/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -39,54 +37,6 @@ static ClusterInfo empty() {

// ---------------------[helpers]---------------------//

/** Mask specific topics from a {@link ClusterInfo}. */
static ClusterInfo masked(ClusterInfo clusterInfo, Predicate<String> topicFilter) {
final var nodes = List.copyOf(clusterInfo.nodes());
final var topics =
clusterInfo.topics().entrySet().stream()
.filter(e -> topicFilter.test(e.getKey()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
final var replicas =
clusterInfo
.replicaStream()
.filter(replica -> topicFilter.test(replica.topic()))
.collect(Collectors.toUnmodifiableList());
return of(clusterInfo.clusterId(), nodes, topics, replicas);
}

/**
* Update the replicas of ClusterInfo according to the given ClusterLogAllocation. The returned
* {@link ClusterInfo} will have some of its replicas replaced by the replicas inside the given
* {@link ClusterInfo}. Since {@link ClusterInfo} might only cover a subset of topic/partition in
* the associated cluster. Only the replicas related to the covered topic/partition get updated.
*
* <p>This method intended to offer a way to describe a cluster with some of its state modified
* manually.
*
* @param clusterInfo to get updated
* @param replacement offers new host and data folder
* @return new cluster info
*/
static ClusterInfo update(
ClusterInfo clusterInfo, Function<TopicPartition, Collection<Replica>> replacement) {
var newReplicas =
clusterInfo.replicas().stream()
.collect(Collectors.groupingBy(r -> TopicPartition.of(r.topic(), r.partition())))
.entrySet()
.stream()
.map(
entry -> {
var replaced = replacement.apply(entry.getKey());
if (replaced.isEmpty()) return entry.getValue();
return replaced;
})
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableList());

return ClusterInfo.of(
clusterInfo.clusterId(), clusterInfo.nodes(), clusterInfo.topics(), newReplicas);
}

/**
* Find a subset of topic/partitions in the source allocation, that has any non-fulfilled log
* placement in the given target allocation. Note that the given two allocations must have the
Expand Down Expand Up @@ -603,6 +553,26 @@ public Map<String, Topic> topics() {
return topics.get();
}

@Override
public List<Replica> replicas() {
return all;
}

@Override
public List<Replica> replicas(String topic) {
return byTopic.get().getOrDefault(topic, List.of());
}

@Override
public List<Replica> replicas(TopicPartition topicPartition) {
return byPartition.get().getOrDefault(topicPartition, List.of());
}

@Override
public List<Replica> replicas(TopicPartitionReplica replica) {
return byReplica.get().getOrDefault(replica, List.of());
}

@Override
public List<Replica> replicaLeaders(String topic) {
return byTopicForLeader.get().getOrDefault(topic, List.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.DoubleAccumulator;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -135,7 +136,9 @@ public GreedyBalancer(Configuration config) {
public Plan offer(AlgorithmConfig config) {
final var currentClusterInfo = config.clusterInfo();
final var clusterBean = config.clusterBean();
final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var allocationTweaker =
new ShuffleTweaker(
() -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter());
final var clusterCostFunction = config.clusterCostFunction();
final var moveCostFunction = config.moveCostFunction();
final var initialCost = clusterCostFunction.clusterCost(currentClusterInfo, clusterBean);
Expand All @@ -151,22 +154,19 @@ public Plan offer(AlgorithmConfig config) {
.generate(currentAllocation)
.takeWhile(ignored -> moreRoom.get())
.map(
newAllocation -> {
var newClusterInfo =
ClusterInfo.update(currentClusterInfo, newAllocation::replicas);
return new Solution(
clusterCostFunction.clusterCost(newClusterInfo, clusterBean),
moveCostFunction.moveCost(
currentClusterInfo, newClusterInfo, clusterBean),
newAllocation);
})
newAllocation ->
new Solution(
clusterCostFunction.clusterCost(newAllocation, clusterBean),
moveCostFunction.moveCost(
currentClusterInfo, newAllocation, clusterBean),
newAllocation))
.filter(
plan ->
config.clusterConstraint().test(currentCost, plan.proposalClusterCost()))
.filter(plan -> config.movementConstraint().test(plan.moveCost()))
.findFirst();
var currentCost = initialCost;
var currentAllocation = ClusterInfo.masked(currentClusterInfo, config.topicFilter());
var currentAllocation = currentClusterInfo;
var currentSolution = Optional.<Solution>empty();

// register JMX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import java.util.Comparator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import org.astraea.common.Configuration;
import org.astraea.common.Utils;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.balancer.AlgorithmConfig;
import org.astraea.common.balancer.Balancer;
import org.astraea.common.balancer.tweakers.ShuffleTweaker;
Expand Down Expand Up @@ -64,27 +64,26 @@ public SingleStepBalancer(Configuration config) {
public Plan offer(AlgorithmConfig config) {
final var currentClusterInfo = config.clusterInfo();
final var clusterBean = config.clusterBean();
final var allocationTweaker = new ShuffleTweaker(minStep, maxStep);
final var allocationTweaker =
new ShuffleTweaker(
() -> ThreadLocalRandom.current().nextInt(minStep, maxStep), config.topicFilter());
final var clusterCostFunction = config.clusterCostFunction();
final var moveCostFunction = config.moveCostFunction();
final var currentCost =
config.clusterCostFunction().clusterCost(currentClusterInfo, clusterBean);
final var generatorClusterInfo = ClusterInfo.masked(currentClusterInfo, config.topicFilter());

var start = System.currentTimeMillis();
return allocationTweaker
.generate(generatorClusterInfo)
.generate(currentClusterInfo)
.parallel()
.limit(iteration)
.takeWhile(ignored -> System.currentTimeMillis() - start <= config.timeout().toMillis())
.map(
newAllocation -> {
var newClusterInfo = ClusterInfo.update(currentClusterInfo, newAllocation::replicas);
return new Solution(
clusterCostFunction.clusterCost(newClusterInfo, clusterBean),
moveCostFunction.moveCost(currentClusterInfo, newClusterInfo, clusterBean),
newAllocation);
})
newAllocation ->
new Solution(
clusterCostFunction.clusterCost(newAllocation, clusterBean),
moveCostFunction.moveCost(currentClusterInfo, newAllocation, clusterBean),
newAllocation))
.filter(plan -> config.clusterConstraint().test(currentCost, plan.proposalClusterCost()))
.filter(plan -> config.movementConstraint().test(plan.moveCost()))
.min(Comparator.comparing(plan -> plan.proposalClusterCost().value()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
*/
package org.astraea.common.balancer.tweakers;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.EnumInfo;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.ClusterInfoBuilder;
import org.astraea.common.admin.NodeInfo;
Expand All @@ -47,13 +47,15 @@
public class ShuffleTweaker implements AllocationTweaker {

private final Supplier<Integer> numberOfShuffle;
private final Predicate<String> topicFilter;

public ShuffleTweaker(int origin, int bound) {
this(() -> ThreadLocalRandom.current().nextInt(origin, bound));
public ShuffleTweaker(Supplier<Integer> numberOfShuffle) {
garyparrot marked this conversation as resolved.
Show resolved Hide resolved
this(numberOfShuffle, (x) -> true);
}

public ShuffleTweaker(Supplier<Integer> numberOfShuffle) {
public ShuffleTweaker(Supplier<Integer> numberOfShuffle, Predicate<String> topicFilter) {
garyparrot marked this conversation as resolved.
Show resolved Hide resolved
this.numberOfShuffle = numberOfShuffle;
this.topicFilter = topicFilter;
}

@Override
Expand All @@ -72,75 +74,69 @@ public Stream<ClusterInfo> generate(ClusterInfo baseAllocation) {
return Stream.generate(
() -> {
final var shuffleCount = numberOfShuffle.get();

var candidates =
IntStream.range(0, shuffleCount)
.mapToObj(i -> allocationGenerator(baseAllocation.brokerFolders()))
final var partitionOrder =
baseAllocation.topicPartitions().stream()
.filter(tp -> topicFilter.test(tp.topic()))
.map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt()))
.sorted(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.collect(Collectors.toUnmodifiableList());

var currentAllocation = baseAllocation;
for (var candidate : candidates) currentAllocation = candidate.apply(currentAllocation);

return currentAllocation;
final var finalCluster = ClusterInfoBuilder.builder(baseAllocation);
for (int i = 0, shuffled = 0; i < partitionOrder.size() && shuffled < shuffleCount; i++) {
final var tp = partitionOrder.get(i);
if (!eligiblePartition(baseAllocation.replicas(tp))) continue;
switch (Operation.random()) {
case LEADERSHIP_CHANGE:
{
// change leader/follower identity
var replica =
baseAllocation
.replicaStream(tp)
.filter(Replica::isFollower)
.map(r -> Map.entry(r, ThreadLocalRandom.current().nextInt()))
.min(Map.Entry.comparingByValue())
.map(Map.Entry::getKey);
if (replica.isPresent()) {
finalCluster.setPreferredLeader(replica.get().topicPartitionReplica());
shuffled++;
}
break;
}
case REPLICA_LIST_CHANGE:
{
// change replica list
var replicaList = baseAllocation.replicas(tp);
var currentIds =
replicaList.stream()
.map(Replica::nodeInfo)
.map(NodeInfo::id)
.collect(Collectors.toUnmodifiableSet());
var broker =
baseAllocation.brokers().stream()
.filter(b -> !currentIds.contains(b.id()))
.map(b -> Map.entry(b, ThreadLocalRandom.current().nextInt()))
.min(Map.Entry.comparingByValue())
.map(Map.Entry::getKey);
if (broker.isPresent()) {
var replica = randomElement(replicaList);
finalCluster.reassignReplica(
replica.topicPartitionReplica(),
broker.get().id(),
randomElement(baseAllocation.brokerFolders().get(broker.get().id())));
shuffled++;
}
break;
}
default:
throw new RuntimeException("Unexpected Condition");
}
}

return finalCluster.build();
});
}

private static Function<ClusterInfo, ClusterInfo> allocationGenerator(
Map<Integer, Set<String>> brokerFolders) {
return currentAllocation -> {
final var selectedPartition =
currentAllocation.topicPartitions().stream()
.filter(tp -> eligiblePartition((currentAllocation.replicas(tp))))
.map(tp -> Map.entry(tp, ThreadLocalRandom.current().nextInt()))
.min(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElseThrow();

// [valid operation 1] change leader/follower identity
final var currentReplicas = currentAllocation.replicas(selectedPartition);
final var candidates0 =
currentReplicas.stream()
.skip(1)
.map(
follower ->
(Supplier<ClusterInfo>)
() ->
ClusterInfoBuilder.builder(currentAllocation)
.setPreferredLeader(follower.topicPartitionReplica())
.build());

// [valid operation 2] change replica list
final var currentIds =
currentReplicas.stream()
.map(Replica::nodeInfo)
.map(NodeInfo::id)
.collect(Collectors.toUnmodifiableSet());
final var candidates1 =
brokerFolders.keySet().stream()
.filter(brokerId -> !currentIds.contains(brokerId))
.flatMap(
toThisBroker ->
currentReplicas.stream()
.map(
replica ->
(Supplier<ClusterInfo>)
() -> {
var toThisDir =
randomElement(brokerFolders.get(toThisBroker));
return ClusterInfoBuilder.builder(currentAllocation)
.reassignReplica(
replica.topicPartitionReplica(),
toThisBroker,
toThisDir)
.build();
}));

return randomElement(
Stream.concat(candidates0, candidates1).collect(Collectors.toUnmodifiableSet()))
.get();
};
}

private static <T> T randomElement(Collection<T> collection) {
return collection.stream()
.skip(ThreadLocalRandom.current().nextInt(0, collection.size()))
Expand All @@ -156,4 +152,30 @@ private static boolean eligiblePartition(Collection<Replica> replicas) {
r -> r.stream().noneMatch(Replica::isLeader))
.noneMatch(p -> p.test(replicas));
}

enum Operation implements EnumInfo {
LEADERSHIP_CHANGE,
REPLICA_LIST_CHANGE;

private static final List<Operation> OPERATIONS =
Arrays.stream(Operation.values()).collect(Collectors.toUnmodifiableList());

public static Operation random() {
return OPERATIONS.get(ThreadLocalRandom.current().nextInt(OPERATIONS.size()));
}

public static Operation ofAlias(String alias) {
return EnumInfo.ignoreCaseEnum(Operation.class, alias);
}

@Override
public String alias() {
return name();
}

@Override
public String toString() {
return alias();
}
}
}
Loading