Skip to content

Commit

Permalink
[BALANCER] Implement balancer config balancer.broker.balancing.mode (
Browse files Browse the repository at this point in the history
  • Loading branch information
garyparrot authored May 22, 2023
1 parent 6db827e commit 9613d6b
Show file tree
Hide file tree
Showing 10 changed files with 938 additions and 62 deletions.
30 changes: 11 additions & 19 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,27 +224,19 @@ private static Set<String> createAndProduceTopic(
void testBestPlan() {
try (var admin = Admin.of(SERVICE.bootstrapServers())) {
var currentClusterInfo =
ClusterInfo.of(
"fake",
List.of(NodeInfo.of(10, "host", 22), NodeInfo.of(11, "host", 22)),
Map.of(),
List.of(
Replica.builder()
.topic("topic")
.partition(0)
.nodeInfo(NodeInfo.of(10, "host", 22))
.lag(0)
.size(100)
.isLeader(true)
.isSync(true)
.isFuture(false)
.isOffline(false)
.isPreferredLeader(true)
.path("/tmp/aa")
.build()));
ClusterInfo.builder()
.addNode(Set.of(1, 2))
.addFolders(
Map.ofEntries(Map.entry(1, Set.of("/folder")), Map.entry(2, Set.of("/folder"))))
.addTopic("topic", 1, (short) 1)
.build();

HasClusterCost clusterCostFunction =
(clusterInfo, clusterBean) -> () -> clusterInfo == currentClusterInfo ? 100D : 10D;
(clusterInfo, clusterBean) ->
() ->
ClusterInfo.findNonFulfilledAllocation(currentClusterInfo, clusterInfo).isEmpty()
? 100D
: 10D;
HasMoveCost moveCostFunction = HasMoveCost.EMPTY;
HasMoveCost failMoveCostFunction = (before, after, clusterBean) -> () -> true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -92,6 +93,18 @@ public ClusterInfoBuilder addNode(Set<Integer> brokerIds) {
});
}

/**
 * Drop brokers from the cluster state being built.
 *
 * <p>Only the node list is filtered here; the replicas handed to the alteration are not
 * consulted.
 *
 * @param toRemove predicate over broker ids; a broker is dropped when the predicate accepts its id
 * @return this
 */
public ClusterInfoBuilder removeNodes(Predicate<Integer> toRemove) {
  return applyNodes(
      (nodes, replicas) -> {
        // keep only the nodes whose id is rejected by the removal predicate
        var survivors = nodes.stream().filter(node -> !toRemove.test(node.id()));
        return survivors.toList();
      });
}

/**
* Add some fake folders to a specific broker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,53 @@ private BalancerConfigs() {}
public static final String BALANCER_ALLOWED_TOPICS_REGEX = "balancer.allowed.topics.regex";

/**
* A regular expression indicates which brokers are eligible for moving loading. When specified, a
* broker with an id that doesn't match this expression cannot accept a partition from the other
* broker or move its partition to other brokers.
* This configuration indicates the balancing mode for each broker.
*
* <p>This configuration requires a string with a series of key-value pairs, each pair is
* separated by a comma, and the key and value are separated by a colon. <code>
* (brokerId_A|"default"):(mode),(brokerId_B):(mode), ...</code> The key indicates the integer id
* for a broker. And the value indicates the balancing mode for the associated broker. When the
* key is a string value <code>"default"</code>(without the double quotes), it indicates the
* associated balancing mode should be the default mode for the rest of the brokers that are not
* addressed in the configuration. By default, all the brokers use <code> "balancing"</code> mode.
*
* <h3>Possible balancing modes</h3>
*
* <ul>
* <li><code>balancing</code>: The broker will participate in the load balancing process. The
* replica assignment for this broker is eligible for changes.
* <li><code>demoted</code>: The broker should become empty after the rebalance. This mode
* allows the user to clear all the loadings for certain brokers, enabling a graceful
* removal of those brokers. Note to the balancer implementation: A broker in this mode
* assumes it will be out of service after the balancing is finished. Therefore, when
* evaluating the cluster cost, the brokers to demote should be excluded. However, these
* brokers will be included in the move cost evaluation, since they are still part of the
* cluster right now and the move cost focuses on the cost incurred during the ongoing
* balancing process itself.
* <li><code>excluded</code>: The broker will not participate in the load balancing process. The
* replica assignment for this broker is not eligible for changes. It will neither accept
* replicas from other brokers nor reassign replicas to other brokers.
* </ul>
*
* <h3>Flag Interaction:</h3>
*
* <ol>
* <li>When this flag is used in conjunction with {@link
* BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX}, if a demoted broker contains partitions
* of topics that the regex forbids from being moved, an exception should be raised.
* </ol>
*
* <h3>Limitation:</h3>
*
* <ol>
* <li>Demoting a broker may be infeasible if there are not enough brokers to fit the required
* replication factor for a specific partition. This situation is more likely to occur if there
* are many <code>excluded</code> brokers that reject accepting new replicas. If such a case
* is detected, an exception should be raised.
* <li>Any broker with ongoing replica-move-in, replica-move-out, or inter-folder movement
* cannot be the demoting target. An exception will be raised if any of the demoting brokers
* has such ongoing events.
* </ol>
*/
public static final String BALANCER_ALLOWED_BROKERS_REGEX = "balancer.allowed.brokers.regex";
public static final String BALANCER_BROKER_BALANCING_MODE = "balancer.broker.balancing.mode";
}
180 changes: 180 additions & 0 deletions common/src/main/java/org/astraea/common/balancer/BalancerUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.balancer;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.astraea.common.EnumInfo;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;

public final class BalancerUtils {

private BalancerUtils() {}

/**
 * Resolve the balancing mode of every broker in the cluster from the value of {@link
 * BalancerConfigs#BALANCER_BROKER_BALANCING_MODE}.
 *
 * <p>The config is a comma-separated list of <code>key:mode</code> pairs. A key is either an
 * integer broker id or the literal <code>default</code>; a mode is one of <code>balancing</code>,
 * <code>demoted</code> or <code>excluded</code>. Whitespace around entries, keys and modes is
 * ignored and empty entries are skipped. A broker that is not addressed explicitly uses the
 * <code>default</code> entry when present, otherwise {@link BalancingModes#BALANCING}.
 *
 * @param cluster the cluster whose brokers are being classified
 * @param config the raw config string, may be empty
 * @return an unmodifiable map from each broker id of {@code cluster} to its balancing mode
 * @throws IllegalArgumentException if an entry is not of the form <code>key:mode</code>, if a key
 *     is neither an integer nor <code>default</code>, or if a mode is unknown
 * @throws IllegalStateException if the same key is configured more than once
 */
public static Map<Integer, BalancingModes> balancingMode(ClusterInfo cluster, String config) {
  // keys are either Integer (a broker id) or the String "default", hence the Object key type
  var map =
      Arrays.stream(config.split(","))
          .map(String::strip)
          .filter(Predicate.not(String::isEmpty))
          .map(BalancerUtils::splitBalancingModeEntry)
          .collect(
              Collectors.toUnmodifiableMap(
                  s -> parseBalancingModeKey(s[0]),
                  s ->
                      switch (s[1]) {
                        case "balancing" -> BalancingModes.BALANCING;
                        case "demoted" -> BalancingModes.DEMOTED;
                        case "excluded" -> BalancingModes.EXCLUDED;
                        default -> throw new IllegalArgumentException(
                            "Unsupported balancing mode: " + s[1]);
                      }));

  // explicit entry first, then the configured default, then the built-in default
  Function<Integer, BalancingModes> mode =
      (id) -> map.getOrDefault(id, map.getOrDefault("default", BalancingModes.BALANCING));

  return cluster.brokers().stream()
      .map(NodeInfo::id)
      .collect(Collectors.toUnmodifiableMap(Function.identity(), mode));
}

/** Split one <code>key:mode</code> entry into its trimmed key and mode, rejecting other shapes. */
private static String[] splitBalancingModeEntry(String entry) {
  var pair = entry.split(":");
  if (pair.length != 2) {
    throw new IllegalArgumentException(
        "Malformed balancing mode entry \""
            + entry
            + "\", the expected format is (brokerId|default):(mode)");
  }
  return new String[] {pair[0].strip(), pair[1].strip()};
}

/** Turn a config key into the literal "default" or an Integer broker id, rejecting the rest. */
private static Object parseBalancingModeKey(String key) {
  if (key.equals("default")) {
    return key;
  }
  try {
    return Integer.parseInt(key);
  } catch (NumberFormatException e) {
    // a silently ignored key (e.g. a typo of "default") would leave brokers in the wrong mode
    throw new IllegalArgumentException(
        "Illegal balancing mode key \""
            + key
            + "\", it should be an integer broker id or \"default\"",
        e);
  }
}

/**
 * Check that the brokers selected for clearing can be cleared without contradicting {@link
 * BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX}, and that none of them has a replica that is
 * currently being added, removed, or moved between folders.
 *
 * @param cluster the cluster state to inspect
 * @param isDemoted tells whether the broker with the given id is going to be cleared
 * @param allowedTopics tells whether the given topic is allowed to be changed
 * @throws IllegalArgumentException if a broker to clear hosts a replica of a topic that is not
 *     allowed to change, or hosts a replica flagged as adding, removing, or future
 */
public static void verifyClearBrokerValidness(
    ClusterInfo cluster, Predicate<Integer> isDemoted, Predicate<String> allowedTopics) {
  // 1. replicas on a broker to clear must all belong to topics that are allowed to move
  var frozenReplicas =
      cluster.topicPartitionReplicas().stream()
          .filter(tpr -> isDemoted.test(tpr.brokerId()) && !allowedTopics.test(tpr.topic()))
          .collect(Collectors.toUnmodifiableSet());
  if (!frozenReplicas.isEmpty()) {
    var message =
        "Attempts to clear some brokers, but some of them contain topics that forbidden from being changed due to \""
            + BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX
            + "\": "
            + frozenReplicas;
    throw new IllegalArgumentException(message);
  }

  // 2. replicas on a broker to clear must not be part of an ongoing migration
  var migratingReplicas =
      cluster.replicas().stream()
          .filter(
              r ->
                  isDemoted.test(r.nodeInfo().id())
                      && (r.isAdding() || r.isRemoving() || r.isFuture()))
          .map(r -> r.topicPartitionReplica())
          .collect(Collectors.toUnmodifiableSet());
  if (!migratingReplicas.isEmpty()) {
    var message =
        "Attempts to clear broker with ongoing migration event (adding/removing/future replica): "
            + migratingReplicas;
    throw new IllegalArgumentException(message);
  }
}

/**
 * Move all the replicas at the demoting brokers to other allowed brokers. <b>BE CAREFUL, the
 * implementation makes no assumption about the MoveCost or ClusterCost of the returned
 * ClusterInfo.</b> Be aware of this limitation before using the result as the starting point of
 * a solution search. Some balancer implementations might have trouble finding an answer when
 * starting from a state where the MoveCost is already violated.
 *
 * <p>Destination brokers and destination folders are both picked in a round-robin manner. A
 * destination broker is skipped for a replica when it is already tracked as hosting a replica of
 * the same partition.
 *
 * @param initial the cluster state to start from
 * @param clearBrokers tells whether the broker with the given id must end up without replicas
 * @param allowedBrokers tells whether the broker with the given id may accept replicas
 * @return a cluster state in which every replica of the brokers to clear has been reassigned
 * @throws IllegalStateException if no allowed broker can adopt a replica, or if the chosen
 *     destination broker has no data folder to place the replica in
 */
public static ClusterInfo clearedCluster(
    ClusterInfo initial, Predicate<Integer> clearBrokers, Predicate<Integer> allowedBrokers) {
  final var allowed =
      initial.nodes().stream()
          .filter(node -> allowedBrokers.test(node.id()))
          .filter(node -> !clearBrokers.test(node.id()))
          .collect(Collectors.toUnmodifiableSet());
  // endless round-robin over the destination brokers; never pulled from when `allowed` is empty
  // because the candidate scan below is bounded by allowed.size()
  final var nextBroker = Stream.generate(() -> allowed).flatMap(Collection::stream).iterator();
  final var brokerFolders = initial.brokerFolders();
  // endless round-robin over the folders of each broker. An iterator built over an empty folder
  // set never yields an element, so it must not be advanced (see the guard below).
  final var nextBrokerFolder =
      brokerFolders.entrySet().stream()
          .collect(
              Collectors.toUnmodifiableMap(
                  Map.Entry::getKey,
                  x -> Stream.generate(x::getValue).flatMap(Collection::stream).iterator()));

  // the brokers hosting each partition, updated while replicas are being reassigned
  var trackingReplicaList =
      initial.topicPartitions().stream()
          .collect(
              Collectors.toUnmodifiableMap(
                  tp -> tp,
                  tp ->
                      initial.replicas(tp).stream()
                          .map(Replica::nodeInfo)
                          .collect(Collectors.toSet())));
  return ClusterInfo.builder(initial)
      .mapLog(
          replica -> {
            if (!clearBrokers.test(replica.nodeInfo().id())) return replica;
            var currentReplicaList = trackingReplicaList.get(replica.topicPartition());
            // try each allowed broker at most once, continuing from the last round-robin position
            var broker =
                IntStream.range(0, allowed.size())
                    .mapToObj(i -> nextBroker.next())
                    .filter(b -> !currentReplicaList.contains(b))
                    .findFirst()
                    .orElseThrow(
                        () ->
                            new IllegalStateException(
                                "Unable to clear replica "
                                    + replica.topicPartitionReplica()
                                    + " for broker "
                                    + replica.nodeInfo().id()
                                    + ", the allowed destination brokers are "
                                    + allowed.stream()
                                        .map(NodeInfo::id)
                                        .collect(Collectors.toUnmodifiableSet())
                                    + " but all of them already hosting a replica for this partition. "
                                    + "There is no broker can adopt this replica."));

            // fail fast instead of hitting a null iterator (unknown broker) or spinning forever
            // on an iterator that has nothing to yield (broker without any folder)
            var destinationFolders = brokerFolders.get(broker.id());
            if (destinationFolders == null || destinationFolders.isEmpty())
              throw new IllegalStateException(
                  "Unable to clear replica "
                      + replica.topicPartitionReplica()
                      + " for broker "
                      + replica.nodeInfo().id()
                      + ", the destination broker "
                      + broker.id()
                      + " has no data folder available.");
            var folder = nextBrokerFolder.get(broker.id()).next();

            // update the tracking list. have to do this to avoid putting two replicas from the
            // same tp to one broker.
            currentReplicaList.remove(replica.nodeInfo());
            currentReplicaList.add(broker);

            return Replica.builder(replica).nodeInfo(broker).path(folder).build();
          })
      .build();
}

/** The balancing mode that can be assigned to a broker. */
public enum BalancingModes implements EnumInfo {
  /** The broker participates in load balancing; its replica assignment may be changed. */
  BALANCING,
  /** The broker should become empty after the rebalance. */
  DEMOTED,
  /** The broker neither accepts replicas from nor gives replicas to other brokers. */
  EXCLUDED;

  /**
   * Look up a mode by its alias via {@code EnumInfo.ignoreCaseEnum}.
   *
   * @param alias the alias of the wanted mode
   * @return the matching mode
   */
  public static BalancingModes ofAlias(String alias) {
    return EnumInfo.ignoreCaseEnum(BalancingModes.class, alias);
  }

  /** The alias of a mode is the name of its enum constant. */
  @Override
  public String alias() {
    return this.name();
  }

  /** The string form of a mode is its alias. */
  @Override
  public String toString() {
    return this.alias();
  }
}
}
Loading

0 comments on commit 9613d6b

Please sign in to comment.