Skip to content

Commit

Permalink
review feedback apache#2
Browse files Browse the repository at this point in the history
  • Loading branch information
clarax committed Jun 8, 2021
1 parent 1832dbe commit 3bcf8a5
Showing 1 changed file with 29 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ private void loadCustomCostFunctions(Configuration conf) {
CostFunction func = createCostFunction(clazz, conf);
LOG.info("Successfully loaded custom CostFunction '{}'", func.getClass().getSimpleName());
costFunctions.add(func);
sumMultiplier += func.getMultiplier();
}
}

Expand Down Expand Up @@ -222,8 +221,7 @@ protected void loadConf(Configuration conf) {

regionReplicaHostCostFunction = new RegionReplicaHostCostFunction(conf);
regionReplicaRackCostFunction = new RegionReplicaRackCostFunction(conf);

sumMultiplier = 0.0f;

costFunctions = new ArrayList<>();
addCostFunction(new RegionCountSkewCostFunction(conf));
addCostFunction(new PrimaryRegionCountSkewCostFunction(conf));
Expand Down Expand Up @@ -291,8 +289,6 @@ private boolean areSomeRegionReplicasColocated(BalancerClusterState c) {
private String getBalanceReason(double total) {
if (total <= 0) {
return "(weighted sum of imbalance = " + total + " <= 0";
} else if (sumMultiplier <= 0) {
return "sumMultiplier = " + sumMultiplier + " <= 0";
} else if ((total / sumMultiplier) < minCostNeedBalance) {
return "(weighted average imbalance = " +
(total / sumMultiplier) + " < threshold (" + minCostNeedBalance + ")";
Expand Down Expand Up @@ -322,40 +318,34 @@ boolean needsBalance(TableName tableName, BalancerClusterState cluster) {
return true;
}

sumMultiplier = 0.0f;
double total = 0.0;
for (CostFunction c : costFunctions) {
float multiplier = c.getMultiplier();
double cost = c.cost();
if (multiplier <= 0) {
LOG.trace("{} not needed because multiplier is <= 0", c.getClass().getSimpleName());
continue;
}
if (!c.isNeeded()) {
LOG.trace("{} not needed", c.getClass().getSimpleName());
continue;
}
if (cost < minCostNeedBalance)
{
LOG.debug("Imbalance of {} on the scale of [0, 1] is {} < threshold ({}).",
c.getClass().getSimpleName(), cost, minCostNeedBalance);
}
total += cost * multiplier;
sumMultiplier += multiplier;
}

boolean balanced = total <= 0 || sumMultiplier <= 0 ||
(sumMultiplier > 0 && (total / sumMultiplier) < minCostNeedBalance);
boolean balanced = (total / sumMultiplier < minCostNeedBalance);

if (balanced) {
final double calculatedTotal = total;
sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal),
costFunctions);
sendRejectionReasonToRingBuffer(() -> getBalanceReason(calculatedTotal), costFunctions);
LOG.info("{} - skipping load balancing because weighted average imbalance={} > threshold({})."
+ "functionCost={}."
+ "If you want more aggressive balancing, either lower minCostNeedbalance {}"
+ "or increase the relative weight(s) of the specific cost function(s).",
isByTable ? "Table specific ("+tableName+")" : "Cluster wide", functionCost(),
total / sumMultiplier, minCostNeedBalance);
} else {
LOG.info("{} - Calculating plan. may take up to {}ms to complete.",
isByTable ? "Table specific ("+tableName+")" : "Cluster wide", maxRunningTime);
}
LOG.info("{} {} ", isByTable ? String.format("table (%s)", tableName) : "cluster",
balanced ? "Skipping load balancing because weighted average imbalance= "
+ total / sumMultiplier + "< threshold (" + minCostNeedBalance + ") on the scale of [0, 1]."
+ "If you want more aggressive balancing, either lower threshold minCostNeedbalance ("
+ minCostNeedBalance + ") or increase the relative weight(s) of the specific cost function(s)."
: "Calculating plan. May take up to " + maxRunningTime + " ms to complete.");

return !balanced;
}

Expand Down Expand Up @@ -388,8 +378,8 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
// Allow turning this feature off if the locality cost is not going to
// be used in any computations.
RegionHDFSBlockLocationFinder finder = null;
if ((this.localityCost != null && this.localityCost.getMultiplier() > 0)
|| (this.rackLocalityCost != null && this.rackLocalityCost.getMultiplier() > 0)) {
if ((this.localityCost != null)
|| (this.rackLocalityCost != null)) {
finder = this.regionFinder;
}

Expand Down Expand Up @@ -485,8 +475,8 @@ protected List<RegionPlan> balanceTable(TableName tableName, Map<ServerName,
return plans;
}
LOG.info("Could not find a better moving plan. Tried {} different configurations in " +
"{} ms, and did not find anything with a computed cost less than {}", step,
endTime - startTime, initCost);
"{} ms, and did not find anything with an imbalance score less than {}", step,
endTime - startTime, initCost / sumMultiplier);
LOG.info("Balancer is going into sleep until next period");
return null;
}
Expand All @@ -498,7 +488,7 @@ private void sendRejectionReasonToRingBuffer(Supplier<String> reason,
if (costFunctions != null) {
for (CostFunction c : costFunctions) {
float multiplier = c.getMultiplier();
if (multiplier <= 0 || !c.isNeeded()) {
if (!c.isNeeded()) {
continue;
}
builder.addCostFuncInfo(c.getClass().getName(), c.cost(), c.getMultiplier());
Expand Down Expand Up @@ -555,7 +545,6 @@ private void addCostFunction(CostFunction costFunction) {
float multiplier = costFunction.getMultiplier();
if (multiplier > 0) {
costFunctions.add(costFunction);
sumMultiplier += multiplier;
}
}

Expand All @@ -564,10 +553,14 @@ private String functionCost() {
for (CostFunction c : costFunctions) {
builder.append(c.getClass().getSimpleName());
builder.append(" : (");
if (c.isNeeded() || c.getMultiplier() > 0) {
if (c.isNeeded()) {
builder.append("multiplier=" + c.getMultiplier());
builder.append(", ");
builder.append("imbalance=" + c.cost());
double cost = c.cost();
builder.append("imbalance=" + cost);
if (cost < minCostNeedBalance) {
builder.append(", balanced");
}
} else {
builder.append("not needed");
}
Expand All @@ -579,7 +572,7 @@ private String functionCost() {
private String totalCostsPerFunc() {
StringBuilder builder = new StringBuilder();
for (CostFunction c : costFunctions) {
if (c.getMultiplier() <= 0 || !c.isNeeded()) {
if (!c.isNeeded()) {
continue;
}
double cost = c.getMultiplier() * c.cost();
Expand Down Expand Up @@ -663,7 +656,7 @@ void initCosts(BalancerClusterState cluster) {
allowedOnPath = ".*(/src/test/.*|StochasticLoadBalancer).java")
void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) {
for (CostFunction c : costFunctions) {
if (c.getMultiplier() > 0 && c.isNeeded()) {
if (c.isNeeded()) {
c.postAction(action);
}
}
Expand Down Expand Up @@ -702,7 +695,7 @@ String[] getCostFunctionNames() {
CostFunction c = costFunctions.get(i);
this.tempFunctionCosts[i] = 0.0;

if (c.getMultiplier() <= 0 || !c.isNeeded()) {
if (!c.isNeeded()) {
continue;
}

Expand Down

0 comments on commit 3bcf8a5

Please sign in to comment.