From 9613d6b82e72bd0533e124f323f44fd2afa8d461 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 22 May 2023 15:46:47 +0800 Subject: [PATCH] [BALANCER] Implement balancer config `balancer.broker.balancing.mode` (#1737) --- .../astraea/app/web/BalancerHandlerTest.java | 30 +- .../common/admin/ClusterInfoBuilder.java | 13 + .../common/balancer/BalancerConfigs.java | 52 ++- .../common/balancer/BalancerUtils.java | 180 +++++++++ .../balancer/algorithms/GreedyBalancer.java | 61 ++- .../algorithms/SingleStepBalancer.java | 66 +++- .../common/admin/ClusterInfoBuilderTest.java | 19 + .../balancer/BalancerConfigTestSuite.java | 363 +++++++++++++++++- .../common/balancer/BalancerUtilsTest.java | 208 ++++++++++ docs/web_server/web_api_balancer_chinese.md | 8 +- 10 files changed, 938 insertions(+), 62 deletions(-) create mode 100644 common/src/main/java/org/astraea/common/balancer/BalancerUtils.java create mode 100644 common/src/test/java/org/astraea/common/balancer/BalancerUtilsTest.java diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index 1e4d5d4085..f6727111cb 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -224,27 +224,19 @@ private static Set createAndProduceTopic( void testBestPlan() { try (var admin = Admin.of(SERVICE.bootstrapServers())) { var currentClusterInfo = - ClusterInfo.of( - "fake", - List.of(NodeInfo.of(10, "host", 22), NodeInfo.of(11, "host", 22)), - Map.of(), - List.of( - Replica.builder() - .topic("topic") - .partition(0) - .nodeInfo(NodeInfo.of(10, "host", 22)) - .lag(0) - .size(100) - .isLeader(true) - .isSync(true) - .isFuture(false) - .isOffline(false) - .isPreferredLeader(true) - .path("/tmp/aa") - .build())); + ClusterInfo.builder() + .addNode(Set.of(1, 2)) + .addFolders( + Map.ofEntries(Map.entry(1, Set.of("/folder")), Map.entry(2, Set.of("/folder")))) + 
.addTopic("topic", 1, (short) 1) + .build(); HasClusterCost clusterCostFunction = - (clusterInfo, clusterBean) -> () -> clusterInfo == currentClusterInfo ? 100D : 10D; + (clusterInfo, clusterBean) -> + () -> + ClusterInfo.findNonFulfilledAllocation(currentClusterInfo, clusterInfo).isEmpty() + ? 100D + : 10D; HasMoveCost moveCostFunction = HasMoveCost.EMPTY; HasMoveCost failMoveCostFunction = (before, after, clusterBean) -> () -> true; diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java index c06919747a..1aa93e1c40 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfoBuilder.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -92,6 +93,18 @@ public ClusterInfoBuilder addNode(Set brokerIds) { }); } + /** + * Remove specific brokers from the cluster state. + * + * @param toRemove id to remove + * @return this + */ + public ClusterInfoBuilder removeNodes(Predicate toRemove) { + return applyNodes( + (nodes, replicas) -> + nodes.stream().filter(node -> toRemove.negate().test(node.id())).toList()); + } + /** * Add some fake folders to a specific broker. 
* diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java b/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java index 01ebcd26ff..da74c14425 100644 --- a/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java +++ b/common/src/main/java/org/astraea/common/balancer/BalancerConfigs.java @@ -35,9 +35,53 @@ private BalancerConfigs() {} public static final String BALANCER_ALLOWED_TOPICS_REGEX = "balancer.allowed.topics.regex"; /** - * A regular expression indicates which brokers are eligible for moving loading. When specified, a - * broker with an id that doesn't match this expression cannot accept a partition from the other - * broker or move its partition to other brokers. + * This configuration indicates the balancing mode for each broker. + * + *

This configuration requires a string with a series of key-value pairs. Pairs are + * separated by commas, and the key and value of each pair are separated by a colon: + * (brokerId_A|"default"):(mode),(brokerId_B):(mode), ... The key is the integer id + * of a broker, and the value is the balancing mode for that broker. When the + * key is the string "default" (without the double quotes), the + * associated balancing mode becomes the default mode for the rest of the brokers that are not + * addressed in the configuration. By default, all the brokers use "balancing" mode. + * + *

Possible balancing modes

+ * + *
    + *
  • balancing: The broker will participate in the load balancing process. The + * replica assignment for this broker is eligible for changes. + *
  • demoted: The broker should become empty after the rebalance. This mode + * allows the user to clear all the load from certain brokers, enabling a graceful + * removal of those brokers. Note to the balancer implementation: A broker in this mode + * is assumed to be out of service after the balancing is finished. Therefore, when + * evaluating the cluster cost, the brokers to demote should be excluded. However, these + * brokers will be included in the move cost evaluation, since they are still part + * of the cluster right now and the move cost focuses on the cost incurred during the + * ongoing balancing process itself. + *
  • excluded: The broker will not participate in the load balancing process. The + * replica assignment for this broker is not eligible for changes. It will neither accept + * replicas from other brokers nor reassign replicas to other brokers. + *
+ * + *

Flag Interaction:

+ * + *
    + *
  1. When this flag is used in conjunction with {@link + * BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX}, if a demoted broker contains partitions + * from topics that the regex forbids from being changed, an exception should be raised. + *
+ * + *

Limitation:

+ * + *
    + *
  1. Demoting a broker may be infeasible if there are not enough brokers to fit the required + * replication factor for a specific partition. This situation is more likely to occur if there + * are many excluded brokers that refuse to accept new replicas. If such a case + * is detected, an exception should be raised. + *
  2. Any broker with ongoing replica-move-in, replica-move-out, or inter-folder movement + * cannot be a demotion target. An exception will be raised if any of the brokers to demote + * has such ongoing events. * + *
*/ - public static final String BALANCER_ALLOWED_BROKERS_REGEX = "balancer.allowed.brokers.regex"; + public static final String BALANCER_BROKER_BALANCING_MODE = "balancer.broker.balancing.mode"; } diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerUtils.java b/common/src/main/java/org/astraea/common/balancer/BalancerUtils.java new file mode 100644 index 0000000000..d846905f0b --- /dev/null +++ b/common/src/main/java/org/astraea/common/balancer/BalancerUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.astraea.common.balancer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import org.astraea.common.EnumInfo; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.NodeInfo; +import org.astraea.common.admin.Replica; + +public final class BalancerUtils { + + private BalancerUtils() {} + + public static Map balancingMode(ClusterInfo cluster, String config) { + var num = Pattern.compile("[0-9]+"); + + var map = + Arrays.stream(config.split(",")) + .filter(Predicate.not(String::isEmpty)) + .map(x -> x.split(":")) + .collect( + Collectors.toUnmodifiableMap( + s -> (Object) (num.matcher(s[0]).find() ? Integer.parseInt(s[0]) : s[0]), + s -> + switch (s[1]) { + case "balancing" -> BalancingModes.BALANCING; + case "demoted" -> BalancingModes.DEMOTED; + case "excluded" -> BalancingModes.EXCLUDED; + default -> throw new IllegalArgumentException( + "Unsupported balancing mode: " + s[1]); + })); + + Function mode = + (id) -> map.getOrDefault(id, map.getOrDefault("default", BalancingModes.BALANCING)); + + return cluster.brokers().stream() + .map(NodeInfo::id) + .collect(Collectors.toUnmodifiableMap(Function.identity(), mode)); + } + + /** + * Verify there is no logic conflict between {@link BalancerConfigs#BALANCER_ALLOWED_TOPICS_REGEX} + * and {@link BalancerConfigs#BALANCER_BROKER_BALANCING_MODE}. It also performs other common + * validness checks to the cluster. 
+ */ + public static void verifyClearBrokerValidness( + ClusterInfo cluster, Predicate isDemoted, Predicate allowedTopics) { + var disallowedTopicsToClear = + cluster.topicPartitionReplicas().stream() + .filter(tpr -> isDemoted.test(tpr.brokerId())) + .filter(tpr -> !allowedTopics.test(tpr.topic())) + .collect(Collectors.toUnmodifiableSet()); + if (!disallowedTopicsToClear.isEmpty()) + throw new IllegalArgumentException( + "Attempts to clear some brokers, but some of them contain topics that forbidden from being changed due to \"" + + BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX + + "\": " + + disallowedTopicsToClear); + + var ongoingEventReplica = + cluster.replicas().stream() + .filter(r -> isDemoted.test(r.nodeInfo().id())) + .filter(r -> r.isAdding() || r.isRemoving() || r.isFuture()) + .map(Replica::topicPartitionReplica) + .collect(Collectors.toUnmodifiableSet()); + if (!ongoingEventReplica.isEmpty()) + throw new IllegalArgumentException( + "Attempts to clear broker with ongoing migration event (adding/removing/future replica): " + + ongoingEventReplica); + } + + /** + * Move all the replicas at the demoting broker to other allowed brokers. BE CAREFUL, The + * implementation made no assumption for MoveCost or ClusterCost of the returned ClusterInfo. + * Be aware of this limitation before using it as the starting point for a solution search. Some + * balancer implementation might have trouble finding answer when starting at a state where the + * MoveCost is already violated. 
+ */ + public static ClusterInfo clearedCluster( + ClusterInfo initial, Predicate clearBrokers, Predicate allowedBrokers) { + final var allowed = + initial.nodes().stream() + .filter(node -> allowedBrokers.test(node.id())) + .filter(node -> Predicate.not(clearBrokers).test(node.id())) + .collect(Collectors.toUnmodifiableSet()); + final var nextBroker = Stream.generate(() -> allowed).flatMap(Collection::stream).iterator(); + final var nextBrokerFolder = + initial.brokerFolders().entrySet().stream() + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, + x -> Stream.generate(x::getValue).flatMap(Collection::stream).iterator())); + + var trackingReplicaList = + initial.topicPartitions().stream() + .collect( + Collectors.toUnmodifiableMap( + tp -> tp, + tp -> + initial.replicas(tp).stream() + .map(Replica::nodeInfo) + .collect(Collectors.toSet()))); + return ClusterInfo.builder(initial) + .mapLog( + replica -> { + if (!clearBrokers.test(replica.nodeInfo().id())) return replica; + var currentReplicaList = trackingReplicaList.get(replica.topicPartition()); + var broker = + IntStream.range(0, allowed.size()) + .mapToObj(i -> nextBroker.next()) + .filter(b -> !currentReplicaList.contains(b)) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Unable to clear replica " + + replica.topicPartitionReplica() + + " for broker " + + replica.nodeInfo().id() + + ", the allowed destination brokers are " + + allowed.stream() + .map(NodeInfo::id) + .collect(Collectors.toUnmodifiableSet()) + + " but all of them already hosting a replica for this partition. " + + "There is no broker can adopt this replica.")); + var folder = nextBrokerFolder.get(broker.id()).next(); + + // update the tracking list. have to do this to avoid putting two replicas from the + // same tp to one broker. 
+ currentReplicaList.remove(replica.nodeInfo()); + currentReplicaList.add(broker); + + return Replica.builder(replica).nodeInfo(broker).path(folder).build(); + }) + .build(); + } + + public enum BalancingModes implements EnumInfo { + BALANCING, + DEMOTED, + EXCLUDED; + + public static BalancingModes ofAlias(String alias) { + return EnumInfo.ignoreCaseEnum(BalancingModes.class, alias); + } + + @Override + public String alias() { + return name(); + } + + @Override + public String toString() { + return alias(); + } + } +} diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java index 1278d85c56..779ff5a9f8 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/GreedyBalancer.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.DoubleAccumulator; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -32,6 +33,7 @@ import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; import org.astraea.common.balancer.BalancerConfigs; +import org.astraea.common.balancer.BalancerUtils; import org.astraea.common.balancer.tweakers.ShuffleTweaker; import org.astraea.common.cost.ClusterCost; import org.astraea.common.metrics.MBeanRegister; @@ -140,26 +142,38 @@ public Optional offer(AlgorithmConfig config) { .regexString(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX) .map(Pattern::asMatchPredicate) .orElse((ignore) -> true); - final var allowedBrokers = - config - .balancerConfig() - .regexString(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX) - .map(Pattern::asMatchPredicate) - .>map( - predicate -> (brokerId) -> 
predicate.test(Integer.toString(brokerId))) - .orElse((ignore) -> true); + final var balancingMode = + BalancerUtils.balancingMode( + config.clusterInfo(), + config + .balancerConfig() + .string(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE) + .orElse("")); + final Predicate isBalancing = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.BALANCING; + final Predicate isDemoted = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.DEMOTED; + final var hasDemoted = + balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.DEMOTED); + BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isDemoted, allowedTopics); - final var currentClusterInfo = config.clusterInfo(); + final var currentClusterInfo = + BalancerUtils.clearedCluster(config.clusterInfo(), isDemoted, isBalancing); final var clusterBean = config.clusterBean(); final var allocationTweaker = ShuffleTweaker.builder() .numberOfShuffle(() -> ThreadLocalRandom.current().nextInt(minStep, maxStep)) .allowedTopics(allowedTopics) - .allowedBrokers(allowedBrokers) + .allowedBrokers(isBalancing) .build(); - final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); - final var initialCost = clusterCostFunction.clusterCost(currentClusterInfo, clusterBean); + final Function evaluateCost = + (cluster) -> { + final var filteredCluster = + hasDemoted ? 
ClusterInfo.builder(cluster).removeNodes(isDemoted).build() : cluster; + return config.clusterCostFunction().clusterCost(filteredCluster, clusterBean); + }; + final var initialCost = evaluateCost.apply(currentClusterInfo); final var loop = new AtomicInteger(iteration); final var start = System.currentTimeMillis(); @@ -182,7 +196,7 @@ public Optional offer(AlgorithmConfig config) { config.clusterInfo(), initialCost, newAllocation, - clusterCostFunction.clusterCost(newAllocation, clusterBean))) + evaluateCost.apply(newAllocation))) .filter(plan -> plan.proposalClusterCost().value() < currentCost.value()) .findFirst(); var currentCost = initialCost; @@ -211,6 +225,25 @@ public Optional offer(AlgorithmConfig config) { currentCost = currentSolution.get().proposalClusterCost(); currentAllocation = currentSolution.get().proposal(); } - return currentSolution; + return currentSolution.or( + () -> { + // With demotion, the implementation detail start search from a demoted state. It is + // possible + // that the start state is already the ideal answer. In this case, it is directly + // returned. 
+ if (hasDemoted + && initialCost.value() == 0.0 + && !moveCostFunction + .moveCost(config.clusterInfo(), currentClusterInfo, clusterBean) + .overflow()) { + return Optional.of( + new Plan( + config.clusterInfo(), + config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean), + currentClusterInfo, + initialCost)); + } + return Optional.empty(); + }); } } diff --git a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java index 0b8cc0bafe..3f170b369f 100644 --- a/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java +++ b/common/src/main/java/org/astraea/common/balancer/algorithms/SingleStepBalancer.java @@ -21,13 +21,17 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; import java.util.function.Predicate; import java.util.regex.Pattern; import org.astraea.common.Utils; +import org.astraea.common.admin.ClusterInfo; import org.astraea.common.balancer.AlgorithmConfig; import org.astraea.common.balancer.Balancer; import org.astraea.common.balancer.BalancerConfigs; +import org.astraea.common.balancer.BalancerUtils; import org.astraea.common.balancer.tweakers.ShuffleTweaker; +import org.astraea.common.cost.ClusterCost; /** This algorithm proposes rebalance plan by tweaking the log allocation once. 
*/ public class SingleStepBalancer implements Balancer { @@ -68,27 +72,39 @@ public Optional offer(AlgorithmConfig config) { .regexString(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX) .map(Pattern::asMatchPredicate) .orElse((ignore) -> true); - final var allowedBrokers = - config - .balancerConfig() - .regexString(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX) - .map(Pattern::asMatchPredicate) - .>map( - predicate -> (brokerId) -> predicate.test(Integer.toString(brokerId))) - .orElse((ignore) -> true); + final var balancingMode = + BalancerUtils.balancingMode( + config.clusterInfo(), + config + .balancerConfig() + .string(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE) + .orElse("")); + final Predicate isBalancing = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.BALANCING; + final Predicate isDemoted = + id -> balancingMode.get(id) == BalancerUtils.BalancingModes.DEMOTED; + final var hasDemoted = + balancingMode.values().stream().anyMatch(i -> i == BalancerUtils.BalancingModes.DEMOTED); + BalancerUtils.verifyClearBrokerValidness(config.clusterInfo(), isDemoted, allowedTopics); - final var currentClusterInfo = config.clusterInfo(); + final var currentClusterInfo = + BalancerUtils.clearedCluster(config.clusterInfo(), isDemoted, isBalancing); final var clusterBean = config.clusterBean(); final var allocationTweaker = ShuffleTweaker.builder() .numberOfShuffle(() -> ThreadLocalRandom.current().nextInt(minStep, maxStep)) .allowedTopics(allowedTopics) - .allowedBrokers(allowedBrokers) + .allowedBrokers(isBalancing) .build(); - final var clusterCostFunction = config.clusterCostFunction(); final var moveCostFunction = config.moveCostFunction(); - final var currentCost = - config.clusterCostFunction().clusterCost(currentClusterInfo, clusterBean); + + final Function evaluateCost = + (cluster) -> { + final var filteredCluster = + hasDemoted ? 
ClusterInfo.builder(cluster).removeNodes(isDemoted).build() : cluster; + return config.clusterCostFunction().clusterCost(filteredCluster, clusterBean); + }; + final var currentCost = evaluateCost.apply(currentClusterInfo); var start = System.currentTimeMillis(); return allocationTweaker @@ -107,8 +123,28 @@ public Optional offer(AlgorithmConfig config) { config.clusterInfo(), currentCost, newAllocation, - clusterCostFunction.clusterCost(newAllocation, clusterBean))) + evaluateCost.apply(newAllocation))) .filter(plan -> plan.proposalClusterCost().value() < currentCost.value()) - .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())); + .min(Comparator.comparing(plan -> plan.proposalClusterCost().value())) + .or( + () -> { + // With demotion, the implementation detail start search from a demoted state. It is + // possible + // that the start state is already the ideal answer. In this case, it is directly + // returned. + if (hasDemoted + && currentCost.value() == 0.0 + && !moveCostFunction + .moveCost(config.clusterInfo(), currentClusterInfo, clusterBean) + .overflow()) { + return Optional.of( + new Plan( + config.clusterInfo(), + config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean), + currentClusterInfo, + currentCost)); + } + return Optional.empty(); + }); } } diff --git a/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java b/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java index b7d2851632..0f2e5244d3 100644 --- a/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java +++ b/common/src/test/java/org/astraea/common/admin/ClusterInfoBuilderTest.java @@ -341,4 +341,23 @@ void testFakeBrokerInteraction(int id, String host, int port) { Assertions.assertEquals(node0, node1); Assertions.assertNotEquals(node0, node2); } + + @Test + void testRemoveNodes() { + var base = ClusterInfo.builder().addNode(Set.of(1, 2, 3, 4, 5, 6, 7, 8, 9)).build(); + Assertions.assertEquals( + 
Set.of(1, 2, 3), + ClusterInfo.builder(base) + .removeNodes(x -> Set.of(4, 5, 6, 7, 8, 9).contains(x)) + .build() + .nodes() + .stream() + .map(NodeInfo::id) + .collect(Collectors.toSet())); + Assertions.assertEquals( + Set.of(1, 3, 5, 7, 9), + ClusterInfo.builder(base).removeNodes(x -> x % 2 == 0).build().nodes().stream() + .map(NodeInfo::id) + .collect(Collectors.toSet())); + } } diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java b/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java index 693c2be146..e3c2e66b35 100644 --- a/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java +++ b/common/src/test/java/org/astraea/common/balancer/BalancerConfigTestSuite.java @@ -17,17 +17,23 @@ package org.astraea.common.balancer; import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.astraea.common.Configuration; import org.astraea.common.Utils; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; import org.astraea.common.cost.ClusterCost; import org.astraea.common.cost.HasClusterCost; +import org.astraea.common.cost.ReplicaLeaderCost; import org.astraea.common.metrics.ClusterBean; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -121,7 +127,7 @@ public void testBalancerAllowedTopicsRegex() { } @Test - public void testBalancerAllowedBrokersRegex() { + public void testBalancingMode() { final var balancer = Utils.construct(balancerClass, Configuration.EMPTY); final var cluster = cluster(10, 10, 10, (short) 5); @@ -134,7 +140,7 @@ public void testBalancerAllowedBrokersRegex() { .clusterCost(decreasingCost()) 
.timeout(Duration.ofSeconds(2)) .configs(customConfig.raw()) - .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, "[0-9]*") + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:balancing") .build()); AssertionsHelper.assertSomeMovement(cluster, plan.orElseThrow().proposal(), testName); } @@ -148,7 +154,7 @@ public void testBalancerAllowedBrokersRegex() { .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) .configs(customConfig.raw()) - .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, "NoMatch") + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:excluded") .build()); // since nothing can be moved. It is ok to return no plan. if (plan.isPresent()) { @@ -160,8 +166,10 @@ public void testBalancerAllowedBrokersRegex() { { var testName = "[test some match]"; var allowedBrokers = IntStream.range(1, 6).boxed().collect(Collectors.toUnmodifiableSet()); - var rawRegex = - allowedBrokers.stream().map(Object::toString).collect(Collectors.joining("|", "(", ")")); + var config = + allowedBrokers.stream() + .map(i -> i + ":balancing") + .collect(Collectors.joining(",", "default:excluded,", "")); var plan = balancer.offer( AlgorithmConfig.builder() @@ -169,13 +177,345 @@ public void testBalancerAllowedBrokersRegex() { .clusterCost(decreasingCost()) .timeout(Duration.ofSeconds(2)) .configs(customConfig.raw()) - .config(BalancerConfigs.BALANCER_ALLOWED_BROKERS_REGEX, rawRegex) + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, config) .build()); AssertionsHelper.assertOnlyAllowedBrokerMovement( cluster, plan.orElseThrow().proposal(), allowedBrokers::contains, testName); } } + @Test + public void testBalancingModeDemoted() { + final var balancer = Utils.construct(balancerClass, Configuration.EMPTY); + final var cluster = cluster(10, 30, 10, (short) 5); + + { + var testName = "[test all clear]"; + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + 
.clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "default:demoted") + .build()), + testName); + } + + { + var testName = "[test some clear]"; + var plan = + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:demoted,2:demoted") + .build()); + Assertions.assertTrue(plan.isPresent(), testName); + var finalCluster = plan.get().proposal(); + Assertions.assertTrue(cluster.replicas().stream().anyMatch(x -> x.nodeInfo().id() == 0)); + Assertions.assertTrue(cluster.replicas().stream().anyMatch(x -> x.nodeInfo().id() == 1)); + Assertions.assertTrue(cluster.replicas().stream().anyMatch(x -> x.nodeInfo().id() == 2)); + Assertions.assertTrue( + finalCluster.replicas().stream().noneMatch(x -> x.nodeInfo().id() == 0)); + Assertions.assertTrue( + finalCluster.replicas().stream().noneMatch(x -> x.nodeInfo().id() == 1)); + Assertions.assertTrue( + finalCluster.replicas().stream().noneMatch(x -> x.nodeInfo().id() == 2)); + AssertionsHelper.assertBrokerEmpty( + finalCluster, (x) -> Set.of(0, 1, 2).contains(x), testName); + } + + { + var testName = "[test replication factor violation]"; + // 6 brokers, clear 3 brokers, remain 3 brokers, topic with replication factor 3 can fit this + // cluster. 
+ var noViolatedCluster = cluster(6, 10, 10, (short) 3); + Assertions.assertDoesNotThrow( + () -> { + var solution = + balancer + .offer( + AlgorithmConfig.builder() + .clusterInfo(noViolatedCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:demoted,2:demoted") + .build()) + .orElseThrow() + .proposal(); + AssertionsHelper.assertBrokerEmpty( + solution, (x) -> Set.of(0, 1, 2).contains(x), testName); + }, + testName); + + // 5 brokers, clear 3 brokers, remain 2 brokers, topic with replication factor 3 CANNOT fit + // this cluster. + var violatedCluster = cluster(5, 10, 10, (short) 3); + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(violatedCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:demoted,2:demoted") + .build())); + } + + { + var testName = "[test if allowed topics is used, clear disallow topic will raise an error]"; + var base = + ClusterInfo.builder() + .addNode(Set.of(1, 2, 3)) + .addFolders( + Map.ofEntries( + Map.entry(1, Set.of("/folder")), + Map.entry(2, Set.of("/folder")), + Map.entry(3, Set.of("/folder")))) + .build(); + var node12 = Stream.of(1, 2).map(base::node).iterator(); + var node13 = Stream.of(1, 3).map(base::node).iterator(); + var node123 = Stream.of(1, 2, 3).map(base::node).iterator(); + var testCluster = + ClusterInfo.builder(base) + .addTopic("OK", 1, (short) 1, r -> Replica.builder(r).nodeInfo(base.node(1)).build()) + .addTopic( + "OK_SKIP", 2, (short) 1, r -> Replica.builder(r).nodeInfo(node12.next()).build()) + .addTopic( + "Replica", 1, (short) 2, r -> Replica.builder(r).nodeInfo(node13.next()).build()) + .addTopic( + "Partition", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(node123.next()).build()) 
+ .build(); + + Assertions.assertDoesNotThrow( + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "OK" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!OK).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted") + // this won't raise an error since that topic didn't locate at 3 + .build()), + testName); + Assertions.assertDoesNotThrow( + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "OK" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!OK_SKIP).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted") + // this won't raise an error since that topic didn't locate at 3 + .build()), + testName); + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "Replica" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!Replica).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "3:demoted") + // this will raise an error since that topic has a replica at 3 + .build()), + testName); + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // allow anything other than "Replica" topic + .config(BalancerConfigs.BALANCER_ALLOWED_TOPICS_REGEX, "(?!Partition).*") + // clear broker 3 + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, 
"3:demoted") + // this will raise an error since that topic has a partition at 3 + .build()), + testName); + } + + { + var testName = "[test if allowed brokers is used, disallowed broker won't be altered]"; + var solution = + balancer + .offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // clear broker 0 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted," + + + // allow broker 1,2,3,4,5,6 + "1:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing,default:excluded") + // this will be ok since any replica at 0 can move to 1~6 without breaking + // replica factors + .build()) + .orElseThrow() + .proposal(); + var before = cluster.topicPartitionReplicas(); + var after = solution.topicPartitionReplicas(); + var changed = + after.stream() + .filter(Predicate.not(before::contains)) + .collect(Collectors.toUnmodifiableSet()); + Assertions.assertTrue(after.stream().noneMatch(r -> r.brokerId() == 0), testName); + Assertions.assertTrue( + changed.stream().allMatch(r -> Set.of(1, 2, 3, 4, 5, 6).contains(r.brokerId())), + testName); + } + + { + var testName = + "[test if allowed brokers is used, insufficient allowed broker to fit replica factor requirement will raise an error]"; + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cluster) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(2)) + .configs(customConfig.raw()) + // clear broker 0, allow broker 1 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted,1:balancing,default:excluded") + // this will raise an error if a partition has replicas at both 0 and 1. In + // this case, there is no allowed broker to adopt replica from 0, since the + // only allowed broker already has one replica on it. we cannot assign two + // replicas to one broker. 
+ .build()), + testName); + } + + { + var testName = "[if replica on clear broker is adding/removing/future, raise an exception]"; + var adding = + ClusterInfo.builder(cluster) + .mapLog(r -> r.nodeInfo().id() != 0 ? r : Replica.builder(r).isAdding(true).build()) + .build(); + var removing = + ClusterInfo.builder(cluster) + .mapLog(r -> r.nodeInfo().id() != 0 ? r : Replica.builder(r).isRemoving(true).build()) + .build(); + var future = + ClusterInfo.builder(cluster) + .mapLog(r -> r.nodeInfo().id() != 0 ? r : Replica.builder(r).isFuture(true).build()) + .build(); + for (var cc : List.of(adding, removing, future)) { + Assertions.assertThrows( + Exception.class, + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cc) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(1)) + .configs(customConfig.raw()) + // clear broker 0 allow broker 1,2,3,4,5,6 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "0:demoted," + + "1:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing") + .build()), + testName); + } + for (var cc : List.of(adding, removing, future)) { + Assertions.assertDoesNotThrow( + () -> + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(cc) + .clusterCost(decreasingCost()) + .timeout(Duration.ofSeconds(1)) + .configs(customConfig.raw()) + // clear broker 1 allow broker 0,2,3,4,5,6,7 + .config( + BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, + "1:demoted," + + "0:balancing,2:balancing,3:balancing,4:balancing,5:balancing,6:balancing," + + "7:balancing,default:excluded") + // adding/removing/future at 0 not 1, unrelated so no error + .build()), + testName); + } + } + + { + // Some balancer implementations have such logic flaw: + // 1. The initial state[A] cannot be solution. + // 2. There are brokers that need to be demoted. + // 3. The load on those brokers been redistributed to other brokers. Creating the start + // state[B] for the solution search. + // 4. 
The start state[B] solution is actually the best solution. + // 5. Balancer think the start state[B] is the initial state[A]. And cannot be a solution(as + // mentioned in 1). + // 6. In fact, the start state[B] doesn't equal to the initial state[A]. Since there is a + // cleaning work performed at step 3. + // 7. Balancer cannot find any solution that is better than the start state(4) and therefore + // returns no solution. + var testName = + "[If the cluster after clear is the best solution, balancer should be able to return it]"; + var testCluster = + ClusterInfo.builder() + .addNode(Set.of(1, 2)) + .addFolders( + Map.ofEntries(Map.entry(1, Set.of("/folder")), Map.entry(2, Set.of("/folder")))) + .addTopic("topic", 100, (short) 1) + .build(); + Assertions.assertNotEquals( + Optional.empty(), + balancer.offer( + AlgorithmConfig.builder() + .clusterInfo(testCluster) + .clusterBean(ClusterBean.EMPTY) + .clusterCost(new ReplicaLeaderCost()) + .config(BalancerConfigs.BALANCER_BROKER_BALANCING_MODE, "1:demoted") + .timeout(Duration.ofSeconds(2)) + .build()), + testName); + } + } + private static ClusterInfo cluster(int nodes, int topics, int partitions, short replicas) { var builder = ClusterInfo.builder() @@ -258,5 +598,16 @@ static void assertOnlyAllowedBrokerMovement( }); }); } + + static void assertBrokerEmpty(ClusterInfo target, Predicate clearBroker, String name) { + var violated = + target + .replicaStream() + .filter(i -> clearBroker.test(i.nodeInfo().id())) + .collect(Collectors.toUnmodifiableSet()); + Assertions.assertTrue( + violated.isEmpty(), + name + ": the following replica should move to somewhere else " + violated); + } } } diff --git a/common/src/test/java/org/astraea/common/balancer/BalancerUtilsTest.java b/common/src/test/java/org/astraea/common/balancer/BalancerUtilsTest.java new file mode 100644 index 0000000000..bf27b63426 --- /dev/null +++ b/common/src/test/java/org/astraea/common/balancer/BalancerUtilsTest.java @@ -0,0 +1,208 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.balancer; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Stream; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Replica; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class BalancerUtilsTest { + + @Test + void testBalancingMode() { + var cluster = ClusterInfo.builder().addNode(Set.of(1, 2, 3, 4, 5)).build(); + + Assertions.assertThrows(Exception.class, () -> BalancerUtils.balancingMode(cluster, "bad")); + Assertions.assertThrows(Exception.class, () -> BalancerUtils.balancingMode(cluster, "bad:bad")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "bad:bad:bad")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:balancing,bad:bad")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:balancing,bad:bad:bad")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,4:oops")); + Assertions.assertThrows( + Exception.class, + () -> 
BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,4:")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,1:")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,:")); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.balancingMode(cluster, "1:balancing,2:demoted,3:excluded,::")); + Assertions.assertThrows(Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:")); + Assertions.assertThrows( + Exception.class, () -> BalancerUtils.balancingMode(cluster, "1:balancing,1:balancing")); + + Assertions.assertDoesNotThrow( + () -> BalancerUtils.balancingMode(cluster, "reserved_usage:balancing"), + "Intentionally reserved this usage"); + + Assertions.assertEquals( + BalancerUtils.BalancingModes.BALANCING, + BalancerUtils.balancingMode(cluster, "").get(1), + "default"); + Assertions.assertEquals( + BalancerUtils.BalancingModes.DEMOTED, + BalancerUtils.balancingMode(cluster, "1:demoted").get(1), + "value"); + Assertions.assertEquals( + BalancerUtils.BalancingModes.DEMOTED, + BalancerUtils.balancingMode(cluster, "default:demoted").get(5), + "user defined default"); + Assertions.assertEquals( + BalancerUtils.BalancingModes.EXCLUDED, + BalancerUtils.balancingMode(cluster, "3:excluded,4:excluded").get(3)); + Assertions.assertEquals( + BalancerUtils.BalancingModes.EXCLUDED, + BalancerUtils.balancingMode(cluster, "3:excluded,4:excluded").get(4)); + Assertions.assertEquals( + BalancerUtils.BalancingModes.BALANCING, + BalancerUtils.balancingMode(cluster, "3:excluded,4:excluded,1:balancing").get(1)); + Assertions.assertEquals( + Set.of(1, 2, 3, 4, 5), BalancerUtils.balancingMode(cluster, "").keySet()); + } + + @Test + void testVerifyClearBrokerValidness() { + var base = + ClusterInfo.builder() + .addNode(Set.of(1, 2, 3)) + .addFolders( + Map.ofEntries( + Map.entry(1, Set.of("/folder")), + 
Map.entry(2, Set.of("/folder")), + Map.entry(3, Set.of("/folder")))) + .build(); + var iter = Stream.of(1, 2, 3).map(base::node).iterator(); + var cluster = + ClusterInfo.builder(base) + .addTopic("A", 1, (short) 1, r -> Replica.builder(r).nodeInfo(iter.next()).build()) + .addTopic("B", 1, (short) 1, r -> Replica.builder(r).nodeInfo(iter.next()).build()) + .addTopic("C", 1, (short) 1, r -> Replica.builder(r).nodeInfo(iter.next()).build()) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 1, t -> !t.equals("A"))); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 2, t -> !t.equals("B"))); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 3, t -> !t.equals("C"))); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 1, t -> t.equals("A"))); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 2, t -> t.equals("B"))); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(cluster, id -> id == 3, t -> t.equals("C"))); + + var hasAdding = + ClusterInfo.builder(cluster).mapLog(r -> Replica.builder(r).isAdding(true).build()).build(); + var hasRemoving = + ClusterInfo.builder(cluster) + .mapLog(r -> Replica.builder(r).isRemoving(true).build()) + .build(); + var hasFuture = + ClusterInfo.builder(cluster).mapLog(r -> Replica.builder(r).isFuture(true).build()).build(); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BalancerUtils.verifyClearBrokerValidness(hasAdding, x -> true, x -> true)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> BalancerUtils.verifyClearBrokerValidness(hasRemoving, x -> true, x -> true)); + Assertions.assertThrows( + 
IllegalArgumentException.class, + () -> BalancerUtils.verifyClearBrokerValidness(hasFuture, x -> true, x -> true)); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(hasAdding, x -> false, x -> true)); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(hasRemoving, x -> false, x -> true)); + Assertions.assertDoesNotThrow( + () -> BalancerUtils.verifyClearBrokerValidness(hasFuture, x -> false, x -> true)); + } + + @Test + void testClearedCluster() { + var cluster = + ClusterInfo.builder() + .addNode(Set.of(1, 2)) + .addFolders(Map.of(1, Set.of("/folder"))) + .addFolders(Map.of(2, Set.of("/folder"))) + .addTopic("topic", 100, (short) 2) + .addNode(Set.of(3, 4)) + .addFolders(Map.of(3, Set.of("/folder"))) + .addFolders(Map.of(4, Set.of("/folder"))) + .build(); + Assertions.assertThrows( + Exception.class, + () -> BalancerUtils.clearedCluster(cluster, id -> id == 1 || id == 2, id -> id == 3), + "Insufficient brokers to meet replica factor requirement"); + var clearedCluster = + Assertions.assertDoesNotThrow( + () -> + BalancerUtils.clearedCluster( + cluster, id -> id == 1 || id == 2, id -> id == 3 || id == 4)); + + Assertions.assertEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 1).toList()); + Assertions.assertEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 2).toList()); + Assertions.assertNotEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 3).toList()); + Assertions.assertNotEquals( + List.of(), clearedCluster.replicas().stream().filter(x -> x.nodeInfo().id() == 4).toList()); + + var sameCluster = + Assertions.assertDoesNotThrow( + () -> BalancerUtils.clearedCluster(cluster, id -> false, id -> true)); + Assertions.assertEquals( + Set.of(), + ClusterInfo.findNonFulfilledAllocation(cluster, sameCluster), + "Nothing to clear, nothing to change"); + + var aCluster = + 
Assertions.assertDoesNotThrow( + () -> BalancerUtils.clearedCluster(cluster, id -> id == 1, id -> id == 3)); + Assertions.assertEquals( + 0, aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 1).count(), "Demoted"); + Assertions.assertEquals( + 100, + aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 2).count(), + "Not allowed or cleared"); + Assertions.assertEquals( + 100, + aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 3).count(), + "Accept replicas broker demoted broker"); + Assertions.assertEquals( + 0, aCluster.replicas().stream().filter(r -> r.nodeInfo().id() == 4).count(), "Not allowed"); + } +} diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md index 91b044d47a..309f924645 100644 --- a/docs/web_server/web_api_balancer_chinese.md +++ b/docs/web_server/web_api_balancer_chinese.md @@ -26,10 +26,10 @@ POST /balancer `balancerConfig` 是 balancer 實作開放給使用者設定的內部演算法行為參數,我們有針對常用情境的 balancer config 規範出一些固定的名稱, 參數是否支援要看 Balancer 實作本身。當指定的參數不被 balancer 實作支援時,該實作可能會丟出錯誤提示使用者。 -| config key | config value | -|--------------------------------|-------------------------------------------------------------------------------------------------------------------| -| balancer.allowed.topics.regex | 一個正則表達式,表達允許進行搬移操作的 topic 名稱,當沒有指定的時候,代表沒有任何限制,所有 topic 都可以做搬移 | -| balancer.allowed.brokers.regex | 一個正則表達式,表達允許進行搬移操作的 broker 編號,當沒有指定的時候,代表沒有任何限制,所有 broker 都可以做負載更動。當有指定時,只有那些 broker 編號有匹配此正則表達式的 broker 能進行負載的更動 | +| config key | config value | +|--------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| balancer.allowed.topics.regex | 一個正則表達式,表達允許進行搬移操作的 topic 名稱,當沒有指定的時候,代表沒有任何限制,所有 topic 都可以做搬移 | +| 
balancer.broker.balancing.mode | 這個參數指定每個 Broker 要採取的負載平衡設定,目前擁有的模式包含 `balancing`, `demoted` 和 `excluded`。`balancing` 代表特定節點要參與負載平衡的過程,該節點身上的負載可以被變動。`demoted` 代表特定節點身上的負載必須要全部移除,這個功能能夠協助使用者優雅地下線一個節點。`excluded` 代表特定節點不能夠參與負載平衡的過程,節點不能新增或移除負載。這個參數的格式是一系列的 key/value pair 的字串,每個 pair 之間透過逗號 "," 間隔,而 key/value 之間透過冒號 ":" 間隔,如 `(brokerId_A \|"default"):(mode),(brokerId_B):(mode), ...`,其中 `key` 欄位代表這個是描述某 id 節點的設定,而對應的 `value` 欄位則是該節點要套用的負載平衡模式(`balancing`, `demoted` 或 `excluded`),另外 `key` 欄位可以填寫特殊字串 `default`,代表沒有被設定所提及的節點應該使用的負載平衡模式,比如填寫 `default:excluded` 可以使設定沒有提到的節點不參與負載平衡的過程,預設的 `default` 模式是 `balancing`,意即所有設定沒有提到的節點都將參與負載平衡的過程。 | costConfig: