Skip to content

Commit

Permalink
HBASE-25894 Improve the performance for region load and region count …
Browse files Browse the repository at this point in the history
…related cost functions (#3276)

Signed-off-by: Yi Mei <[email protected]>
  • Loading branch information
Apache9 committed May 25, 2021
1 parent 8754e88 commit 2cb6cc8
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,47 @@
@InterfaceAudience.Private
abstract class CostFromRegionLoadFunction extends CostFunction {

private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();

@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
/**
 * Adds up the region load cost of every region hosted on one region server.
 * @param regionServerIndex index of the region server in the cluster state
 * @return the accumulated cost; regions whose load list is {@code null} contribute nothing
 */
private double computeCostForRegionServer(int regionServerIndex) {
  double serverCost = 0;
  for (int regionIndex : cluster.regionsPerServer[regionServerIndex]) {
    Collection<BalancerRegionLoad> loads = cluster.regionLoads[regionIndex];
    if (loads == null) {
      // no load list is available for this region, so there is nothing to add
      continue;
    }
    // delegate to the subclass-selectable cost for this region's load history
    serverCost += getRegionLoadCost(loads);
  }
  return serverCost;
}

@Override
protected final double cost() {
for (int i = 0; i < stats.length; i++) {
// Cost this server has from RegionLoad
double cost = 0;

// for every region on this server get the rl
for (int regionIndex : cluster.regionsPerServer[i]) {
Collection<BalancerRegionLoad> regionLoadList = cluster.regionLoads[regionIndex];

// Now if we found a region load get the type of cost that was requested.
if (regionLoadList != null) {
cost += getRegionLoadCost(regionLoadList);
}
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < costs.length; i++) {
costs[i] = computeCostForRegionServer(i);
}
});
}

// Add the total cost to the stats.
stats[i] = cost;
}
/**
 * Refreshes the cached per-server costs after a region has moved. Only the entries of the two
 * servers involved are recomputed; every other entry of the cost array is left untouched.
 * @param region index of the moved region (not read by this implementation)
 * @param oldServer index of the server the region left
 * @param newServer index of the server the region now lives on
 */
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
// recompute the stat for the given two region servers
cost.setCosts(costs -> {
costs[oldServer] = computeCostForRegionServer(oldServer);
costs[newServer] = computeCostForRegionServer(newServer);
});
}

// Now return the scaled cost from data held in the stats object.
return costFromArray(stats);
/**
 * Returns the scaled cost held by the {@code DoubleArrayCost} helper instead of walking all
 * servers and regions on every call.
 */
@Override
protected final double cost() {
return cost.cost();
}

protected double getRegionLoadCost(Collection<BalancerRegionLoad> regionLoadList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,65 +81,14 @@ protected void regionMoved(int region, int oldServer, int newServer) {

protected abstract double cost();

@SuppressWarnings("checkstyle:linelength")
/**
* Function to compute a scaled cost using
* {@link org.apache.commons.math3.stat.descriptive.DescriptiveStatistics#DescriptiveStatistics()}.
* It assumes that this is a zero sum set of costs. It assumes that the worst case possible is all
* of the elements in one region server and the rest having 0.
* @param stats the costs
* @return a scaled set of costs.
*/
protected final double costFromArray(double[] stats) {
double totalCost = 0;
double total = getSum(stats);

double count = stats.length;
double mean = total / count;

// Compute max as if all region servers had 0 and one had the sum of all costs. This must be
// a zero sum cost for this to make sense.
double max = ((count - 1) * mean) + (total - mean);

// It's possible that there aren't enough regions to go around
double min;
if (count > total) {
min = ((count - total) * mean) + ((1 - mean) * total);
} else {
// Some will have 1 more than everything else.
int numHigh = (int) (total - (Math.floor(mean) * count));
int numLow = (int) (count - numHigh);

min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));

}
min = Math.max(0, min);
for (int i = 0; i < stats.length; i++) {
double n = stats[i];
double diff = Math.abs(mean - n);
totalCost += diff;
}

double scaled = scale(min, max, totalCost);
return scaled;
}

private double getSum(double[] stats) {
double total = 0;
for (double s : stats) {
total += s;
}
return total;
}

/**
* Scale the value between 0 and 1.
* @param min Min value
* @param max The Max value
* @param value The value to be scaled.
* @return The scaled value.
*/
protected final double scale(double min, double max, double value) {
protected static double scale(double min, double max, double value) {
if (max <= min || value <= min) {
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.function.Consumer;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A helper class that keeps one raw cost value per region server and lazily turns them into a
 * single scaled cost.
 * <p>
 * The scaled cost is the sum of the absolute deviations from the mean, scaled with
 * {@code CostFunction.scale} between the smallest and the largest deviation sum considered
 * possible. It assumes that this is a zero sum set of costs. It assumes that the worst case
 * possible is all of the elements in one region server and the rest having 0.
 * <p>
 * The scaled value is cached; it is recomputed on the next {@link #cost()} call only after the
 * array has been touched through {@link #setCosts(Consumer)} or re-allocated by
 * {@link #prepare(int)}.
 */
@InterfaceAudience.Private
final class DoubleArrayCost {

  private double[] costs;

  // computeCost call is expensive so we use this flag to indicate whether we need to recalculate
  // the cost by calling computeCost
  private boolean costsChanged;

  // the scaled cost computed from the costs array the last time it was found changed
  private double cost;

  /**
   * Makes sure the backing array has exactly {@code length} slots, re-using the current array
   * when its size already matches.
   * @param length number of per-server slots needed
   */
  void prepare(int length) {
    if (costs == null || costs.length != length) {
      costs = new double[length];
      // the cached value was derived from the array we just dropped, force a recalculation
      costsChanged = true;
    }
  }

  /**
   * Hands the backing array to {@code consumer} for in-place modification and marks the cached
   * cost as stale.
   * @param consumer callback that writes the new per-server values into the array
   */
  void setCosts(Consumer<double[]> consumer) {
    consumer.accept(costs);
    costsChanged = true;
  }

  /**
   * Returns the scaled cost, recomputing it only if the array changed since the last call.
   * @return the scaled cost of the current per-server values
   */
  double cost() {
    if (costsChanged) {
      cost = computeCost(costs);
      costsChanged = false;
    }
    return cost;
  }

  /**
   * Computes the scaled sum of absolute deviations from the mean.
   * @param stats the per-server costs
   * @return the scaled cost, or 0 when there are no servers at all
   */
  private static double computeCost(double[] stats) {
    if (stats.length == 0) {
      // Without servers the mean would be 0/0 = NaN and poison every value derived from it;
      // there is nothing to balance, so report no cost.
      return 0;
    }
    double total = getSum(stats);

    double count = stats.length;
    double mean = total / count;

    // Compute max as if all region servers had 0 and one had the sum of all costs. This must be
    // a zero sum cost for this to make sense.
    double max = ((count - 1) * mean) + (total - mean);

    // It's possible that there aren't enough regions to go around
    double min;
    if (count > total) {
      min = ((count - total) * mean) + ((1 - mean) * total);
    } else {
      // Some will have 1 more than everything else.
      int numHigh = (int) (total - (Math.floor(mean) * count));
      int numLow = (int) (count - numHigh);

      min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
    }
    min = Math.max(0, min);

    double totalCost = 0;
    for (double stat : stats) {
      totalCost += Math.abs(mean - stat);
    }

    return CostFunction.scale(min, max, totalCost);
  }

  /**
   * Plain left-to-right sum of the given values.
   * @param stats the values to add up
   * @return the sum, 0 for an empty array
   */
  private static double getSum(double[] stats) {
    double total = 0;
    for (double s : stats) {
      total += s;
    }
    return total;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,22 @@ class PrimaryRegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.primaryRegionCountCost";
private static final float DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST = 500;

private final float primaryRegionCountCost;
private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();

PrimaryRegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as primary regions serve majority of reads/writes.
primaryRegionCountCost =
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST);
this.setMultiplier(primaryRegionCountCost);
this.setMultiplier(
conf.getFloat(PRIMARY_REGION_COUNT_SKEW_COST_KEY, DEFAULT_PRIMARY_REGION_COUNT_SKEW_COST));
}

/**
 * Counts the regions on one region server that are their own primary, i.e. whose index equals
 * their entry in {@code cluster.regionIndexToPrimaryIndex}.
 * @param regionServerIndex index of the region server in the cluster state
 * @return the number of such regions, widened to double
 */
private double computeCostForRegionServer(int regionServerIndex) {
  int primaryCount = 0;
  for (int regionIdx : cluster.regionsPerServer[regionServerIndex]) {
    boolean isPrimary = cluster.regionIndexToPrimaryIndex[regionIdx] == regionIdx;
    if (isPrimary) {
      primaryCount++;
    }
  }
  return primaryCount;
}

@Override
Expand All @@ -47,9 +55,20 @@ void prepare(BalancerClusterState cluster) {
if (!isNeeded()) {
return;
}
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < costs.length; i++) {
costs[i] = computeCostForRegionServer(i);
}
});
}

/**
 * Refreshes the cached per-server values after a region has moved. Only the entries of the two
 * servers involved are recomputed; every other entry of the cost array is left untouched.
 * @param region index of the moved region (not read by this implementation)
 * @param oldServer index of the server the region left
 * @param newServer index of the server the region now lives on
 */
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.setCosts(costs -> {
costs[oldServer] = computeCostForRegionServer(oldServer);
costs[newServer] = computeCostForRegionServer(newServer);
});
}

@Override
Expand All @@ -59,15 +78,6 @@ boolean isNeeded() {

@Override
protected double cost() {
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = 0;
for (int regionIdx : cluster.regionsPerServer[i]) {
if (regionIdx == cluster.regionIndexToPrimaryIndex[regionIdx]) {
stats[i]++;
}
}
}

return costFromArray(stats);
return cost.cost();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class RegionCountSkewCostFunction extends CostFunction {
"hbase.master.balancer.stochastic.regionCountCost";
static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;

private double[] stats;
private final DoubleArrayCost cost = new DoubleArrayCost();

RegionCountSkewCostFunction(Configuration conf) {
// Load multiplier should be the greatest as it is the most general way to balance data.
Expand All @@ -44,9 +44,12 @@ class RegionCountSkewCostFunction extends CostFunction {
@Override
void prepare(BalancerClusterState cluster) {
super.prepare(cluster);
if (stats == null || stats.length != cluster.numServers) {
stats = new double[cluster.numServers];
}
cost.prepare(cluster.numServers);
cost.setCosts(costs -> {
for (int i = 0; i < cluster.numServers; i++) {
costs[i] = cluster.regionsPerServer[i].length;
}
});
LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(),
cluster.numServers, cluster.numRegions);
if (LOG.isTraceEnabled()) {
Expand All @@ -59,9 +62,14 @@ void prepare(BalancerClusterState cluster) {

@Override
protected double cost() {
for (int i = 0; i < cluster.numServers; i++) {
stats[i] = cluster.regionsPerServer[i].length;
}
return costFromArray(stats);
return cost.cost();
}

/**
 * Refreshes the cached region counts after a region has moved. Only the entries of the two
 * servers involved are re-read from {@code cluster.regionsPerServer}; every other entry of the
 * cost array is left untouched.
 * @param region index of the moved region (not read by this implementation)
 * @param oldServer index of the server the region left
 * @param newServer index of the server the region now lives on
 */
@Override
protected void regionMoved(int region, int oldServer, int newServer) {
cost.setCosts(costs -> {
costs[oldServer] = cluster.regionsPerServer[oldServer].length;
costs[newServer] = cluster.regionsPerServer[newServer].length;
});
}
}
Loading

0 comments on commit 2cb6cc8

Please sign in to comment.