Skip to content

Commit

Permalink
fix bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
qoo332001 committed May 25, 2023
1 parent ebbc82b commit 85ce98d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static org.astraea.common.balancer.BalancerConsole.TaskPhase.Searched;
import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_ADDED;
import static org.astraea.common.cost.MigrationCost.REPLICA_LEADERS_TO_REMOVE;
import static org.astraea.common.cost.MigrationCost.TO_FETCH_BYTES;
import static org.astraea.common.cost.MigrationCost.TO_FETCHED_BYTES;
import static org.astraea.common.cost.MigrationCost.TO_SYNC_BYTES;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -322,7 +322,7 @@ void testMoveCost(String leaderLimit, String sizeLimit) {
migrationCost -> {
switch (migrationCost.name) {
case TO_SYNC_BYTES:
case TO_FETCH_BYTES:
case TO_FETCHED_BYTES:
Assertions.assertTrue(
migrationCost.brokerCosts.values().stream().mapToLong(Long::intValue).sum()
<= DataSize.of(sizeLimit).bytes());
Expand Down
22 changes: 15 additions & 7 deletions common/src/main/java/org/astraea/common/cost/MigrationCost.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class MigrationCost {
public final String name;
public final Map<Integer, Long> brokerCosts;
public static final String TO_SYNC_BYTES = "record size to sync (bytes)";
public static final String TO_FETCH_BYTES = "record size to fetch (bytes)";
public static final String TO_FETCHED_BYTES = "record size to fetched (bytes)";
public static final String REPLICA_LEADERS_TO_ADDED = "leader number to add";
public static final String REPLICA_LEADERS_TO_REMOVE = "leader number to remove";
public static final String CHANGED_REPLICAS = "changed replicas";
Expand All @@ -44,13 +44,13 @@ public class MigrationCost {
public static List<MigrationCost> migrationCosts(
ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var migrateInBytes = recordSizeToSync(before, after);
var migrateOutBytes = recordSizeToFetch(before, after);
var migrateOutBytes = recordSizeToFetched(before, after);
var migrateReplicaNum = replicaNumChanged(before, after);
var migrateInLeader = replicaLeaderToAdd(before, after);
var migrateOutLeader = replicaLeaderToRemove(before, after);
return List.of(
new MigrationCost(TO_SYNC_BYTES, migrateInBytes),
new MigrationCost(TO_FETCH_BYTES, migrateOutBytes),
new MigrationCost(TO_FETCHED_BYTES, migrateOutBytes),
new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum),
new MigrationCost(
PARTITION_MIGRATED_TIME, brokerMigrationSecond(before, after, clusterBean)),
Expand All @@ -64,7 +64,7 @@ public MigrationCost(String name, Map<Integer, Long> brokerCosts) {
this.brokerCosts = brokerCosts;
}

static Map<Integer, Long> recordSizeToFetch(ClusterInfo before, ClusterInfo after) {
static Map<Integer, Long> recordSizeToFetched(ClusterInfo before, ClusterInfo after) {
return migratedChanged(before, after, true, (ignore) -> true, Replica::size);
}

Expand Down Expand Up @@ -113,14 +113,14 @@ public static Map<Integer, Long> brokerMigrationSecond(
clusterBean,
PartitionMigrateTimeCost.MaxReplicationOutRateBean.class)));
var brokerMigrateInSecond =
MigrationCost.recordSizeToFetch(before, after).entrySet().stream()
MigrationCost.recordSizeToSync(before, after).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
brokerSize ->
brokerSize.getValue() / brokerInRate.get(brokerSize.getKey()).orElse(0)));
var brokerMigrateOutSecond =
MigrationCost.recordSizeToSync(before, after).entrySet().stream()
MigrationCost.recordSizeToFetched(before, after).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
Expand Down Expand Up @@ -170,7 +170,14 @@ private static Map<Integer, Long> migratedChanged(
p ->
dest.replicas(p).stream()
.filter(predicate)
.filter(r -> !source.replicas(p).contains(r)))
.filter(
r ->
source.replicas(p).stream()
.noneMatch(
sourceReplica ->
sourceReplica
.topicPartitionReplica()
.equals(r.topicPartitionReplica()))))
.map(
r -> {
if (migrateOut) return dest.replicaLeader(r.topicPartition()).orElse(r);
Expand All @@ -181,6 +188,7 @@ private static Map<Integer, Long> migratedChanged(
r -> r.nodeInfo().id(),
Collectors.mapping(
Function.identity(), Collectors.summingLong(replicaFunction::apply))));

return Stream.concat(dest.nodes().stream(), source.nodes().stream())
.map(NodeInfo::id)
.distinct()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.astraea.common.cost;

import static org.astraea.common.cost.CostUtils.changedRecordSizeOverflow;
import static org.astraea.common.cost.MigrationCost.recordSizeToFetch;
import static org.astraea.common.cost.MigrationCost.recordSizeToFetched;
import static org.astraea.common.cost.MigrationCost.recordSizeToSync;

import java.util.List;
Expand All @@ -43,7 +43,7 @@ void testChangedRecordSizeOverflow() {
Assertions.assertEquals(1000, moveInResult.get(1));
Assertions.assertEquals(100 + 500, moveInResult.get(2));

var moveOutResult = recordSizeToFetch(beforeClusterInfo(), afterClusterInfo());
var moveOutResult = recordSizeToFetched(beforeClusterInfo(), afterClusterInfo());
Assertions.assertEquals(3, moveOutResult.size());
Assertions.assertEquals(100 + 500, moveOutResult.get(0));
Assertions.assertEquals(0, moveOutResult.get(1));
Expand Down

0 comments on commit 85ce98d

Please sign in to comment.