Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
clarax committed Jun 8, 2021
1 parent 7480f09 commit 1832dbe
Showing 1 changed file with 38 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {

private List<CandidateGenerator> candidateGenerators;
private List<CostFunction> costFunctions; // FindBugs: Wants this protected; IS2_INCONSISTENT_SYNC

// To save currently configed sum of multiplier
private float sumMultiplier = 0.0f;
// to save and report costs to JMX
private double curOverallCost = 0d;
private double[] tempFunctionCosts;
Expand Down Expand Up @@ -179,6 +180,7 @@ private void loadCustomCostFunctions(Configuration conf) {
CostFunction func = createCostFunction(clazz, conf);
LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
costFunctions.add(func);
sumMultiplier += func.getMultiplier();
}
}

Expand Down Expand Up @@ -221,6 +223,7 @@ protected void loadConf(Configuration conf) {
regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);

sumMultiplier = 0.0f;
costFunctions = new ArrayList<>();
addCostFunction(new RegionCountSkewCostFunction(conf));
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
Expand Down Expand Up @@ -285,14 +288,14 @@ private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
return false;
}

private String getBalanceReason(double total, double sumMultiplier) {
private String getBalanceReason(double total) {
if (total <= 0) {
return "(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern) = " + total + " <= 0";
return "(weighted sum of imbalance = " + total + " <= 0";
} else if (sumMultiplier <= 0) {
return "sumMultiplier = " + sumMultiplier + " <= 0";
} else if ((total / sumMultiplier) < minCostNeedBalance) {
return "[(cost1*multiplier1)+(cost2*multiplier2)+...+(costn*multipliern)]/sumMultiplier = " +
(total / sumMultiplier) + " <= minCostNeedBalance(" + minCostNeedBalance + ")";
return "(weighted average imbalance = " +
(total / sumMultiplier) + " < threshold (" + minCostNeedBalance + ")";
} else {
return "";
}
Expand Down Expand Up @@ -320,9 +323,9 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
}

double total = 0.0;
float sumMultiplier = 0.0f;
for (CostFunction c : costFunctions) {
float multiplier = c.getMultiplier();
double cost = c.cost();
if (multiplier <= 0) {
LOG.trace("{} not needed because multiplier is <= 0", c.getClass().getSimpleName());
continue;
Expand All @@ -331,26 +334,27 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
LOG.trace("{} not needed", c.getClass().getSimpleName());
continue;
}
sumMultiplier += multiplier;
total += c.cost() * multiplier;
if (cost < minCostNeedBalance)
{
LOG.debug("Imbalance of {} on the scale of [0, 1] is {} < threshold ({}).",
c.getClass().getSimpleName(), cost, minCostNeedBalance);
}
total += cost * multiplier;
}

boolean balanced = total <= 0 || sumMultiplier <= 0 ||
(sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
if (balanced) {
final double calculatedTotal = total;
final double calculatedMultiplier = sumMultiplier;
sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal, calculatedMultiplier),
sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal),
costFunctions);
}
LOG.info("{} {}; total cost={}, sum multiplier={}; cost/multiplier to need a balance is {}",
balanced ? "Skipping load balancing because balanced" :
"Start calculating moving plan without logging info for up to {} secs",
isByTable ? String.format("table (%s)", tableName) : "cluster",
total, sumMultiplier, minCostNeedBalance, maxRunningTime / 1000);
if (LOG.isTraceEnabled()) {
LOG.trace("Balance decision detailed function costs={}", functionCost());
}
LOG.info("{} {} ", isByTable ? String.format("table (%s)", tableName) : "cluster",
balanced ? "Skipping load balancing because weighted average imbalance= "
+ total / sumMultiplier + "< threshold (" + minCostNeedBalance + ") on the scale of [0, 1]."
+ "If you want more aggressive balancing, either lower threshold minCostNeedbalance ("
+ minCostNeedBalance + ") or increase the relative weight(s) of the specific cost function(s)."
: "Calculating plan. May take up to " + maxRunningTime + " ms to complete.");

return !balanced;
}
Expand Down Expand Up @@ -423,8 +427,9 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
maxSteps);
}
}
LOG.info("start StochasticLoadBalancer.balancer, initCost=" + currentCost + ", functionCost="
+ functionCost() + " computedMaxSteps: " + computedMaxSteps);
LOG.info("Start StochasticLoadBalancer.balancer, initial weighted average imbalance ="
+ currentCost / sumMultiplier + ", functionCost=" + functionCost() + " computedMaxSteps: "
+ computedMaxSteps);

final String initFunctionTotalCosts = totalCostsPerFunc();
// Perform a stochastic walk to see if we can get a good fit.
Expand Down Expand Up @@ -470,17 +475,19 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
updateStochasticCosts(tableName, curOverallCost, curFunctionCosts);
if (initCost > currentCost) {
List<RegionPlan> plans = createRegionPlans(cluster);
LOG.info("Finished computing new load balance plan. Computation took {} ms" +
LOG.info("Finished computing new moving plan. Computation took {} ms" +
" to try {} different iterations. Found a solution that moves " +
"{} regions; Going from a computed cost of {}" +
" to a new cost of {}. Move plan to be submitted to master to execute",
endTime - startTime, step, plans.size(), initCost, currentCost);
"{} regions; Going from a computed imbalance of {}" +
" to a new imbalance of {}. ",
endTime - startTime, step, plans.size(), initCost / sumMultiplier, currentCost / sumMultiplier);
sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
LOG.info("Moving plan submitted. Balancer is going into sleep until next period");
return plans;
}
LOG.info("Could not find a better load balance plan. Tried {} different configurations in " +
LOG.info("Could not find a better moving plan. Tried {} different configurations in " +
"{} ms, and did not find anything with a computed cost less than {}", step,
endTime - startTime, initCost);
LOG.info("Balancer is going into sleep until next period");
return null;
}

Expand Down Expand Up @@ -545,8 +552,10 @@ private void updateStochasticCosts(TableName tableName, double overall, double[]
}

private void addCostFunction(CostFunction costFunction) {
if (costFunction.getMultiplier() > 0) {
float multiplier = costFunction.getMultiplier();
if (multiplier > 0) {
costFunctions.add(costFunction);
sumMultiplier += multiplier;
}
}

Expand All @@ -555,10 +564,10 @@ private String functionCost() {
for (CostFunction c : costFunctions) {
builder.append(c.getClass().getSimpleName());
builder.append(" : (");
if (c.isNeeded()) {
builder.append(c.getMultiplier());
if (c.isNeeded() || c.getMultiplier() > 0) {
builder.append("multiplier=" + c.getMultiplier());
builder.append(", ");
builder.append(c.cost());
builder.append("imbalance=" + c.cost());
} else {
builder.append("not needed");
}
Expand Down

0 comments on commit 1832dbe

Please sign in to comment.