Skip to content

Commit

Permalink
HBASE-25757 Move BaseLoadBalancer to hbase-balancer module
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Apr 24, 2021
1 parent 9895b2d commit e807a15
Show file tree
Hide file tree
Showing 43 changed files with 504 additions and 353 deletions.
5 changes: 5 additions & 0 deletions hbase-balancer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,11 @@
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ List<ServerName> removeFavoredNodes(RegionInfo region) {
}

/**
* @return the list of favored region server for this region based on the plan
* Returns the list of favored region server for this region based on the plan
*/
public List<ServerName> getFavoredNodes(RegionInfo region) {
return favoredNodesMap.get(region.getRegionNameAsString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,19 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Makes decisions about the placement and movement of Regions across
* RegionServers.
*
* <p>Cluster-wide load balancing will occur only when there are no regions in
* transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
*
* <p>On cluster startup, bulk assignment can be used to determine
* locations for all Regions in a cluster.
*
* <p>This class produces plans for the
* {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager}
* to execute.
* Makes decisions about the placement and movement of Regions across RegionServers.
* <p/>
* Cluster-wide load balancing will occur only when there are no regions in transition and according
* to a fixed period of a time using {@link #balanceCluster(Map)}.
* <p/>
* On cluster startup, bulk assignment can be used to determine locations for all Regions in a
* cluster.
* <p/>
* This class produces plans for the {@code AssignmentManager} to execute.
*/
@InterfaceAudience.Private
public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObserver {
Expand All @@ -69,15 +67,14 @@ public interface LoadBalancer extends Configurable, Stoppable, ConfigurationObse


/**
* Set the master service.
* Set the cluster info provider. Usually it is just a wrapper of master.
*/
void setMasterServices(MasterServices masterServices);
void setClusterInfoProvider(ClusterInfoProvider provider);

/**
* Perform the major balance operation for cluster, will invoke {@link #balanceTable} to do actual
* balance. Normally not need override this method, except
* {@link org.apache.hadoop.hbase.master.balancer.SimpleLoadBalancer} and
* {@link org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer}
* balance. Normally not need override this method, except {@link SimpleLoadBalancer} and
* {@code RSGroupBasedLoadBalancer}
* @param loadOfAllTable region load of servers for all table
* @return a list of regions to be moved, including source and destination, or null if cluster is
* already balanced
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,14 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -55,10 +51,8 @@

/**
* The base class for load balancers. It provides the the functions used to by
* {@link org.apache.hadoop.hbase.master.assignment.AssignmentManager} to assign regions
* in the edge cases. It doesn't provide an implementation of the
* actual balancing algorithm.
*
* {@code AssignmentManager} to assign regions in the edge cases. It doesn't provide an
* implementation of the actual balancing algorithm.
*/
@InterfaceAudience.Private
public abstract class BaseLoadBalancer implements LoadBalancer {
Expand All @@ -72,9 +66,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected static final int MIN_SERVER_BALANCE = 2;
private volatile boolean stopped = false;

private static final Predicate<ServerMetrics> IDLE_SERVER_PREDICATOR
= load -> load.getRegionMetrics().isEmpty();

protected RegionHDFSBlockLocationFinder regionFinder;
protected boolean useRegionFinder;
protected boolean isByTable = false;
Expand All @@ -88,7 +79,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
protected MetricsBalancer metricsBalancer = null;
protected ClusterMetrics clusterStatus = null;
protected ServerName masterServerName;
protected MasterServices services;
protected ClusterInfoProvider provider;

/**
* The constructor that uses the basic MetricsBalancer
*/
Expand Down Expand Up @@ -151,24 +143,17 @@ public synchronized void setClusterMetrics(ClusterMetrics st) {


@Override
public void setMasterServices(MasterServices masterServices) {
masterServerName = masterServices.getServerName();
this.services = masterServices;
public void setClusterInfoProvider(ClusterInfoProvider provider) {
this.provider = provider;
if (useRegionFinder) {
this.regionFinder.setClusterInfoProvider(new MasterClusterInfoProvider(services));
this.regionFinder.setClusterInfoProvider(provider);
}
}

@Override
public void postMasterStartupInitialize() {
if (services != null && regionFinder != null) {
try {
Set<RegionInfo> regions =
services.getAssignmentManager().getRegionStates().getRegionAssignments().keySet();
regionFinder.refreshAndWait(regions);
} catch (Exception e) {
LOG.warn("Refreshing region HDFS Block dist failed with exception, ignoring", e);
}
if (provider != null && regionFinder != null) {
regionFinder.refreshAndWait(provider.getAssignedRegions());
}
}

Expand Down Expand Up @@ -277,22 +262,15 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
BalancerClusterState cluster = createCluster(servers, regions);
Map<ServerName, List<RegionInfo>> assignments = new HashMap<>();
roundRobinAssignment(cluster, regions, servers, assignments);
return assignments;
return Collections.unmodifiableMap(assignments);
}

private BalancerClusterState createCluster(List<ServerName> servers,
Collection<RegionInfo> regions) throws HBaseIOException {
boolean hasRegionReplica = false;
boolean hasRegionReplica= false;
try {
if (services != null && services.getTableDescriptors() != null) {
Map<String, TableDescriptor> tds = services.getTableDescriptors().getAll();
for (RegionInfo regionInfo : regions) {
TableDescriptor td = tds.get(regionInfo.getTable().getNameWithNamespaceInclAsString());
if (td != null && td.getRegionReplication() > 1) {
hasRegionReplica = true;
break;
}
}
if (provider != null) {
hasRegionReplica = provider.hasRegionReplica(regions);
}
} catch (IOException ioe) {
throw new HBaseIOException(ioe);
Expand Down Expand Up @@ -320,8 +298,8 @@ private BalancerClusterState createCluster(List<ServerName> servers,
}

private List<ServerName> findIdleServers(List<ServerName> servers) {
return this.services.getServerManager()
.getOnlineServersListWithPredicator(servers, IDLE_SERVER_PREDICATOR);
return provider.getOnlineServersListWithPredicator(servers,
metrics -> metrics.getRegionMetrics().isEmpty());
}

/**
Expand Down Expand Up @@ -474,7 +452,7 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server

LOG.info("Reassigned " + regions.size() + " regions. " + numRetainedAssigments
+ " retained the pre-restart assignment. " + randomAssignMsg);
return assignments;
return Collections.unmodifiableMap(assignments);
}

@Override
Expand Down Expand Up @@ -503,6 +481,7 @@ public void stop(String why) {
/**
* Updates the balancer status tag reported to JMX
*/
@Override
public void updateBalancerStatus(boolean status) {
metricsBalancer.balancerStatus(status);
}
Expand Down Expand Up @@ -607,13 +586,11 @@ private void roundRobinAssignment(BalancerClusterState cluster, List<RegionInfo>
}
}

private Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
Collection<RegionInfo> regions) {
if (this.services != null && this.services.getAssignmentManager() != null) {
return this.services.getAssignmentManager().getSnapShotOfAssignment(regions);
} else {
return new HashMap<>();
}
// return a modifiable map, as we may add more entries into the returned map.
private Map<ServerName, List<RegionInfo>>
getRegionAssignmentsByServer(Collection<RegionInfo> regions) {
return provider != null ? new HashMap<>(provider.getSnapShotOfAssignment(regions)) :
new HashMap<>();
}

private Map<ServerName, List<RegionInfo>> toEnsumbleTableLoad(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,12 @@ abstract class CandidateGenerator {
* From a list of regions pick a random one. Null can be returned which
* {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
* rather than swap.
*
* @param cluster The state of the cluster
* @param server index of the server
* @param chanceOfNoSwap Chance that this will decide to try a move rather
* than a swap.
* @return a random {@link RegionInfo} or null if an asymmetrical move is
* suggested.
* @param chanceOfNoSwap Chance that this will decide to try a move rather than a swap.
* @return a random {@link RegionInfo} or null if an asymmetrical move is suggested.
*/
int pickRandomRegion(BalancerClusterState cluster, int server,
double chanceOfNoSwap) {
int pickRandomRegion(BalancerClusterState cluster, int server, double chanceOfNoSwap) {
// Check to see if this is just a move.
if (cluster.regionsPerServer[server].length == 0
|| ThreadLocalRandom.current().nextFloat() < chanceOfNoSwap) {
Expand Down Expand Up @@ -95,8 +91,7 @@ int pickOtherRandomRack(BalancerClusterState cluster, int rackIndex) {
}
}

BalanceAction pickRandomRegions(BalancerClusterState cluster,
int thisServer, int otherServer) {
BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) {
if (thisServer < 0 || otherServer < 0) {
return BalanceAction.NULL_ACTION;
}
Expand All @@ -115,14 +110,12 @@ BalanceAction pickRandomRegions(BalancerClusterState cluster,
return getAction(thisServer, thisRegion, otherServer, otherRegion);
}

protected BalanceAction getAction(int fromServer, int fromRegion,
int toServer, int toRegion) {
protected BalanceAction getAction(int fromServer, int fromRegion, int toServer, int toRegion) {
if (fromServer < 0 || toServer < 0) {
return BalanceAction.NULL_ACTION;
}
if (fromRegion >= 0 && toRegion >= 0) {
return new SwapRegionsAction(fromServer, fromRegion,
toServer, toRegion);
return new SwapRegionsAction(fromServer, fromRegion, toServer, toRegion);
} else if (fromRegion >= 0) {
return new MoveRegionAction(fromRegion, fromServer, toServer);
} else if (toRegion >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package org.apache.hadoop.hbase.master.balancer;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
Expand All @@ -45,11 +50,32 @@ public interface ClusterInfoProvider {
*/
TableDescriptor getTableDescriptor(TableName tableName) throws IOException;

/**
* Returns the number of tables on this cluster.
*/
int getNumberOfTables() throws IOException;

/**
* Compute the block distribution for the given region.
* <p/>
* Used to refresh region block locations on HDFS.
*/
HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
TableDescriptor tableDescriptor, RegionInfo regionInfo) throws IOException;

/**
* Check whether we have region replicas enabled for the tables of the given regions.
*/
boolean hasRegionReplica(Collection<RegionInfo> regions) throws IOException;

/**
* Returns a copy of the internal list of online servers matched by the given {@code filter}.
*/
List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
Predicate<ServerMetrics> filter);

/**
* Get a snapshot of the current assignment status.
*/
Map<ServerName, List<RegionInfo>> getSnapShotOfAssignment(Collection<RegionInfo> regions);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ BalanceAction generate(BalancerClusterState cluster) {
int currentServer = cluster.regionIndexToServerIndex[region];
if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities(
BalancerClusterState.LocalityType.SERVER)[region]) {
Optional<BalanceAction> potential = tryMoveOrSwap(cluster,
currentServer, region, cluster.getOrComputeRegionsToMostLocalEntities(
Optional<BalanceAction> potential = tryMoveOrSwap(cluster, currentServer, region,
cluster.getOrComputeRegionsToMostLocalEntities(
BalancerClusterState.LocalityType.SERVER)[region]);
if (potential.isPresent()) {
return potential.get();
Expand All @@ -48,16 +48,16 @@ BalanceAction generate(BalancerClusterState cluster) {
return BalanceAction.NULL_ACTION;
}

private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster,
int fromServer, int fromRegion, int toServer) {
private Optional<BalanceAction> tryMoveOrSwap(BalancerClusterState cluster, int fromServer,
int fromRegion, int toServer) {
// Try move first. We know apriori fromRegion has the highest locality on toServer
if (cluster.serverHasTooFewRegions(toServer)) {
return Optional.of(getAction(fromServer, fromRegion, toServer, -1));
}
// Compare locality gain/loss from swapping fromRegion with regions on toServer
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer)
- getWeightedLocality(cluster, fromRegion, fromServer);
int toServertotalRegions = cluster.regionsPerServer[toServer].length;
double fromRegionLocalityDelta = getWeightedLocality(cluster, fromRegion, toServer) -
getWeightedLocality(cluster, fromRegion, fromServer);
int toServertotalRegions = cluster.regionsPerServer[toServer].length;
if (toServertotalRegions > 0) {
int startIndex = ThreadLocalRandom.current().nextInt(toServertotalRegions);
for (int i = 0; i < toServertotalRegions; i++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.balancer;

import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
class RandomCandidateGenerator extends CandidateGenerator {

@Override
BalanceAction generate(BalancerClusterState cluster) {

int thisServer = pickRandomServer(cluster);

// Pick the other server
int otherServer = pickOtherRandomServer(cluster, thisServer);

return pickRandomRegions(cluster, thisServer, otherServer);
}
}
Loading

0 comments on commit e807a15

Please sign in to comment.