Skip to content

Commit

Permalink
HBASE-25757 Move BaseLoadBalancer to hbase-balancer module
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Apr 22, 2021
1 parent 50920ee commit c817487
Show file tree
Hide file tree
Showing 20 changed files with 302 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -69,9 +70,9 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse


/**
* Set the master service.
* Set the cluster info provider. Usually it is just a wrapper of master.
*/
void setMasterServices(MasterServices masterServices);
void setClusterInfoProvider(ClusterInfoProvider provider);

/**
* Perform the major balance operation for cluster, will invoke {@link #balanceTable} to do actual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -63,6 +61,8 @@
*/
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {

private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);

public static final String BALANCER_DECISION_BUFFER_ENABLED =
"hbase.master.balancer.decision.buffer.enabled";
Expand All @@ -71,9 +71,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;

static final List<RegionInfo> EMPTY_REGION_LIST = Collections.emptyList();

static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
private static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
= load -> load.getRegionMetrics().isEmpty();

protected RegionHDFSBlockLocationFinder regionFinder;
Expand Down Expand Up @@ -110,11 +108,9 @@ private void createRegionFinder() {
protected float overallSlop;
protected Configuration config = HBaseConfiguration.create();
protected RackManager rackManager;
static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class);
protected MetricsBalancer metricsBalancer = null;
protected ClusterMetrics clusterStatus = null;
protected ServerName masterServerName;
protected MasterServices services;
protected ClusterInfoProvider provider;

@Override
public void setConf(Configuration conf) {
Expand Down Expand Up @@ -161,24 +157,17 @@ public synchronized void setClusterMetrics(ClusterMetrics st) {


@Override
public void setMasterServices(MasterServices masterServices) {
masterServerName = masterServices.getServerName();
this.services = masterServices;
public void setClusterInfoProvider(ClusterInfoProvider provider) {
this.provider = provider;
if (useRegionFinder) {
this.regionFinder.setClusterInfoProvider(new MasterClusterInfoProvider(services));
this.regionFinder.setClusterInfoProvider(provider);
}
}

@Override
public void postMasterStartupInitialize() {
if (services != null && regionFinder != null) {
try {
Set<RegionInfo> regions =
services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
regionFinder.refreshAndWait(regions);
} catch (Exception e) {
LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
}
if (provider != null && regionFinder != null) {
regionFinder.refreshAndWait(provider.getAssignedRegions());
}
}

Expand Down Expand Up @@ -288,19 +277,12 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
return assignments;
}

protected BalancerClusterState createCluster(List<ServerName> servers,
private BalancerClusterState createCluster(List<ServerName> servers,
Collection<RegionInfo> regions) throws HBaseIOException {
boolean hasRegionReplica = false;
boolean hasRegionReplica= false;
try {
if (services != null && services.getTableDescriptors() != null) {
Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
for (RegionInfo regionInfo : regions) {
TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
if (td != null && td.getRegionReplication() > 1) {
hasRegionReplica = true;
break;
}
}
if (provider != null) {
hasRegionReplica = provider.hasRegionReplica(regions);
}
} catch (IOException ioe) {
throw new HBaseIOException(ioe);
Expand All @@ -320,16 +302,15 @@ protected BalancerClusterState createCluster(List<ServerName> servers,

for (ServerName server : servers) {
if (!clusterState.containsKey(server)) {
clusterState.put(server, EMPTY_REGION_LIST);
clusterState.put(server, Collections.emptyList());
}
}
return new BalancerClusterState(regions, clusterState, null, this.regionFinder,
rackManager);
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
return this.services.getServerManager()
.getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
return provider.getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
}

/**
Expand Down Expand Up @@ -615,13 +596,9 @@ private void roundRobinAssignment(BalancerClusterState cluster, List<RegionInfo>
}
}

protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
Collection<RegionInfo> regions) {
if (this.services != null && this.services.getAssignmentManager() != null) {
return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
} else {
return new HashMap<>();
}
private Map<ServerName, List<RegionInfo>>
getRegionAssignmentsByServer(Collection<RegionInfo> regions) {
return provider != null ? provider.getSnapShotOfAssignment(regions) : Collections.emptyMap();
}

private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.master.balancer;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -34,23 +35,19 @@ abstract class CandidateGenerator {
* From a list of regions pick a random one. {@code -1} can be returned, which
* {@link StochasticLoadBalancer#balanceCluster(Map)} recognizes as a signal to try a region move
* rather than a swap.
*
* @param cluster The state of the cluster
* @param server index of the server
* @param chanceOfNoSwap Chance that this will decide to try a move rather
* than a swap.
* @return a random {@link RegionInfo} or null if an asymmetrical move is
* suggested.
* @param chanceOfNoSwap Chance that this will decide to try a move rather than a swap.
* @return a random region index, or {@code -1} if an asymmetrical move is suggested.
*/
int pickRandomRegion(BalancerClusterState cluster, int server,
double chanceOfNoSwap) {
int pickRandomRegion(BalancerClusterState cluster, int server, double chanceOfNoSwap) {
// Check to see if this is just a move.
if (cluster.regionsPerServer[server].length == 0
|| StochasticLoadBalancer.RANDOM.nextFloat() < chanceOfNoSwap) {
if (cluster.regionsPerServer[server].length == 0 ||
ThreadLocalRandom.current().nextFloat() < chanceOfNoSwap) {
// signal a move only.
return -1;
}
int rand = StochasticLoadBalancer.RANDOM.nextInt(cluster.regionsPerServer[server].length);
int rand = ThreadLocalRandom.current().nextInt(cluster.regionsPerServer[server].length);
return cluster.regionsPerServer[server][rand];
}

Expand All @@ -59,15 +56,15 @@ int pickRandomServer(BalancerClusterState cluster) {
return -1;
}

return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers);
return ThreadLocalRandom.current().nextInt(cluster.numServers);
}

int pickRandomRack(BalancerClusterState cluster) {
if (cluster.numRacks < 1) {
return -1;
}

return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks);
return ThreadLocalRandom.current().nextInt(cluster.numRacks);
}

int pickOtherRandomServer(BalancerClusterState cluster, int serverIndex) {
Expand All @@ -94,8 +91,7 @@ int pickOtherRandomRack(BalancerClusterState cluster, int rackIndex) {
}
}

BalanceAction pickRandomRegions(BalancerClusterState cluster,
int thisServer, int otherServer) {
BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
if (thisServer < 0 || otherServer < 0) {
return BalanceAction.NULL_ACTION;
}
Expand All @@ -114,14 +110,12 @@ BalanceAction pickRandomRegions(BalancerClusterState cluster,
return getAction(thisServer, thisRegion, otherServer, otherRegion);
}

protected BalanceAction getAction(int fromServer, int fromRegion,
int toServer, int toRegion) {
protected BalanceAction getAction(int fromServer, int fromRegion, int toServer, int toRegion) {
if (fromServer < 0 || toServer < 0) {
return BalanceAction.NULL_ACTION;
}
if (fromRegion >= 0 && toRegion >= 0) {
return new SwapRegionsAction(fromServer, fromRegion,
toServer, toRegion);
return new SwapRegionsAction(fromServer, fromRegion, toServer, toRegion);
} else if (fromRegion >= 0) {
return new MoveRegionAction(fromRegion, fromServer, toServer);
} else if (toRegion >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package org.apache.hadoop.hbase.master.balancer;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
Expand All @@ -45,11 +50,32 @@ public interface ClusterInfoProvider {
*/
TableDescriptor getTableDescriptor(TableName tableName) throws IOException;

/**
* Returns the number of tables on this cluster.
* @return the table count
* @throws IOException declared by the signature; the failure conditions depend on the
*           implementation, which is not visible here
*/
int getNumberOfTables() throws IOException;

/**
* Compute the block distribution for the given region.
* <p>
* Used to refresh region block locations on HDFS.
* @param conf the configuration passed to the computation
* @param tableDescriptor a table descriptor; presumably that of the table {@code regionInfo}
*          belongs to -- TODO confirm against the implementation
* @param regionInfo the region whose block distribution is computed
* @return the computed {@link HDFSBlocksDistribution}
* @throws IOException declared by the signature; the failure conditions depend on the
*           implementation, which is not visible here
*/
HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException;

/**
* Check whether we have region replicas enabled for the tables of the given regions.
* @param regions the regions whose tables are inspected
* @return {@code true} if region replicas are enabled for those tables; presumably this means
*         at least one table has a region replication greater than 1 -- TODO confirm against the
*         implementation
* @throws IOException declared by the signature; the failure conditions depend on the
*           implementation, which is not visible here
*/
boolean hasRegionReplica(Collection<RegionInfo> regions) throws IOException;

/**
* Returns a copy of the internal list of online servers matched by the given {@code filter}.
* @param servers the candidate servers; how this list restricts the result is determined by the
*          implementation -- TODO confirm
* @param filter predicate evaluated against a server's {@link ServerMetrics}
* @return the servers accepted by {@code filter}
*/
List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
Predicate<ServerMetrics> filter);

/**
* Get a snapshot of the current assignment status.
* @param regions the regions to look up
* @return a map from server to regions; presumably only the given {@code regions} are included,
*         grouped by the server currently hosting them -- TODO confirm against the implementation
*/
Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ BalanceAction generate(BalancerClusterState cluster) {
int currentServer = cluster.regionIndexToServerIndex[region];
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
BalancerClusterState.LocalityType.SERVER)[region]) {
Optional<BalanceAction> potential = tryMoveOrSwap(cluster,
currentServer, region, cluster.getOrComputeRegionsToMostLocalEntities(
Optional<BalanceAction> potential = tryMoveOrSwap(cluster, currentServer, region,
cluster.getOrComputeRegionsToMostLocalEntities(
BalancerClusterState.LocalityType.SERVER)[region]);
if (potential.isPresent()) {
return potential.get();
Expand All @@ -48,16 +48,16 @@ BalanceAction generate(BalancerClusterState cluster) {
return BalanceAction.NULL_ACTION;
}

private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster,
int fromServer, int fromRegion, int toServer) {
private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster, int fromServer,
int fromRegion, int toServer) {
// Try move first. We know a priori fromRegion has the highest locality on toServer
if (cluster.serverHasTooFewRegions(toServer)) {
return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
}
// Compare locality gain/loss from swapping fromRegion with regions on toServer
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
- getWeightedLocality(cluster, fromRegion, fromServer);
int toServertotalRegions = cluster.regionsPerServer[toServer].length;
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer) -
getWeightedLocality(cluster, fromRegion, fromServer);
int toServertotalRegions = cluster.regionsPerServer[toServer].length;
if (toServertotalRegions > 0) {
int startIndex = ThreadLocalRandom.current().nextInt(toServertotalRegions);
for (int i = 0; i < toServertotalRegions; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * A {@link CandidateGenerator} that proposes an action between two randomly picked servers.
 */
@InterfaceAudience.Private
class RandomCandidateGenerator extends CandidateGenerator {

  /**
   * Picks a first server with {@link #pickRandomServer}, a second one with
   * {@link #pickOtherRandomServer}, and hands both to {@link #pickRandomRegions} to build the
   * action.
   * @param cluster the cluster state to draw servers and regions from
   * @return the action produced by {@link #pickRandomRegions} for the two picked servers
   */
  @Override
  BalanceAction generate(BalancerClusterState cluster) {
    // The second pick depends on the first, so the order of these two calls matters.
    final int firstServer = pickRandomServer(cluster);
    final int secondServer = pickOtherRandomServer(cluster, firstServer);
    return pickRandomRegions(cluster, firstServer, secondServer);
  }
}
Loading

0 comments on commit c817487

Please sign in to comment.