Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[COST] add ClusterBean to MigrationCost and revise Balancer#Plan #1788

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/src/main/java/org/astraea/app/web/BalancerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ private PlanExecutionProgress progress(String taskId) {
solution ->
new PlanReport(
changes.apply(solution),
MigrationCost.migrationCosts(contextCluster, solution.proposal())))
MigrationCost.migrationCosts(
contextCluster, solution.proposal(), solution.clusterBean())))
.orElse(null);
var phase = balancerConsole.taskPhase(taskId).orElseThrow();
return new PlanExecutionProgress(
Expand Down
34 changes: 9 additions & 25 deletions common/src/main/java/org/astraea/common/balancer/Balancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.astraea.common.balancer.algorithms.GreedyBalancer;
import org.astraea.common.balancer.algorithms.SingleStepBalancer;
import org.astraea.common.cost.ClusterCost;
import org.astraea.common.metrics.ClusterBean;

public interface Balancer {

Expand All @@ -32,41 +33,24 @@ public interface Balancer {
*/
Optional<Plan> offer(AlgorithmConfig config);

class Plan {
private final ClusterInfo initialClusterInfo;
private final ClusterCost initialClusterCost;

private final ClusterInfo proposal;
private final ClusterCost proposalClusterCost;

public Plan(
ClusterInfo initialClusterInfo,
ClusterCost initialClusterCost,
ClusterInfo proposal,
ClusterCost proposalClusterCost) {
this.initialClusterInfo = initialClusterInfo;
this.initialClusterCost = initialClusterCost;
this.proposal = proposal;
this.proposalClusterCost = proposalClusterCost;
}

public ClusterInfo initialClusterInfo() {
return initialClusterInfo;
}
record Plan(
qoo332001 marked this conversation as resolved.
Show resolved Hide resolved
ClusterBean clusterBean,
ClusterInfo initialClusterInfo,
ClusterCost initialClusterCost,
ClusterInfo proposal,
ClusterCost proposalClusterCost) {

/**
* The {@link ClusterCost} score of the original {@link ClusterInfo} when this plan is start
* generating.
*/
@Override
public ClusterCost initialClusterCost() {
return initialClusterCost;
}

public ClusterInfo proposal() {
return proposal;
}

/** The {@link ClusterCost} score of the proposed new allocation. */
@Override
public ClusterCost proposalClusterCost() {
return proposalClusterCost;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.map(
newAllocation ->
new Plan(
config.clusterBean(),
config.clusterInfo(),
initialCost,
newAllocation,
Expand Down Expand Up @@ -254,6 +255,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.overflow()) {
return Optional.of(
new Plan(
config.clusterBean(),
config.clusterInfo(),
config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean),
currentClusterInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.map(
newAllocation ->
new Plan(
config.clusterBean(),
config.clusterInfo(),
currentCost,
newAllocation,
Expand All @@ -155,6 +156,7 @@ public Optional<Plan> offer(AlgorithmConfig config) {
.overflow()) {
return Optional.of(
new Plan(
config.clusterBean(),
config.clusterInfo(),
config.clusterCostFunction().clusterCost(config.clusterInfo(), clusterBean),
currentClusterInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.astraea.common.admin.Broker;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.metrics.ClusterBean;

public class MigrationCost {

Expand All @@ -37,7 +38,8 @@ public class MigrationCost {
public static final String REPLICA_LEADERS_TO_REMOVE = "leader number to remove";
public static final String CHANGED_REPLICAS = "changed replicas";

public static List<MigrationCost> migrationCosts(ClusterInfo before, ClusterInfo after) {
public static List<MigrationCost> migrationCosts(
ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
var migrateInBytes = recordSizeToSync(before, after);
var migrateOutBytes = recordSizeToFetch(before, after);
var migrateReplicaNum = replicaNumChanged(before, after);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,12 @@ public Optional<Plan> offer(AlgorithmConfig config) {
costFunction,
Duration.ofMillis(sampleTimeMs - (System.currentTimeMillis() - startMs)));
return Optional.of(
new Plan(config.clusterInfo(), () -> 0, config.clusterInfo(), () -> 0));
new Plan(
config.clusterBean(),
config.clusterInfo(),
() -> 0,
config.clusterInfo(),
() -> 0));
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.astraea.common.admin.Replica;
import org.astraea.common.balancer.Balancer;
import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.metrics.ClusterBean;
import org.astraea.gui.Context;
import org.astraea.gui.pane.Argument;
import org.astraea.it.Service;
Expand Down Expand Up @@ -150,6 +151,7 @@ void testResult() {
var results =
BalancerNode.assignmentResult(
new Balancer.Plan(
ClusterBean.EMPTY,
beforeClusterInfo,
() -> 1.0D,
ClusterInfo.of("fake", allNodes, Map.of(), afterReplicas),
Expand Down