diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index 01ae8544fe..27d3d348a3 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -21,7 +21,8 @@ import static org.astraea.common.balancer.BalancerConsole.TaskPhase.ExecutionFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.SearchFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.Searched; -import static org.astraea.common.cost.MigrationCost.CHANGED_LEADERS; +import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_ADDED; +import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_REMOVE; import static org.astraea.common.cost.MigrationCost.TO_FETCH_BYTES; import static org.astraea.common.cost.MigrationCost.TO_SYNC_BYTES; @@ -336,19 +337,11 @@ void testMoveCost(String leaderLimit, String sizeLimit) { migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum() <= DataSize.of(sizeLimit).bytes()); break; - case CHANGED_LEADERS: + case REPLICA_LEADERS_TO_ADDED: + case REPLICA_LEADERS_TO_REMOVE: Assertions.assertTrue( - Math.max( - migrationCost.brokerCosts.values().stream() - .filter(x -> x >= 0) - .mapToLong(Long::byteValue) - .sum(), - migrationCost.brokerCosts.values().stream() - .filter(x -> x < 0) - .map(Math::abs) - .mapToLong(Long::byteValue) - .sum()) - <= Integer.parseInt(leaderLimit)); + migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum() + <= Long.parseLong(leaderLimit)); break; } }); diff --git a/common/src/main/java/org/astraea/common/cost/MigrationCost.java b/common/src/main/java/org/astraea/common/cost/MigrationCost.java index 764abf2d83..b3cdafddec 100644 --- a/common/src/main/java/org/astraea/common/cost/MigrationCost.java +++ b/common/src/main/java/org/astraea/common/cost/MigrationCost.java @@ -33,19 +33,22 @@ public class MigrationCost { public final Map brokerCosts; public static final String TO_SYNC_BYTES = "record size to sync (bytes)"; public static final String TO_FETCH_BYTES = "record size to fetch (bytes)"; + public static final String REPLICA_LEADERS_TO_ADDED = "leader number to add"; + public static final String REPLICA_LEADERS_TO_REMOVE = "leader number to remove"; public static final String CHANGED_REPLICAS = "changed replicas"; - public static final String CHANGED_LEADERS = "changed leaders"; public static List migrationCosts(ClusterInfo before, ClusterInfo after) { var migrateInBytes = recordSizeToSync(before, after); var migrateOutBytes = recordSizeToFetch(before, after); var migrateReplicaNum = replicaNumChanged(before, after); - var migrateReplicaLeader = replicaLeaderChanged(before, after); + var migrateInLeader = replicaLeaderToAdd(before, after); + var migrateOutLeader = replicaLeaderToRemove(before, after); return List.of( new MigrationCost(TO_SYNC_BYTES, migrateInBytes), new MigrationCost(TO_FETCH_BYTES, migrateOutBytes), - new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum), - new MigrationCost(CHANGED_LEADERS, migrateReplicaLeader)); + new MigrationCost(REPLICA_LEADERS_TO_ADDED, migrateInLeader), + new MigrationCost(REPLICA_LEADERS_TO_REMOVE, migrateOutLeader), + new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum)); } public MigrationCost(String name, Map brokerCosts) { @@ -54,35 +57,49 @@ public MigrationCost(String name, Map brokerCosts) { } static Map recordSizeToFetch(ClusterInfo before, ClusterInfo after) { - return changedRecordSize(before, after, true); + return migratedChanged(before, after, true, (ignore) -> true, Replica::size); } static Map recordSizeToSync(ClusterInfo before, ClusterInfo after) { - return changedRecordSize(before, after, false); + return migratedChanged(before, after, false, (ignore) -> true, Replica::size); } static Map replicaNumChanged(ClusterInfo before, ClusterInfo after) { - return changedReplicaNumber(before, after, ignore -> true); + return changedReplicaNumber(before, after); } - static Map replicaLeaderChanged(ClusterInfo before, ClusterInfo after) { - return changedReplicaNumber(before, after, Replica::isLeader); + static Map replicaLeaderToAdd(ClusterInfo before, ClusterInfo after) { + return migratedChanged(before, after, true, Replica::isLeader, ignore -> 1L); + } + + static Map replicaLeaderToRemove(ClusterInfo before, ClusterInfo after) { + return migratedChanged(before, after, false, Replica::isLeader, ignore -> 1L); } /** * @param before the ClusterInfo before migrated replicas * @param after the ClusterInfo after migrated replicas * @param migrateOut if data log need fetch from replica leader, set this true + * @param predicate used to filter replicas + * @param replicaFunction decide what information you want to calculate for the replica * @return the data size to migrated by all brokers */ - private static Map changedRecordSize( - ClusterInfo before, ClusterInfo after, boolean migrateOut) { + private static Map migratedChanged( + ClusterInfo before, + ClusterInfo after, + boolean migrateOut, + Predicate predicate, + Function replicaFunction) { var source = migrateOut ? after : before; var dest = migrateOut ? before : after; var changePartitions = ClusterInfo.findNonFulfilledAllocation(source, dest); var cost = changePartitions.stream() - .flatMap(p -> dest.replicas(p).stream().filter(r -> !source.replicas(p).contains(r))) + .flatMap( + p -> + dest.replicas(p).stream() + .filter(predicate) + .filter(r -> !source.replicas(p).contains(r))) .map( r -> { if (migrateOut) return dest.replicaLeader(r.topicPartition()).orElse(r); @@ -92,7 +109,7 @@ private static Map changedRecordSize( Collectors.groupingBy( r -> r.nodeInfo().id(), Collectors.mapping( - Function.identity(), Collectors.summingLong(Replica::size)))); + Function.identity(), Collectors.summingLong(replicaFunction::apply)))); return Stream.concat(dest.nodes().stream(), source.nodes().stream()) .map(NodeInfo::id) .distinct() @@ -133,8 +150,7 @@ static boolean changedRecordSizeOverflow( return Math.max(totalRemovedSize, totalAddedSize) > limit; } - private static Map changedReplicaNumber( - ClusterInfo before, ClusterInfo after, Predicate predicate) { + private static Map changedReplicaNumber(ClusterInfo before, ClusterInfo after) { return Stream.concat(before.nodes().stream(), after.nodes().stream()) .map(NodeInfo::id) .distinct() @@ -146,22 +162,22 @@ private static Map changedReplicaNumber( var removedLeaders = before .replicaStream(id) - .filter(predicate) .filter( r -> after .replicaStream(r.topicPartitionReplica()) - .noneMatch(predicate)) + .findAny() + .isEmpty()) .count(); var newLeaders = after .replicaStream(id) - .filter(predicate) .filter( r -> before .replicaStream(r.topicPartitionReplica()) - .noneMatch(predicate)) + .findAny() + .isEmpty()) .count(); return newLeaders - removedLeaders; })); diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index 9a68434d13..e05cdcdb36 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -16,7 +16,7 @@ */ package org.astraea.common.cost; -import static org.astraea.common.cost.MigrationCost.replicaLeaderChanged; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd; import java.util.List; import java.util.Map; @@ -80,11 +80,12 @@ public Configuration config() { @Override public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) { - var moveCost = replicaLeaderChanged(before, after); + var replicaLeaderIn = replicaLeaderToAdd(before, after); var maxMigratedLeader = config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); var overflow = - maxMigratedLeader < moveCost.values().stream().map(Math::abs).mapToLong(s -> s).sum(); + maxMigratedLeader + < replicaLeaderIn.values().stream().map(Math::abs).mapToLong(s -> s).sum(); return () -> overflow; } diff --git a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java index 1871450617..f30297ce7b 100644 --- a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java @@ -19,7 +19,8 @@ import static org.astraea.common.cost.MigrationCost.changedRecordSizeOverflow; import static org.astraea.common.cost.MigrationCost.recordSizeToFetch; import static org.astraea.common.cost.MigrationCost.recordSizeToSync; -import static org.astraea.common.cost.MigrationCost.replicaLeaderChanged; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToRemove; import static org.astraea.common.cost.MigrationCost.replicaNumChanged; import java.util.List; @@ -143,16 +144,34 @@ void testChangedReplicaLeaderNumber() { .isPreferredLeader(false) .path("") .build()); + /* + before: + topic1-0 : 0,1 + topic1-1 : 0,1 + after: + topic1-0 : 2,1 + topic1-1 : 0,2 + leader migrate out: + 0: 1 + 1: 0 + 2: 0 + leader migrate in: + 0: 0 + 1: 0 + 2: 1 + */ + var beforeClusterInfo = ClusterInfoTest.of(before); var afterClusterInfo = ClusterInfoTest.of(after); - var changedReplicaLeaderCount = replicaLeaderChanged(beforeClusterInfo, afterClusterInfo); - Assertions.assertEquals(3, changedReplicaLeaderCount.size()); - Assertions.assertTrue(changedReplicaLeaderCount.containsKey(0)); - Assertions.assertTrue(changedReplicaLeaderCount.containsKey(1)); - Assertions.assertTrue(changedReplicaLeaderCount.containsKey(2)); - Assertions.assertEquals(-1, changedReplicaLeaderCount.get(0)); - Assertions.assertEquals(0, changedReplicaLeaderCount.get(1)); - Assertions.assertEquals(1, changedReplicaLeaderCount.get(2)); + var changedReplicaLeaderInCount = replicaLeaderToAdd(beforeClusterInfo, afterClusterInfo); + var changedReplicaLeaderOutCount = replicaLeaderToRemove(beforeClusterInfo, afterClusterInfo); + Assertions.assertEquals(3, changedReplicaLeaderInCount.size()); + Assertions.assertEquals(1, changedReplicaLeaderInCount.get(0)); + Assertions.assertEquals(0, changedReplicaLeaderInCount.get(1)); + Assertions.assertEquals(0, changedReplicaLeaderInCount.get(2)); + Assertions.assertEquals(0, changedReplicaLeaderOutCount.get(0)); + Assertions.assertEquals(0, changedReplicaLeaderOutCount.get(1)); + Assertions.assertEquals(1, changedReplicaLeaderOutCount.get(2)); } @Test diff --git a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java index b16629d59d..90f0a2db01 100644 --- a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java @@ -18,19 +18,68 @@ import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.astraea.common.Configuration; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; -import org.astraea.common.metrics.BeanObject; -import org.astraea.common.metrics.broker.ServerMetrics; +import org.astraea.common.metrics.ClusterBean; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; public class ReplicaLeaderCostTest { private final Dispersion dispersion = Dispersion.cov(); + @Test + void testLeaderCount() { + var baseCluster = + ClusterInfo.builder() + .addNode(Set.of(1, 2)) + .addFolders(Map.of(1, Set.of("/folder"))) + .addFolders(Map.of(2, Set.of("/folder"))) + .build(); + var sourceCluster = + ClusterInfo.builder(baseCluster) + .addTopic( + "topic1", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build()) + .addTopic( + "topic2", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build()) + .build(); + var overFlowTargetCluster = + ClusterInfo.builder(baseCluster) + .addTopic( + "topic1", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build()) + .addTopic( + "topic2", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build()) + .build(); + + var overFlowMoveCost = + new ReplicaLeaderCost( + Configuration.of(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "5"))) + .moveCost(sourceCluster, overFlowTargetCluster, ClusterBean.EMPTY); + + var noOverFlowMoveCost = + new ReplicaLeaderCost( + Configuration.of(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "10"))) + .moveCost(sourceCluster, overFlowTargetCluster, ClusterBean.EMPTY); + + Assertions.assertTrue(overFlowMoveCost.overflow()); + Assertions.assertFalse(noOverFlowMoveCost.overflow()); + } + @Test void testNoMetrics() { var replicas = @@ -73,13 +122,4 @@ void testNoMetrics() { Assertions.assertEquals(brokerCost.get(12), 0); Assertions.assertEquals(clusterCost, 0.816496580927726); } - - private ServerMetrics.ReplicaManager.Gauge mockResult(String name, int count) { - var result = Mockito.mock(ServerMetrics.ReplicaManager.Gauge.class); - var bean = Mockito.mock(BeanObject.class); - Mockito.when(result.beanObject()).thenReturn(bean); - Mockito.when(bean.properties()).thenReturn(Map.of("name", name, "type", "ReplicaManager")); - Mockito.when(result.value()).thenReturn(count); - return result; - } }