Skip to content

Commit

Permalink
revise ReplicaLeaderCost (#1672)
Browse files Browse the repository at this point in the history
  • Loading branch information
qoo332001 authored Apr 30, 2023
1 parent 5711be3 commit f86391d
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 56 deletions.
19 changes: 6 additions & 13 deletions app/src/test/java/org/astraea/app/web/BalancerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import static org.astraea.common.balancer.BalancerConsole.TaskPhase.ExecutionFailed;
import static org.astraea.common.balancer.BalancerConsole.TaskPhase.SearchFailed;
import static org.astraea.common.balancer.BalancerConsole.TaskPhase.Searched;
import static org.astraea.common.cost.MigrationCost.CHANGED_LEADERS;
import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_ADDED;
import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_REMOVE;
import static org.astraea.common.cost.MigrationCost.TO_FETCH_BYTES;
import static org.astraea.common.cost.MigrationCost.TO_SYNC_BYTES;

Expand Down Expand Up @@ -336,19 +337,11 @@ void testMoveCost(String leaderLimit, String sizeLimit) {
migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum()
<= DataSize.of(sizeLimit).bytes());
break;
case CHANGED_LEADERS:
case REPLICA_LEADERS_TO_ADDED:
case REPLICA_LEADERS_TO_REMOVE:
Assertions.assertTrue(
Math.max(
migrationCost.brokerCosts.values().stream()
.filter(x -> x >= 0)
.mapToLong(Long::byteValue)
.sum(),
migrationCost.brokerCosts.values().stream()
.filter(x -> x < 0)
.map(Math::abs)
.mapToLong(Long::byteValue)
.sum())
<= Integer.parseInt(leaderLimit));
migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum()
<= Long.parseLong(leaderLimit));
break;
}
});
Expand Down
54 changes: 35 additions & 19 deletions common/src/main/java/org/astraea/common/cost/MigrationCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@ public class MigrationCost {
public final Map<Integer, Long> brokerCosts;
public static final String TO_SYNC_BYTES = "record size to sync (bytes)";
public static final String TO_FETCH_BYTES = "record size to fetch (bytes)";
public static final String REPLICA_LEADERS_TO_ADDED = "leader number to add";
public static final String REPLICA_LEADERS_TO_REMOVE = "leader number to remove";
public static final String CHANGED_REPLICAS = "changed replicas";
public static final String CHANGED_LEADERS = "changed leaders";

public static List<MigrationCost> migrationCosts(ClusterInfo before, ClusterInfo after) {
var migrateInBytes = recordSizeToSync(before, after);
var migrateOutBytes = recordSizeToFetch(before, after);
var migrateReplicaNum = replicaNumChanged(before, after);
var migrateReplicaLeader = replicaLeaderChanged(before, after);
var migrateInLeader = replicaLeaderToAdd(before, after);
var migrateOutLeader = replicaLeaderToRemove(before, after);
return List.of(
new MigrationCost(TO_SYNC_BYTES, migrateInBytes),
new MigrationCost(TO_FETCH_BYTES, migrateOutBytes),
new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum),
new MigrationCost(CHANGED_LEADERS, migrateReplicaLeader));
new MigrationCost(REPLICA_LEADERS_TO_ADDED, migrateInLeader),
new MigrationCost(REPLICA_LEADERS_TO_REMOVE, migrateOutLeader),
new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum));
}

public MigrationCost(String name, Map<Integer, Long> brokerCosts) {
Expand All @@ -54,35 +57,49 @@ public MigrationCost(String name, Map<Integer, Long> brokerCosts) {
}

static Map<Integer, Long> recordSizeToFetch(ClusterInfo before, ClusterInfo after) {
return changedRecordSize(before, after, true);
return migratedChanged(before, after, true, (ignore) -> true, Replica::size);
}

static Map<Integer, Long> recordSizeToSync(ClusterInfo before, ClusterInfo after) {
return changedRecordSize(before, after, false);
return migratedChanged(before, after, false, (ignore) -> true, Replica::size);
}

static Map<Integer, Long> replicaNumChanged(ClusterInfo before, ClusterInfo after) {
return changedReplicaNumber(before, after, ignore -> true);
return changedReplicaNumber(before, after);
}

static Map<Integer, Long> replicaLeaderChanged(ClusterInfo before, ClusterInfo after) {
return changedReplicaNumber(before, after, Replica::isLeader);
static Map<Integer, Long> replicaLeaderToAdd(ClusterInfo before, ClusterInfo after) {
return migratedChanged(before, after, true, Replica::isLeader, ignore -> 1L);
}

static Map<Integer, Long> replicaLeaderToRemove(ClusterInfo before, ClusterInfo after) {
return migratedChanged(before, after, false, Replica::isLeader, ignore -> 1L);
}

/**
* @param before the ClusterInfo before migrated replicas
* @param after the ClusterInfo after migrated replicas
* @param migrateOut if data log need fetch from replica leader, set this true
* @param predicate used to filter replicas
* @param replicaFunction decide what information you want to calculate for the replica
* @return the data size to migrated by all brokers
*/
private static Map<Integer, Long> changedRecordSize(
ClusterInfo before, ClusterInfo after, boolean migrateOut) {
private static Map<Integer, Long> migratedChanged(
ClusterInfo before,
ClusterInfo after,
boolean migrateOut,
Predicate<Replica> predicate,
Function<Replica, Long> replicaFunction) {
var source = migrateOut ? after : before;
var dest = migrateOut ? before : after;
var changePartitions = ClusterInfo.findNonFulfilledAllocation(source, dest);
var cost =
changePartitions.stream()
.flatMap(p -> dest.replicas(p).stream().filter(r -> !source.replicas(p).contains(r)))
.flatMap(
p ->
dest.replicas(p).stream()
.filter(predicate)
.filter(r -> !source.replicas(p).contains(r)))
.map(
r -> {
if (migrateOut) return dest.replicaLeader(r.topicPartition()).orElse(r);
Expand All @@ -92,7 +109,7 @@ private static Map<Integer, Long> changedRecordSize(
Collectors.groupingBy(
r -> r.nodeInfo().id(),
Collectors.mapping(
Function.identity(), Collectors.summingLong(Replica::size))));
Function.identity(), Collectors.summingLong(replicaFunction::apply))));
return Stream.concat(dest.nodes().stream(), source.nodes().stream())
.map(NodeInfo::id)
.distinct()
Expand Down Expand Up @@ -133,8 +150,7 @@ static boolean changedRecordSizeOverflow(
return Math.max(totalRemovedSize, totalAddedSize) > limit;
}

private static Map<Integer, Long> changedReplicaNumber(
ClusterInfo before, ClusterInfo after, Predicate<Replica> predicate) {
private static Map<Integer, Long> changedReplicaNumber(ClusterInfo before, ClusterInfo after) {
return Stream.concat(before.nodes().stream(), after.nodes().stream())
.map(NodeInfo::id)
.distinct()
Expand All @@ -146,22 +162,22 @@ private static Map<Integer, Long> changedReplicaNumber(
var removedLeaders =
before
.replicaStream(id)
.filter(predicate)
.filter(
r ->
after
.replicaStream(r.topicPartitionReplica())
.noneMatch(predicate))
.findAny()
.isEmpty())
.count();
var newLeaders =
after
.replicaStream(id)
.filter(predicate)
.filter(
r ->
before
.replicaStream(r.topicPartitionReplica())
.noneMatch(predicate))
.findAny()
.isEmpty())
.count();
return newLeaders - removedLeaders;
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.astraea.common.cost;

import static org.astraea.common.cost.MigrationCost.replicaLeaderChanged;
import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -80,11 +80,12 @@ public Configuration config() {

@Override
public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var moveCost = replicaLeaderChanged(before, after);
var replicaLeaderIn = replicaLeaderToAdd(before, after);
var maxMigratedLeader =
config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE);
var overflow =
maxMigratedLeader < moveCost.values().stream().map(Math::abs).mapToLong(s -> s).sum();
maxMigratedLeader
< replicaLeaderIn.values().stream().map(Math::abs).mapToLong(s -> s).sum();
return () -> overflow;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
import static org.astraea.common.cost.MigrationCost.changedRecordSizeOverflow;
import static org.astraea.common.cost.MigrationCost.recordSizeToFetch;
import static org.astraea.common.cost.MigrationCost.recordSizeToSync;
import static org.astraea.common.cost.MigrationCost.replicaLeaderChanged;
import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd;
import static org.astraea.common.cost.MigrationCost.replicaLeaderToRemove;
import static org.astraea.common.cost.MigrationCost.replicaNumChanged;

import java.util.List;
Expand Down Expand Up @@ -143,16 +144,34 @@ void testChangedReplicaLeaderNumber() {
.isPreferredLeader(false)
.path("")
.build());
/*
before:
topic1-0 : 0,1
topic1-1 : 0,1
after:
topic1-0 : 2,1
topic1-1 : 0,2
leader migrate out:
0: 1
1: 0
2: 0
leader migrate in:
0: 0
1: 0
2: 1
*/

var beforeClusterInfo = ClusterInfoTest.of(before);
var afterClusterInfo = ClusterInfoTest.of(after);
var changedReplicaLeaderCount = replicaLeaderChanged(beforeClusterInfo, afterClusterInfo);
Assertions.assertEquals(3, changedReplicaLeaderCount.size());
Assertions.assertTrue(changedReplicaLeaderCount.containsKey(0));
Assertions.assertTrue(changedReplicaLeaderCount.containsKey(1));
Assertions.assertTrue(changedReplicaLeaderCount.containsKey(2));
Assertions.assertEquals(-1, changedReplicaLeaderCount.get(0));
Assertions.assertEquals(0, changedReplicaLeaderCount.get(1));
Assertions.assertEquals(1, changedReplicaLeaderCount.get(2));
var changedReplicaLeaderInCount = replicaLeaderToAdd(beforeClusterInfo, afterClusterInfo);
var changedReplicaLeaderOutCount = replicaLeaderToRemove(beforeClusterInfo, afterClusterInfo);
Assertions.assertEquals(3, changedReplicaLeaderInCount.size());
Assertions.assertEquals(1, changedReplicaLeaderInCount.get(0));
Assertions.assertEquals(0, changedReplicaLeaderInCount.get(1));
Assertions.assertEquals(0, changedReplicaLeaderInCount.get(2));
Assertions.assertEquals(0, changedReplicaLeaderOutCount.get(0));
Assertions.assertEquals(0, changedReplicaLeaderOutCount.get(1));
Assertions.assertEquals(1, changedReplicaLeaderOutCount.get(2));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,68 @@

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.broker.ServerMetrics;
import org.astraea.common.metrics.ClusterBean;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class ReplicaLeaderCostTest {
private final Dispersion dispersion = Dispersion.cov();

@Test
void testLeaderCount() {
var baseCluster =
ClusterInfo.builder()
.addNode(Set.of(1, 2))
.addFolders(Map.of(1, Set.of("/folder")))
.addFolders(Map.of(2, Set.of("/folder")))
.build();
var sourceCluster =
ClusterInfo.builder(baseCluster)
.addTopic(
"topic1",
3,
(short) 1,
r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build())
.addTopic(
"topic2",
3,
(short) 1,
r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build())
.build();
var overFlowTargetCluster =
ClusterInfo.builder(baseCluster)
.addTopic(
"topic1",
3,
(short) 1,
r -> Replica.builder(r).nodeInfo(baseCluster.node(2)).build())
.addTopic(
"topic2",
3,
(short) 1,
r -> Replica.builder(r).nodeInfo(baseCluster.node(1)).build())
.build();

var overFlowMoveCost =
new ReplicaLeaderCost(
Configuration.of(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "5")))
.moveCost(sourceCluster, overFlowTargetCluster, ClusterBean.EMPTY);

var noOverFlowMoveCost =
new ReplicaLeaderCost(
Configuration.of(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "10")))
.moveCost(sourceCluster, overFlowTargetCluster, ClusterBean.EMPTY);

Assertions.assertTrue(overFlowMoveCost.overflow());
Assertions.assertFalse(noOverFlowMoveCost.overflow());
}

@Test
void testNoMetrics() {
var replicas =
Expand Down Expand Up @@ -73,13 +122,4 @@ void testNoMetrics() {
Assertions.assertEquals(brokerCost.get(12), 0);
Assertions.assertEquals(clusterCost, 0.816496580927726);
}

private ServerMetrics.ReplicaManager.Gauge mockResult(String name, int count) {
var result = Mockito.mock(ServerMetrics.ReplicaManager.Gauge.class);
var bean = Mockito.mock(BeanObject.class);
Mockito.when(result.beanObject()).thenReturn(bean);
Mockito.when(bean.properties()).thenReturn(Map.of("name", name, "type", "ReplicaManager"));
Mockito.when(result.value()).thenReturn(count);
return result;
}
}

0 comments on commit f86391d

Please sign in to comment.