Skip to content

Commit

Permalink
[COST] add ClusterBean to MigrationCost and revise Balancer#Plan
Browse files Browse the repository at this point in the history
  • Loading branch information
qoo332001 authored May 29, 2023
1 parent b1c9566 commit c453129
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 29 deletions.
3 changes: 2 additions & 1 deletion app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ private PlanExecutionProgress progress(String taskId) {
solution ->
new PlanReport(
changes.apply(solution),
MigrationCost.migrationCosts(contextCluster, solution.proposal())))
MigrationCost.migrationCosts(
contextCluster, solution.proposal(), solution.clusterBean())))
.orElse(null);
var phase = balancerConsole.taskPhase(taskId).orElseThrow();
return new PlanExecutionProgress(
Expand Down
34 changes: 9 additions & 25 deletions common/src/main/java/org/astraea/common/balancer/Balancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.astraea.common.balancer.algorithms.GreedyBalancer;
import org.astraea.common.balancer.algorithms.SingleStepBalancer;
import org.astraea.common.cost.ClusterCost;
import org.astraea.common.metrics.ClusterBean;

public interface Balancer {

Expand All @@ -32,41 +33,24 @@ public interface Balancer {
*/
Optional<Plan> offer(AlgorithmConfig config);

class Plan {
private final ClusterInfo initialClusterInfo;
private final ClusterCost initialClusterCost;

private final ClusterInfo proposal;
private final ClusterCost proposalClusterCost;

public Plan(
ClusterInfo initialClusterInfo,
ClusterCost initialClusterCost,
ClusterInfo proposal,
ClusterCost proposalClusterCost) {
this.initialClusterInfo = initialClusterInfo;
this.initialClusterCost = initialClusterCost;
this.proposal = proposal;
this.proposalClusterCost = proposalClusterCost;
}

public ClusterInfo initialClusterInfo() {
return initialClusterInfo;
}
record Plan(
ClusterBean clusterBean,
ClusterInfo initialClusterInfo,
ClusterCost initialClusterCost,
ClusterInfo proposal,
ClusterCost proposalClusterCost) {

/**
* The {@link ClusterCost} score of the original {@link ClusterInfo} when this plan is start
* generating.
*/
@Override
public ClusterCost initialClusterCost() {
return initialClusterCost;
}

public ClusterInfo proposal() {
return proposal;
}

/** The {@link ClusterCost} score of the proposed new allocation. */
@Override
public ClusterCost proposalClusterCost() {
return proposalClusterCost;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.map(
newAllocation ->
new Plan(
config.clusterBean(),
config.clusterInfo(),
initialCost,
newAllocation,
Expand Down Expand Up @@ -254,6 +255,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.overflow()) {
return Optional.of(
new Plan(
config.clusterBean(),
config.clusterInfo(),
config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean),
currentClusterInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.map(
newAllocation ->
new Plan(
config.clusterBean(),
config.clusterInfo(),
currentCost,
newAllocation,
Expand All @@ -155,6 +156,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.overflow()) {
return Optional.of(
new Plan(
config.clusterBean(),
config.clusterInfo(),
config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean),
currentClusterInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.metrics.ClusterBean;

public class MigrationCost {

Expand All @@ -37,7 +38,8 @@ public class MigrationCost {
public static final String REPLICA_LEADERS_TO_REMOVE = "leader number to remove";
public static final String CHANGED_REPLICAS = "changed replicas";

public static List<MigrationCost> migrationCosts(ClusterInfo before, ClusterInfo after) {
public static List<MigrationCost> migrationCosts(
ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var migrateInBytes = recordSizeToSync(before, after);
var migrateOutBytes = recordSizeToFetch(before, after);
var migrateReplicaNum = replicaNumChanged(before, after);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,12 @@ public Optional<Plan> offer(AlgorithmConfig config) {
costFunction,
Duration.ofMillis(sampleTimeMs - (System.currentTimeMillis() - startMs)));
return Optional.of(
new Plan(config.clusterInfo(), () -> 0, config.clusterInfo(), () -> 0));
new Plan(
config.clusterBean(),
config.clusterInfo(),
() -> 0,
config.clusterInfo(),
() -> 0));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ static List<Map<String, Object>> costResult(Balancer.Plan plan) {
})
.put(migrationCost.name, count)));

process.accept(MigrationCost.migrationCosts(plan.initialClusterInfo(), plan.proposal()));
process.accept(
MigrationCost.migrationCosts(
plan.initialClusterInfo(), plan.proposal(), plan.clusterBean()));
return List.copyOf(map.values());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.astraea.common.admin.Replica;
import org.astraea.common.balancer.Balancer;
import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.metrics.ClusterBean;
import org.astraea.gui.Context;
import org.astraea.gui.pane.Argument;
import org.astraea.it.Service;
Expand Down Expand Up @@ -150,6 +151,7 @@ void testResult() {
var results =
BalancerNode.assignmentResult(
new Balancer.Plan(
ClusterBean.EMPTY,
beforeClusterInfo,
() -> 1.0D,
ClusterInfo.of("fake", allNodes, Map.of(), afterReplicas),
Expand Down

0 comments on commit c453129

Please sign in to comment.