From 25168ea74776dcaedd630d4f025716fa74615617 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Tue, 25 Apr 2023 01:56:25 +0800 Subject: [PATCH 1/5] revise ReplicaLeaderCost --- .../astraea/common/cost/MigrationCost.java | 54 +++++++++------ .../common/cost/ReplicaLeaderCost.java | 11 +++- .../common/cost/MigrationCostTest.java | 37 ++++++++--- .../common/cost/ReplicaLeaderCostTest.java | 65 +++++++++++++++---- 4 files changed, 124 insertions(+), 43 deletions(-) diff --git a/common/src/main/java/org/astraea/common/cost/MigrationCost.java b/common/src/main/java/org/astraea/common/cost/MigrationCost.java index 764abf2d83..d27c9aa1f0 100644 --- a/common/src/main/java/org/astraea/common/cost/MigrationCost.java +++ b/common/src/main/java/org/astraea/common/cost/MigrationCost.java @@ -33,19 +33,22 @@ public class MigrationCost { public final Map brokerCosts; public static final String TO_SYNC_BYTES = "record size to sync (bytes)"; public static final String TO_FETCH_BYTES = "record size to fetch (bytes)"; + public static final String TO_SYNC_LEADERS = "leader number to sync"; + public static final String TO_FETCH_LEADERS = "leader number to fetch"; public static final String CHANGED_REPLICAS = "changed replicas"; - public static final String CHANGED_LEADERS = "changed leaders"; public static List migrationCosts(ClusterInfo before, ClusterInfo after) { var migrateInBytes = recordSizeToSync(before, after); var migrateOutBytes = recordSizeToFetch(before, after); var migrateReplicaNum = replicaNumChanged(before, after); - var migrateReplicaLeader = replicaLeaderChanged(before, after); + var migrateInLeader = replicaLeaderToFetch(before, after); + var migrateOutLeader = replicaLeaderToSync(before, after); return List.of( new MigrationCost(TO_SYNC_BYTES, migrateInBytes), new MigrationCost(TO_FETCH_BYTES, migrateOutBytes), - new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum), - new MigrationCost(CHANGED_LEADERS, migrateReplicaLeader)); + new MigrationCost(TO_SYNC_LEADERS, migrateInLeader), + new MigrationCost(TO_FETCH_LEADERS, migrateOutLeader), + new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum)); } public MigrationCost(String name, Map brokerCosts) { @@ -54,35 +57,49 @@ public MigrationCost(String name, Map brokerCosts) { } static Map recordSizeToFetch(ClusterInfo before, ClusterInfo after) { - return changedRecordSize(before, after, true); + return migratedChanged(before, after, true, (ignore) -> true, Replica::size); } static Map recordSizeToSync(ClusterInfo before, ClusterInfo after) { - return changedRecordSize(before, after, false); + return migratedChanged(before, after, false, (ignore) -> true, Replica::size); } static Map replicaNumChanged(ClusterInfo before, ClusterInfo after) { - return changedReplicaNumber(before, after, ignore -> true); + return changedReplicaNumber(before, after); } - static Map replicaLeaderChanged(ClusterInfo before, ClusterInfo after) { - return changedReplicaNumber(before, after, Replica::isLeader); + static Map replicaLeaderToFetch(ClusterInfo before, ClusterInfo after) { + return migratedChanged(before, after, true, Replica::isLeader, ignore -> 1L); + } + + static Map replicaLeaderToSync(ClusterInfo before, ClusterInfo after) { + return migratedChanged(before, after, false, Replica::isLeader, ignore -> 1L); } /** * @param before the ClusterInfo before migrated replicas * @param after the ClusterInfo after migrated replicas * @param migrateOut if data log need fetch from replica leader, set this true + * @param predicate used to filter replicas + * @param replicaFunction decide what information you want to calculate for the replica * @return the data size to migrated by all brokers */ - private static Map changedRecordSize( - ClusterInfo before, ClusterInfo after, boolean migrateOut) { + private static Map migratedChanged( + ClusterInfo before, + ClusterInfo after, + boolean migrateOut, + Predicate predicate, + Function replicaFunction) { var source = migrateOut ? after : before; var dest = migrateOut ? before : after; var changePartitions = ClusterInfo.findNonFulfilledAllocation(source, dest); var cost = changePartitions.stream() - .flatMap(p -> dest.replicas(p).stream().filter(r -> !source.replicas(p).contains(r))) + .flatMap( + p -> + dest.replicas(p).stream() + .filter(predicate) + .filter(r -> !source.replicas(p).contains(r))) .map( r -> { if (migrateOut) return dest.replicaLeader(r.topicPartition()).orElse(r); @@ -92,7 +109,7 @@ private static Map changedRecordSize( Collectors.groupingBy( r -> r.nodeInfo().id(), Collectors.mapping( - Function.identity(), Collectors.summingLong(Replica::size)))); + Function.identity(), Collectors.summingLong(replicaFunction::apply)))); return Stream.concat(dest.nodes().stream(), source.nodes().stream()) .map(NodeInfo::id) .distinct() @@ -133,8 +150,7 @@ static boolean changedRecordSizeOverflow( return Math.max(totalRemovedSize, totalAddedSize) > limit; } - private static Map changedReplicaNumber( - ClusterInfo before, ClusterInfo after, Predicate predicate) { + private static Map changedReplicaNumber(ClusterInfo before, ClusterInfo after) { return Stream.concat(before.nodes().stream(), after.nodes().stream()) .map(NodeInfo::id) .distinct() @@ -146,22 +162,22 @@ private static Map changedReplicaNumber( var removedLeaders = before .replicaStream(id) - .filter(predicate) .filter( r -> after .replicaStream(r.topicPartitionReplica()) - .noneMatch(predicate)) + .findAny() + .isEmpty()) .count(); var newLeaders = after .replicaStream(id) - .filter(predicate) .filter( r -> before .replicaStream(r.topicPartitionReplica()) - .noneMatch(predicate)) + .findAny() + .isEmpty()) .count(); return newLeaders - removedLeaders; })); diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index f755f73a97..f7c1fec82a 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -16,7 +16,8 @@ */ package org.astraea.common.cost; -import static org.astraea.common.cost.MigrationCost.replicaLeaderChanged; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToFetch; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToSync; import java.util.List; import java.util.Map; @@ -80,11 +81,15 @@ public Configuration config() { @Override public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) { - var moveCost = replicaLeaderChanged(before, after); + var replicaLeaderToFetch = replicaLeaderToFetch(before, after); + var replicaLeaderToSync = replicaLeaderToSync(before, after); var maxMigratedLeader = config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); var overflow = - maxMigratedLeader < moveCost.values().stream().map(Math::abs).mapToLong(s -> s).sum(); + maxMigratedLeader + < Math.max( + replicaLeaderToFetch.values().stream().map(Math::abs).mapToLong(s -> s).sum(), + replicaLeaderToSync.values().stream().map(Math::abs).mapToLong(s -> s).sum()); return () -> overflow; } diff --git a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java index 1871450617..60cd70a787 100644 --- a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java @@ -19,7 +19,8 @@ import static org.astraea.common.cost.MigrationCost.changedRecordSizeOverflow; import static org.astraea.common.cost.MigrationCost.recordSizeToFetch; import static org.astraea.common.cost.MigrationCost.recordSizeToSync; -import static org.astraea.common.cost.MigrationCost.replicaLeaderChanged; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToFetch; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToSync; import static org.astraea.common.cost.MigrationCost.replicaNumChanged; import java.util.List; @@ -143,16 +144,34 @@ void testChangedReplicaLeaderNumber() { .isPreferredLeader(false) .path("") .build()); + /* + before: + topic1-0 : 0,1 + topic1-1 : 0,1 + after: + topic1-0 : 2,1 + topic1-1 : 0,2 + leader migrate out: + 0: 1 + 1: 0 + 2: 0 + leader migrate in: + 0: 0 + 1: 0 + 2: 1 + */ + var beforeClusterInfo = ClusterInfoTest.of(before); var afterClusterInfo = ClusterInfoTest.of(after); - var changedReplicaLeaderCount = replicaLeaderChanged(beforeClusterInfo, afterClusterInfo); - Assertions.assertEquals(3, changedReplicaLeaderCount.size()); - Assertions.assertTrue(changedReplicaLeaderCount.containsKey(0)); - Assertions.assertTrue(changedReplicaLeaderCount.containsKey(1)); - Assertions.assertTrue(changedReplicaLeaderCount.containsKey(2)); - Assertions.assertEquals(-1, changedReplicaLeaderCount.get(0)); - Assertions.assertEquals(0, changedReplicaLeaderCount.get(1)); - Assertions.assertEquals(1, changedReplicaLeaderCount.get(2)); + var changedReplicaLeaderInCount = replicaLeaderToFetch(beforeClusterInfo, afterClusterInfo); + var changedReplicaLeaderOutCount = replicaLeaderToSync(beforeClusterInfo, afterClusterInfo); + Assertions.assertEquals(3, changedReplicaLeaderInCount.size()); + Assertions.assertEquals(1, changedReplicaLeaderInCount.get(0)); + Assertions.assertEquals(0, changedReplicaLeaderInCount.get(1)); + Assertions.assertEquals(0, changedReplicaLeaderInCount.get(2)); + Assertions.assertEquals(0, changedReplicaLeaderOutCount.get(0)); + Assertions.assertEquals(0, changedReplicaLeaderOutCount.get(1)); + Assertions.assertEquals(1, changedReplicaLeaderOutCount.get(2)); } @Test diff --git a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java index b16629d59d..8f5e1d6ad3 100644 --- a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java @@ -18,19 +18,69 @@ import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; +import org.astraea.common.Configuration; +import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.ClusterInfoBuilder; import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; -import org.astraea.common.metrics.BeanObject; -import org.astraea.common.metrics.broker.ServerMetrics; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; public class ReplicaLeaderCostTest { private final Dispersion dispersion = Dispersion.cov(); + @Test + void testLeaderCount() { + var baseCluster = + ClusterInfoBuilder.builder() + .addNode(Set.of(1, 2)) + .addFolders(Map.of(1, Set.of("/folder"))) + .addFolders(Map.of(2, Set.of("/folder"))) + .build(); + var sourceCluster = + ClusterInfoBuilder.builder(baseCluster) + .addTopic( + "topic1", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build()) + .addTopic( + "topic2", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build()) + .build(); + var overFlowTargetCluster = + ClusterInfoBuilder.builder(baseCluster) + .addTopic( + "topic1", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build()) + .addTopic( + "topic2", + 3, + (short) 1, + r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build()) + .build(); + + var overFlowMoveCost = + new ReplicaLeaderCost( + Configuration.of(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "5"))) + .moveCost(sourceCluster, overFlowTargetCluster, ClusterBean.EMPTY); + + var noOverFlowMoveCost = + new ReplicaLeaderCost( + Configuration.of(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "10"))) + .moveCost(sourceCluster, overFlowTargetCluster, ClusterBean.EMPTY); + + Assertions.assertTrue(overFlowMoveCost.overflow()); + Assertions.assertFalse(noOverFlowMoveCost.overflow()); + } + @Test void testNoMetrics() { var replicas = @@ -73,13 +123,4 @@ void testNoMetrics() { Assertions.assertEquals(brokerCost.get(12), 0); Assertions.assertEquals(clusterCost, 0.816496580927726); } - - private ServerMetrics.ReplicaManager.Gauge mockResult(String name, int count) { - var result = Mockito.mock(ServerMetrics.ReplicaManager.Gauge.class); - var bean = Mockito.mock(BeanObject.class); - Mockito.when(result.beanObject()).thenReturn(bean); - Mockito.when(bean.properties()).thenReturn(Map.of("name", name, "type", "ReplicaManager")); - Mockito.when(result.value()).thenReturn(count); - return result; - } } From 8153d9bad994445ac30def1ec8e1b49607c0aca4 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Tue, 25 Apr 2023 20:31:15 +0800 Subject: [PATCH 2/5] fix bug --- .../test/java/org/astraea/app/web/BalancerHandlerTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index 0aed00deaa..e88c9f2a72 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -21,9 +21,10 @@ import static org.astraea.common.balancer.BalancerConsole.TaskPhase.ExecutionFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.SearchFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.Searched; -import static org.astraea.common.cost.MigrationCost.CHANGED_LEADERS; import static org.astraea.common.cost.MigrationCost.TO_FETCH_BYTES; +import static org.astraea.common.cost.MigrationCost.TO_FETCH_LEADERS; import static org.astraea.common.cost.MigrationCost.TO_SYNC_BYTES; +import static org.astraea.common.cost.MigrationCost.TO_SYNC_LEADERS; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -332,7 +333,8 @@ void testMoveCost(String leaderLimit, String sizeLimit) { migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum() <= DataSize.of(sizeLimit).bytes()); break; - case CHANGED_LEADERS: + case TO_FETCH_LEADERS: + case TO_SYNC_LEADERS: Assertions.assertTrue( Math.max( migrationCost.brokerCosts.values().stream() From 250272325eff59f9368dd3e813b8de6740f16d16 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Thu, 27 Apr 2023 14:38:39 +0800 Subject: [PATCH 3/5] fix issues --- .../org/astraea/app/web/BalancerHandlerTest.java | 4 ++-- .../org/astraea/common/cost/MigrationCost.java | 16 ++++++++-------- .../astraea/common/cost/ReplicaLeaderCost.java | 10 +++------- .../astraea/common/cost/MigrationCostTest.java | 8 ++++---- 4 files changed, 17 insertions(+), 21 deletions(-) diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index e88c9f2a72..d588ce2ae4 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -21,10 +21,10 @@ import static org.astraea.common.balancer.BalancerConsole.TaskPhase.ExecutionFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.SearchFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.Searched; +import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_REMOVE; import static org.astraea.common.cost.MigrationCost.TO_FETCH_BYTES; import static org.astraea.common.cost.MigrationCost.TO_FETCH_LEADERS; import static org.astraea.common.cost.MigrationCost.TO_SYNC_BYTES; -import static org.astraea.common.cost.MigrationCost.TO_SYNC_LEADERS; import java.nio.charset.StandardCharsets; import java.time.Duration; @@ -334,7 +334,7 @@ void testMoveCost(String leaderLimit, String sizeLimit) { <= DataSize.of(sizeLimit).bytes()); break; case TO_FETCH_LEADERS: - case TO_SYNC_LEADERS: + case REPLICA_LEADERS_TO_REMOVE: Assertions.assertTrue( Math.max( migrationCost.brokerCosts.values().stream() diff --git a/common/src/main/java/org/astraea/common/cost/MigrationCost.java b/common/src/main/java/org/astraea/common/cost/MigrationCost.java index d27c9aa1f0..b3cdafddec 100644 --- a/common/src/main/java/org/astraea/common/cost/MigrationCost.java +++ b/common/src/main/java/org/astraea/common/cost/MigrationCost.java @@ -33,21 +33,21 @@ public class MigrationCost { public final Map brokerCosts; public static final String TO_SYNC_BYTES = "record size to sync (bytes)"; public static final String TO_FETCH_BYTES = "record size to fetch (bytes)"; - public static final String TO_SYNC_LEADERS = "leader number to sync"; - public static final String TO_FETCH_LEADERS = "leader number to fetch"; + public static final String REPLICA_LEADERS_TO_ADDED = "leader number to add"; + public static final String REPLICA_LEADERS_TO_REMOVE = "leader number to remove"; public static final String CHANGED_REPLICAS = "changed replicas"; public static List migrationCosts(ClusterInfo before, ClusterInfo after) { var migrateInBytes = recordSizeToSync(before, after); var migrateOutBytes = recordSizeToFetch(before, after); var migrateReplicaNum = replicaNumChanged(before, after); - var migrateInLeader = replicaLeaderToFetch(before, after); - var migrateOutLeader = replicaLeaderToSync(before, after); + var migrateInLeader = replicaLeaderToAdd(before, after); + var migrateOutLeader = replicaLeaderToRemove(before, after); return List.of( new MigrationCost(TO_SYNC_BYTES, migrateInBytes), new MigrationCost(TO_FETCH_BYTES, migrateOutBytes), - new MigrationCost(TO_SYNC_LEADERS, migrateInLeader), - new MigrationCost(TO_FETCH_LEADERS, migrateOutLeader), + new MigrationCost(REPLICA_LEADERS_TO_ADDED, migrateInLeader), + new MigrationCost(REPLICA_LEADERS_TO_REMOVE, migrateOutLeader), new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum)); } @@ -68,11 +68,11 @@ static Map replicaNumChanged(ClusterInfo before, ClusterInfo afte return changedReplicaNumber(before, after); } - static Map replicaLeaderToFetch(ClusterInfo before, ClusterInfo after) { + static Map replicaLeaderToAdd(ClusterInfo before, ClusterInfo after) { return migratedChanged(before, after, true, Replica::isLeader, ignore -> 1L); } - static Map replicaLeaderToSync(ClusterInfo before, ClusterInfo after) { + static Map replicaLeaderToRemove(ClusterInfo before, ClusterInfo after) { return migratedChanged(before, after, false, Replica::isLeader, ignore -> 1L); } diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index f7c1fec82a..cc93cce67f 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -16,8 +16,7 @@ */ package org.astraea.common.cost; -import static org.astraea.common.cost.MigrationCost.replicaLeaderToFetch; -import static org.astraea.common.cost.MigrationCost.replicaLeaderToSync; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd; import java.util.List; import java.util.Map; @@ -81,15 +80,12 @@ public Configuration config() { @Override public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) { - var replicaLeaderToFetch = replicaLeaderToFetch(before, after); - var replicaLeaderToSync = replicaLeaderToSync(before, after); + var replicaLeaderIn = replicaLeaderToAdd(before, after); var maxMigratedLeader = config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); var overflow = maxMigratedLeader - < Math.max( - replicaLeaderToFetch.values().stream().map(Math::abs).mapToLong(s -> s).sum(), - replicaLeaderToSync.values().stream().map(Math::abs).mapToLong(s -> s).sum()); + < replicaLeaderIn.values().stream().map(Math::abs).mapToLong(s -> s).sum(); return () -> overflow; } diff --git a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java index 60cd70a787..f30297ce7b 100644 --- a/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/MigrationCostTest.java @@ -19,8 +19,8 @@ import static org.astraea.common.cost.MigrationCost.changedRecordSizeOverflow; import static org.astraea.common.cost.MigrationCost.recordSizeToFetch; import static org.astraea.common.cost.MigrationCost.recordSizeToSync; -import static org.astraea.common.cost.MigrationCost.replicaLeaderToFetch; -import static org.astraea.common.cost.MigrationCost.replicaLeaderToSync; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToRemove; import static org.astraea.common.cost.MigrationCost.replicaNumChanged; import java.util.List; @@ -163,8 +163,8 @@ void testChangedReplicaLeaderNumber() { var beforeClusterInfo = ClusterInfoTest.of(before); var afterClusterInfo = ClusterInfoTest.of(after); - var changedReplicaLeaderInCount = replicaLeaderToFetch(beforeClusterInfo, afterClusterInfo); - var changedReplicaLeaderOutCount = replicaLeaderToSync(beforeClusterInfo, afterClusterInfo); + var changedReplicaLeaderInCount = replicaLeaderToAdd(beforeClusterInfo, afterClusterInfo); + var changedReplicaLeaderOutCount = replicaLeaderToRemove(beforeClusterInfo, afterClusterInfo); Assertions.assertEquals(3, changedReplicaLeaderInCount.size()); Assertions.assertEquals(1, changedReplicaLeaderInCount.get(0)); Assertions.assertEquals(0, changedReplicaLeaderInCount.get(1)); From 6f2cf3cef1c18745560959b7a49d4cb6bb98a7d8 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Sat, 29 Apr 2023 16:50:57 +0800 Subject: [PATCH 4/5] fix bugs --- .../astraea/app/web/BalancerHandlerTest.java | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java index d588ce2ae4..d8ecb0c792 100644 --- a/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java +++ b/app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java @@ -21,9 +21,9 @@ import static org.astraea.common.balancer.BalancerConsole.TaskPhase.ExecutionFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.SearchFailed; import static org.astraea.common.balancer.BalancerConsole.TaskPhase.Searched; +import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_ADDED; import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_REMOVE; import static org.astraea.common.cost.MigrationCost.TO_FETCH_BYTES; -import static org.astraea.common.cost.MigrationCost.TO_FETCH_LEADERS; import static org.astraea.common.cost.MigrationCost.TO_SYNC_BYTES; import java.nio.charset.StandardCharsets; @@ -333,20 +333,11 @@ void testMoveCost(String leaderLimit, String sizeLimit) { migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum() <= DataSize.of(sizeLimit).bytes()); break; - case TO_FETCH_LEADERS: + case REPLICA_LEADERS_TO_ADDED: case REPLICA_LEADERS_TO_REMOVE: Assertions.assertTrue( - Math.max( - migrationCost.brokerCosts.values().stream() - .filter(x -> x >= 0) - .mapToLong(Long::byteValue) - .sum(), - migrationCost.brokerCosts.values().stream() - .filter(x -> x < 0) - .map(Math::abs) - .mapToLong(Long::byteValue) - .sum()) - <= Integer.parseInt(leaderLimit)); + migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum() + <= Long.parseLong(leaderLimit)); break; } }); From aedf80127ae7fc6c5fbcbb231eb48bef2739f911 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Sat, 29 Apr 2023 18:01:53 +0800 Subject: [PATCH 5/5] fix conflict --- .../org/astraea/common/cost/ReplicaLeaderCostTest.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java index 8f5e1d6ad3..90f0a2db01 100644 --- a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java @@ -21,11 +21,10 @@ import java.util.Set; import java.util.stream.Collectors; import org.astraea.common.Configuration; -import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.ClusterInfoBuilder; import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; +import org.astraea.common.metrics.ClusterBean; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -35,13 +34,13 @@ public class ReplicaLeaderCostTest { @Test void testLeaderCount() { var baseCluster = - ClusterInfoBuilder.builder() + ClusterInfo.builder() .addNode(Set.of(1, 2)) .addFolders(Map.of(1, Set.of("/folder"))) .addFolders(Map.of(2, Set.of("/folder"))) .build(); var sourceCluster = - ClusterInfoBuilder.builder(baseCluster) + ClusterInfo.builder(baseCluster) .addTopic( "topic1", 3, @@ -54,7 +53,7 @@ void testLeaderCount() { r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build()) .build(); var overFlowTargetCluster = - ClusterInfoBuilder.builder(baseCluster) + ClusterInfo.builder(baseCluster) .addTopic( "topic1", 3,