Skip to content

Commit

Permalink
[BALANCER] Rename balancing mode (#1786)
Browse files Browse the repository at this point in the history
  • Loading branch information
garyparrot authored May 29, 2023
1 parent 212e5d4 commit b1c9566
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ private BalancerConfigs() {}
* <ul>
* <li><code>balancing</code>: The broker will participate in the load balancing process. The
* replica assignment for this broker is eligible for changes.
* <li><code>demoted</code>: The broker should become empty after the rebalance. This mode
* allows the user to clear all the loadings for certain brokers, enabling a graceful
* removal of those brokers. Note to the balancer implementation: A broker in this mode
* assumes it will be out of service after the balancing is finished. Therefore, when
* evaluating the cluster cost, the brokers to demote should be excluded. However, these
* brokers will be included in the move cost evaluation. Since these brokers are still part
* of the cluster right now, and move cost focusing on the cost associated during the
* ongoing balancing process itself.
* <li><code>clear</code>: The broker should become empty after the rebalance. This mode allows
* the user to clear all the loadings for certain brokers, enabling a graceful removal of
* those brokers. Note to the balancer implementation: A broker in this mode assumes it will
* be out of service after the balancing is finished. Therefore, when evaluating the cluster
* cost, the brokers to clear should be excluded. However, these brokers will be included in
* the move cost evaluation. Since these brokers are still part of the cluster right now,
* and move cost focusing on the cost associated during the ongoing balancing process
* itself.
* <li><code>excluded</code>: The broker will not participate in the load balancing process. The
* replica assignment for this broker is not eligible for changes. It will neither accept
* replicas from other brokers nor reassign replicas to other brokers.
Expand All @@ -64,23 +64,23 @@ private BalancerConfigs() {}
* <h3>Flag Interaction:</h3>
*
* <ol>
* <li>All partitions on the demoting brokers will be compelled to participate in the balancing
* <li>All partitions on the clearing brokers will be compelled to participate in the balancing
* process, regardless of the explicit prohibition specified by the {@link
* BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX} configuration. This exception solely
* applies to partitions located at a demoting broker, while disallowed partitions on
* applies to partitions located at a clearing broker, while disallowed partitions on
* balancing brokers will remain excluded from the balancing decision.
* </ol>
*
* <h3>Limitation:</h3>
*
* <ol>
* <li>Demoting a broker may be infeasible if there are not enough brokers to fit the required
* <li>Clearing a broker may be infeasible if there are not enough brokers to fit the required
* replica factor for a specific partition. This situation is more likely to occur if there
* are many <code>excluded</code> brokers that reject accepting new replicas. If such a case
* is detected, an exception should be raised.
* <li>Any broker with ongoing replica-move-in, replica-move-out, or inter-folder movement
* cannot be the demoting target. An exception will be raised if any of the demoting brokers
* have such ongoing events. *
* cannot be the clearing target. An exception will be raised if any of the clearing brokers
* have such ongoing events.
* </ol>
*/
public static final String BALANCER_BROKER_BALANCING_MODE = "balancer.broker.balancing.mode";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static Map<Integer, BalancingModes> balancingMode(ClusterInfo cluster, St
s ->
switch (s[1]) {
case "balancing" -> BalancingModes.BALANCING;
case "demoted" -> BalancingModes.DEMOTED;
case "clear" -> BalancingModes.CLEAR;
case "excluded" -> BalancingModes.EXCLUDED;
default -> throw new IllegalArgumentException(
"Unsupported balancing mode: " + s[1]);
Expand All @@ -64,10 +64,10 @@ public static Map<Integer, BalancingModes> balancingMode(ClusterInfo cluster, St
}

/** Performs common validness checks to the cluster. */
public static void verifyClearBrokerValidness(ClusterInfo cluster, Predicate<Integer> isDemoted) {
public static void verifyClearBrokerValidness(ClusterInfo cluster, Predicate<Integer> isClear) {
var ongoingEventReplica =
cluster.replicas().stream()
.filter(r -> isDemoted.test(r.broker().id()))
.filter(r -> isClear.test(r.broker().id()))
.filter(r -> r.isAdding() || r.isRemoving() || r.isFuture())
.map(Replica::topicPartitionReplica)
.collect(Collectors.toUnmodifiableSet());
Expand All @@ -78,7 +78,7 @@ public static void verifyClearBrokerValidness(ClusterInfo cluster, Predicate<Int
}

/**
* Move all the replicas at the demoting broker to other allowed brokers. <b>BE CAREFUL, The
* Move all the replicas at the clearing broker to other allowed brokers. <b>BE CAREFUL, The
* implementation made no assumption for MoveCost or ClusterCost of the returned ClusterInfo.</b>
* Be aware of this limitation before using it as the starting point for a solution search. Some
* balancer implementation might have trouble finding answer when starting at a state where the
Expand Down Expand Up @@ -159,7 +159,7 @@ public static void balancerConfigCheck(Configuration configs, Set<String> suppor

public enum BalancingModes implements EnumInfo {
BALANCING,
DEMOTED,
CLEAR,
EXCLUDED;

public static BalancingModes ofAlias(String alias) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,23 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.orElse(""));
final Predicate<Integer> isBalancing =
id -> balancingMode.get(id) == BalancerUtils.BalancingModes.BALANCING;
final Predicate<Integer> isDemoted =
id -> balancingMode.get(id) == BalancerUtils.BalancingModes.DEMOTED;
final var hasDemoted =
balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.DEMOTED);
BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isDemoted);
final Predicate<Integer> isClearing =
id -> balancingMode.get(id) == BalancerUtils.BalancingModes.CLEAR;
final var clearing =
balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.CLEAR);
BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isClearing);

final var currentClusterInfo =
BalancerUtils.clearedCluster(config.clusterInfo(), isDemoted, isBalancing);
BalancerUtils.clearedCluster(config.clusterInfo(), isClearing, isBalancing);
final var clusterBean = config.clusterBean();
final var fixedReplicas =
config
.clusterInfo()
.replicaStream()
// if a topic is not allowed to move, it should be fixed.
// if a topic is not allowed to move, but originally it located on a demoting broker, it
// if a topic is not allowed to move, but originally it located on a clearing broker, it
// is ok to move.
.filter(tpr -> !allowedTopics.test(tpr.topic()) && !isDemoted.test(tpr.broker().id()))
.filter(tpr -> !allowedTopics.test(tpr.topic()) && !isClearing.test(tpr.broker().id()))
.collect(Collectors.toUnmodifiableSet());
final var allocationTweaker =
ShuffleTweaker.builder()
Expand All @@ -186,7 +186,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
final Function<ClusterInfo, ClusterCost> evaluateCost =
(cluster) -> {
final var filteredCluster =
hasDemoted ? ClusterInfo.builder(cluster).removeNodes(isDemoted).build() : cluster;
clearing ? ClusterInfo.builder(cluster).removeNodes(isClearing).build() : cluster;
return config.clusterCostFunction().clusterCost(filteredCluster, clusterBean);
};
final var initialCost = evaluateCost.apply(currentClusterInfo);
Expand Down Expand Up @@ -243,11 +243,11 @@ public Optional<Plan> offer(AlgorithmConfig config) {
}
return currentSolution.or(
() -> {
// With demotion, the implementation detail start search from a demoted state. It is
// With clearing, the implementation detail start search from a cleared state. It is
// possible
// that the start state is already the ideal answer. In this case, it is directly
// returned.
if (hasDemoted
if (clearing
&& initialCost.value() == 0.0
&& !moveCostFunction
.moveCost(config.clusterInfo(), currentClusterInfo, clusterBean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,23 +88,23 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.orElse(""));
final Predicate<Integer> isBalancing =
id -> balancingMode.get(id) == BalancerUtils.BalancingModes.BALANCING;
final Predicate<Integer> isDemoted =
id -> balancingMode.get(id) == BalancerUtils.BalancingModes.DEMOTED;
final var hasDemoted =
balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.DEMOTED);
BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isDemoted);
final Predicate<Integer> isClearing =
id -> balancingMode.get(id) == BalancerUtils.BalancingModes.CLEAR;
final var clearing =
balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.CLEAR);
BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isClearing);

final var currentClusterInfo =
BalancerUtils.clearedCluster(config.clusterInfo(), isDemoted, isBalancing);
BalancerUtils.clearedCluster(config.clusterInfo(), isClearing, isBalancing);
final var clusterBean = config.clusterBean();
final var fixedReplicas =
config
.clusterInfo()
.replicaStream()
// if a topic is not allowed to move, it should be fixed.
// if a topic is not allowed to move, but originally it located on a demoting broker, it
// if a topic is not allowed to move, but originally it located on a clearing broker, it
// is ok to move.
.filter(tpr -> !allowedTopics.test(tpr.topic()) && !isDemoted.test(tpr.broker().id()))
.filter(tpr -> !allowedTopics.test(tpr.topic()) && !isClearing.test(tpr.broker().id()))
.collect(Collectors.toUnmodifiableSet());
final var allocationTweaker =
ShuffleTweaker.builder()
Expand All @@ -117,7 +117,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
final Function<ClusterInfo, ClusterCost> evaluateCost =
(cluster) -> {
final var filteredCluster =
hasDemoted ? ClusterInfo.builder(cluster).removeNodes(isDemoted).build() : cluster;
clearing ? ClusterInfo.builder(cluster).removeNodes(isClearing).build() : cluster;
return config.clusterCostFunction().clusterCost(filteredCluster, clusterBean);
};
final var currentCost = evaluateCost.apply(currentClusterInfo);
Expand All @@ -144,11 +144,11 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.min(Comparator.comparing(plan -> plan.proposalClusterCost().value()))
.or(
() -> {
// With demotion, the implementation detail start search from a demoted state. It is
// With clearing, the implementation detail start search from a cleared state. It is
// possible
// that the start state is already the ideal answer. In this case, it is directly
// returned.
if (hasDemoted
if (clearing
&& currentCost.value() == 0.0
&& !moveCostFunction
.moveCost(config.clusterInfo(), currentClusterInfo, clusterBean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void testBalancingMode() {
}

@Test
public void testBalancingModeDemoted() {
public void testBalancingModeClear() {
final var balancer = Utils.construct(balancerClass, Configuration.EMPTY);
final var cluster = cluster(10, 30, 10, (short) 5);

Expand All @@ -223,7 +223,7 @@ public void testBalancingModeDemoted() {
.clusterCost(decreasingCost())
.timeout(Duration.ofSeconds(2))
.configs(customConfig.raw())
.config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:demoted")
.config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:clear")
.build()),
testName);
}
Expand All @@ -237,9 +237,7 @@ public void testBalancingModeDemoted() {
.clusterCost(decreasingCost())
.timeout(Duration.ofSeconds(2))
.configs(customConfig.raw())
.config(
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE,
"0:demoted,1:demoted,2:demoted")
.config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "0:clear,1:clear,2:clear")
.build());
Assertions.assertTrue(plan.isPresent(), testName);
var finalCluster = plan.get().proposal();
Expand Down Expand Up @@ -270,7 +268,7 @@ public void testBalancingModeDemoted() {
.configs(customConfig.raw())
.config(
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE,
"0:demoted,1:demoted,2:demoted")
"0:clear,1:clear,2:clear")
.build())
.orElseThrow()
.proposal();
Expand All @@ -292,14 +290,13 @@ public void testBalancingModeDemoted() {
.timeout(Duration.ofSeconds(2))
.configs(customConfig.raw())
.config(
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE,
"0:demoted,1:demoted,2:demoted")
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "0:clear,1:clear,2:clear")
.build()));
}

{
var testName =
"[test if allowed topics is used, disallowed partitions on demoted broker will be force to move]";
"[test if allowed topics is used, disallowed partitions on cleared broker will be force to move]";
var base =
ClusterInfo.builder()
.addNode(Set.of(1, 2, 3))
Expand Down Expand Up @@ -330,7 +327,7 @@ public void testBalancingModeDemoted() {
// allow anything other than this topic
.config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!topic).*")
// clear broker 3
.config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted")
.config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:clear")
// partition at broker 3 will be forced to move
.build()),
testName);
Expand Down Expand Up @@ -370,7 +367,7 @@ public void testBalancingModeDemoted() {
// clear broker 0
.config(
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE,
"0:demoted,"
"0:clear,"
+
// allow broker 1,2,3,4,5,6
"1:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing,default:excluded")
Expand Down Expand Up @@ -406,7 +403,7 @@ public void testBalancingModeDemoted() {
// clear broker 0, allow broker 1
.config(
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE,
"0:demoted,1:balancing,default:excluded")
"0:clear,1:balancing,default:excluded")
// this will raise an error if a partition has replicas at both 0 and 1. In
// this case, there is no allowed broker to adopt replica from 0, since the
// only allowed broker already has one replica on it. we cannot assign two
Expand Down Expand Up @@ -442,7 +439,7 @@ public void testBalancingModeDemoted() {
// clear broker 0 allow broker 1,2,3,4,5,6
.config(
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE,
"0:demoted,"
"0:clear,"
+ "1:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing")
.build()),
testName);
Expand All @@ -459,7 +456,7 @@ public void testBalancingModeDemoted() {
// clear broker 1 allow broker 0,2,3,4,5,6,7
.config(
BalancerConfigs.BALANCER_BROKER_BALANCING_MODE,
"1:demoted,"
"1:clear,"
+ "0:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing,"
+ "7:balancing,default:excluded")
// adding/removing/future at 0 not 1, unrelated so no error
Expand All @@ -471,14 +468,14 @@ public void testBalancingModeDemoted() {
{
// Some balancer implementations have such logic flaw:
// 1. The initial state[A] cannot be solution.
// 2. There are brokers that need to be demoted.
// 2. There are brokers that need to be cleared.
// 3. The load on those brokers been redistributed to other brokers. Creating the start
// state[B] for the solution search.
// 4. The start state[B] solution is actually the best solution.
// 5. Balancer think the start state[B] is the initial state[A]. And cannot be a solution(as
// mentioned in 1).
// 6. In fact, the start state[B] doesn't equal to the initial state[A]. Since there is a
// cleaning work performed at step 3.
// clearing work performed at step 3.
// 7. Balancer cannot find any solution that is better than the start state(4) and therefore
// returns no solution.
var testName =
Expand All @@ -497,7 +494,7 @@ public void testBalancingModeDemoted() {
.clusterInfo(testCluster)
.clusterBean(ClusterBean.EMPTY)
.clusterCost(new ReplicaLeaderCost())
.config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "1:demoted")
.config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "1:clear")
.timeout(Duration.ofSeconds(2))
.build()),
testName);
Expand Down
Loading

0 comments on commit b1c9566

Please sign in to comment.