Skip to content

Commit

Permalink
HBASE-25802 Miscellaneous style improvements for load balancer relate…
Browse files Browse the repository at this point in the history
…d classes (#3192)

Signed-off-by: Yulin Niu <[email protected]>
  • Loading branch information
Apache9 authored Apr 23, 2021
1 parent 996862c commit 96fefce
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerMetrics;
Expand Down Expand Up @@ -64,28 +63,37 @@
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {

private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);

public static final String BALANCER_DECISION_BUFFER_ENABLED =
"hbase.master.balancer.decision.buffer.enabled";
public static final boolean DEFAULT_BALANCER_DECISION_BUFFER_ENABLED = false;

protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;

static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();

static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
private static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
= load -> load.getRegionMetrics().isEmpty();

protected RegionHDFSBlockLocationFinder regionFinder;
protected boolean useRegionFinder;
protected boolean isByTable = false;

// slop for regions
protected float slop;
// overallSlop to control simpleLoadBalancer's cluster level threshold
protected float overallSlop;
protected Configuration config;
protected RackManager rackManager;
protected MetricsBalancer metricsBalancer = null;
protected ClusterMetrics clusterStatus = null;
protected ServerName masterServerName;
protected MasterServices services;
/**
* The constructor that uses the basic MetricsBalancer
*/
protected BaseLoadBalancer() {
metricsBalancer = new MetricsBalancer();
createRegionFinder();
this(null);
}

/**
Expand All @@ -94,28 +102,8 @@ protected BaseLoadBalancer() {
*/
protected BaseLoadBalancer(MetricsBalancer metricsBalancer) {
this.metricsBalancer = (metricsBalancer != null) ? metricsBalancer : new MetricsBalancer();
createRegionFinder();
}

private void createRegionFinder() {
useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
if (useRegionFinder) {
regionFinder = new RegionHDFSBlockLocationFinder();
}
}

// slop for regions
protected float slop;
// overallSlop to control simpleLoadBalancer's cluster level threshold
protected float overallSlop;
protected Configuration config = HBaseConfiguration.create();
protected RackManager rackManager;
static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
protected MetricsBalancer metricsBalancer = null;
protected ClusterMetrics clusterStatus = null;
protected ServerName masterServerName;
protected MasterServices services;

@Override
public void setConf(Configuration conf) {
this.config = conf;
Expand All @@ -133,7 +121,9 @@ public void setConf(Configuration conf) {
}

this.rackManager = new RackManager(getConf());
useRegionFinder = config.getBoolean("hbase.master.balancer.uselocality", true);
if (useRegionFinder) {
regionFinder = new RegionHDFSBlockLocationFinder();
regionFinder.setConf(conf);
}
this.isByTable = conf.getBoolean(HConstants.HBASE_MASTER_LOADBALANCE_BYTABLE, isByTable);
Expand Down Expand Up @@ -195,7 +185,9 @@ protected boolean needsBalance(TableName tableName, BalancerClusterState c) {
}
return false;
}
if(areSomeRegionReplicasColocated(c)) return true;
if (areSomeRegionReplicasColocated(c)) {
return true;
}
if(idleRegionServerExist(c)) {
return true;
}
Expand Down Expand Up @@ -248,10 +240,10 @@ protected final boolean idleRegionServerExist(BalancerClusterState c){
/**
* Generates a bulk assignment plan to be used on cluster startup using a
* simple round-robin assignment.
* <p>
* <p/>
* Takes a list of all the regions and all the servers in the cluster and
* returns a map of each server to the regions that it should be assigned.
* <p>
* <p/>
* Currently implemented as a round-robin assignment. Same invariant as load
* balancing, all servers holding floor(avg) or ceiling(avg).
*
Expand Down Expand Up @@ -288,7 +280,7 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
return assignments;
}

protected BalancerClusterState createCluster(List<ServerName> servers,
private BalancerClusterState createCluster(List<ServerName> servers,
Collection<RegionInfo> regions) throws HBaseIOException {
boolean hasRegionReplica = false;
try {
Expand Down Expand Up @@ -320,7 +312,7 @@ protected BalancerClusterState createCluster(List<ServerName> servers,

for (ServerName server : servers) {
if (!clusterState.containsKey(server)) {
clusterState.put(server, EMPTY_REGION_LIST);
clusterState.put(server, Collections.emptyList());
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder,
Expand Down Expand Up @@ -615,7 +607,7 @@ private void roundRobinAssignment(BalancerClusterState cluster, List<RegionInfo>
}
}

protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
private Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
Collection<RegionInfo> regions) {
if (this.services != null && this.services.getAssignmentManager() != null) {
return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -46,11 +47,11 @@ int pickRandomRegion(BalancerClusterState cluster, int server,
double chanceOfNoSwap) {
// Check to see if this is just a move.
if (cluster.regionsPerServer[server].length == 0
|| StochasticLoadBalancer.RANDOM.nextFloat() < chanceOfNoSwap) {
|| ThreadLocalRandom.current().nextFloat() < chanceOfNoSwap) {
// signal a move only.
return -1;
}
int rand = StochasticLoadBalancer.RANDOM.nextInt(cluster.regionsPerServer[server].length);
int rand = ThreadLocalRandom.current().nextInt(cluster.regionsPerServer[server].length);
return cluster.regionsPerServer[server][rand];
}

Expand All @@ -59,15 +60,15 @@ int pickRandomServer(BalancerClusterState cluster) {
return -1;
}

return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
return ThreadLocalRandom.current().nextInt(cluster.numServers);
}

int pickRandomRack(BalancerClusterState cluster) {
if (cluster.numRacks < 1) {
return -1;
}

return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
return ThreadLocalRandom.current().nextInt(cluster.numRacks);
}

int pickOtherRandomServer(BalancerClusterState cluster, int serverIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -275,7 +276,7 @@ private void assignRegionToAvailableFavoredNode(
}
} else {
// We don't have one/more load, lets just choose a random node
s = RANDOM.nextBoolean() ? secondaryHost : tertiaryHost;
s = ThreadLocalRandom.current().nextBoolean() ? secondaryHost : tertiaryHost;
}
addRegionToMap(assignmentMapForFavoredNodes, region, s);
} else if (secondaryHost != null) {
Expand Down Expand Up @@ -320,7 +321,7 @@ public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> serve

List<ServerName> onlineServers = getOnlineFavoredNodes(servers, favoredNodes);
if (onlineServers.size() > 0) {
destination = onlineServers.get(RANDOM.nextInt(onlineServers.size()));
destination = onlineServers.get(ThreadLocalRandom.current().nextInt(onlineServers.size()));
}

boolean alwaysAssign = getConf().getBoolean(FAVORED_ALWAYS_ASSIGN_REGIONS, true);
Expand Down Expand Up @@ -398,7 +399,8 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server
if (FavoredNodesPlan.getFavoredServerPosition(favoredNodes, sn) != null) {
addRegionToMap(assignmentMap, hri, sn);
} else {
ServerName destination = onlineFN.get(RANDOM.nextInt(onlineFN.size()));
ServerName destination =
onlineFN.get(ThreadLocalRandom.current().nextInt(onlineFN.size()));
LOG.warn("Region: " + hri + " not hosted on favored nodes: " + favoredNodes
+ " current: " + sn + " moving to: " + destination);
addRegionToMap(assignmentMap, hri, destination);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
class RandomCandidateGenerator extends CandidateGenerator {

@Override
BalanceAction generate(BalancerClusterState cluster) {
int thisServer = pickRandomServer(cluster);

// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);

return pickRandomRegions(cluster, thisServer, otherServer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hbase.master.balancer;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -27,8 +28,7 @@
@InterfaceAudience.Private
class RegionReplicaCandidateGenerator extends CandidateGenerator {

StochasticLoadBalancer.RandomCandidateGenerator randomGenerator =
new StochasticLoadBalancer.RandomCandidateGenerator();
protected final RandomCandidateGenerator randomGenerator = new RandomCandidateGenerator();

/**
* Randomly select one regionIndex out of all region replicas co-hosted in the same group
Expand Down Expand Up @@ -56,7 +56,7 @@ int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regions
int numReplicas = j - currentPrimaryIndex;
if (numReplicas > 1) { // means consecutive primaries, indicating co-location
// decide to select this primary region id or not
double currentRandom = StochasticLoadBalancer.RANDOM.nextDouble();
double currentRandom = ThreadLocalRandom.current().nextDouble();
// we don't know how many region replicas are co-hosted, we will randomly select one
// using reservoir sampling (http://gregable.com/2007/10/reservoir-sampling.html)
if (currentRandom > currentLargestRandom) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import java.util.concurrent.ThreadLocalRandom;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Generates candidates which moves the replicas out of the rack for co-hosted region replicas in
* the same rack
*/
@InterfaceAudience.Private
class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator {

@Override
BalanceAction generate(BalancerClusterState cluster) {
int rackIndex = pickRandomRack(cluster);
if (cluster.numRacks <= 1 || rackIndex == -1) {
return super.generate(cluster);
}

int regionIndex = selectCoHostedRegionPerGroup(cluster.primariesOfRegionsPerRack[rackIndex],
cluster.regionsPerRack[rackIndex], cluster.regionIndexToPrimaryIndex);

// if there are no pairs of region replicas co-hosted, default to random generator
if (regionIndex == -1) {
// default to randompicker
return randomGenerator.generate(cluster);
}

int serverIndex = cluster.regionIndexToServerIndex[regionIndex];
int toRackIndex = pickOtherRandomRack(cluster, rackIndex);

int rand = ThreadLocalRandom.current().nextInt(cluster.serversPerRack[toRackIndex].length);
int toServerIndex = cluster.serversPerRack[toRackIndex][rand];
int toRegionIndex = pickRandomRegion(cluster, toServerIndex, 0.9f);
return getAction(serverIndex, regionIndex, toServerIndex, toRegionIndex);
}
}
Loading

0 comments on commit 96fefce

Please sign in to comment.