HBASE-25916 Move FavoredNodeLoadBalancer to hbase-balancer module (#3327)

Signed-off-by: Yulin Niu <[email protected]>
Apache9 authored May 31, 2021
1 parent 1ccba10 commit 06c6e06
Showing 14 changed files with 180 additions and 162 deletions.
FavoredNodeAssignmentHelper.java
@@ -48,6 +48,9 @@
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -814,7 +817,7 @@ private Map<RegionInfo, List<ServerName>> generateFavoredNodes(
     return generatedFavNodes;
   }
 
-  /*
+  /**
    * Get the rack of server from local mapping when present, saves lookup by the RackManager.
    */
   private String getRackOfServer(ServerName sn) {
@@ -826,4 +829,13 @@ private String getRackOfServer(ServerName sn) {
       return rack;
     }
   }
+
+  public static int getDataNodePort(Configuration conf) {
+    HdfsConfiguration.init();
+    Configuration dnConf = new HdfsConfiguration(conf);
+    int dnPort = NetUtils.createSocketAddr(dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
+      DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
+    LOG.debug("Loaded default datanode port for FN: {}", dnPort);
+    return dnPort;
+  }
 }
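For orientation, a minimal caller sketch for the static getDataNodePort(Configuration) helper added above. This snippet is illustrative only, not code from this commit, and it presumes imports of org.apache.hadoop.conf.Configuration, org.apache.hadoop.hbase.HBaseConfiguration and FavoredNodeAssignmentHelper itself:

  // Illustrative only: resolve the datanode transfer port used for favored-node hints.
  Configuration conf = HBaseConfiguration.create();
  int datanodePort = FavoredNodeAssignmentHelper.getDataNodePort(conf);
  // FavoredNodesManager caches this value (see its initializeFromMeta() further down)
  // when expressing favored-node hints against datanode addresses.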
FavoredNodeLoadBalancer.java
@@ -36,9 +36,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position;
-import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
-import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -67,17 +65,11 @@
 public class FavoredNodeLoadBalancer extends BaseLoadBalancer implements FavoredNodesPromoter {
   private static final Logger LOG = LoggerFactory.getLogger(FavoredNodeLoadBalancer.class);
 
-  private MasterServices services;
   private FavoredNodesManager fnm;
 
-  public void setMasterServices(MasterServices services) {
-    this.services = services;
-  }
-
   @Override
-  public void initialize() {
-    super.initialize();
-    this.fnm = services.getFavoredNodesManager();
+  public void setFavoredNodesManager(FavoredNodesManager fnm) {
+    this.fnm = fnm;
   }
 
   @Override
@@ -86,8 +78,7 @@ protected List<RegionPlan> balanceTable(TableName tableName,
     // TODO. Look at is whether Stochastic loadbalancer can be integrated with this
     List<RegionPlan> plans = new ArrayList<>();
     Map<ServerName, ServerName> serverNameWithoutCodeToServerName = new HashMap<>();
-    ServerManager serverMgr = services.getServerManager();
-    for (ServerName sn : serverMgr.getOnlineServersList()) {
+    for (ServerName sn : provider.getOnlineServersList()) {
       ServerName s = ServerName.valueOf(sn.getHostname(), sn.getPort(), ServerName.NON_STARTCODE);
       // FindBugs complains about useless store! serverNameToServerNameWithoutCode.put(sn, s);
       serverNameWithoutCodeToServerName.put(s, sn);
@@ -106,9 +97,8 @@ protected List<RegionPlan> balanceTable(TableName tableName,
       if (favoredNodes == null || favoredNodes.get(0).equals(currentServerWithoutStartCode)) {
         continue; // either favorednodes does not exist or we are already on the primary node
       }
-      ServerName destination = null;
       // check whether the primary is available
-      destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
+      ServerName destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(0));
       if (destination == null) {
         // check whether the region is on secondary/tertiary
         if (currentServerWithoutStartCode.equals(favoredNodes.get(1))
@@ -117,10 +107,10 @@ protected List<RegionPlan> balanceTable(TableName tableName,
         }
         // the region is currently on none of the favored nodes
         // get it on one of them if possible
-        ServerMetrics l1 = services.getServerManager()
-          .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
-        ServerMetrics l2 = services.getServerManager()
-          .getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
+        ServerMetrics l1 =
+          provider.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(1)));
+        ServerMetrics l2 =
+          provider.getLoad(serverNameWithoutCodeToServerName.get(favoredNodes.get(2)));
         if (l1 != null && l2 != null) {
           if (l1.getRegionMetrics().size() > l2.getRegionMetrics().size()) {
             destination = serverNameWithoutCodeToServerName.get(favoredNodes.get(2));
@@ -176,8 +166,7 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
       Map<ServerName,List<RegionInfo>> regionsWithFavoredNodesMap = segregatedRegions.getFirst();
       List<RegionInfo> regionsWithNoFavoredNodes = segregatedRegions.getSecond();
       assignmentMap = new HashMap<>();
-      roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes,
-        servers);
+      roundRobinAssignmentImpl(assignmentHelper, assignmentMap, regionsWithNoFavoredNodes);
       // merge the assignment maps
       assignmentMap.putAll(regionsWithFavoredNodesMap);
     } catch (Exception ex) {
@@ -283,8 +272,8 @@ private void assignRegionToAvailableFavoredNode(Map<ServerName,
       // assign the region to the one with a lower load
       // (both have the desired hdfs blocks)
       ServerName s;
-      ServerMetrics tertiaryLoad = services.getServerManager().getLoad(tertiaryHost);
-      ServerMetrics secondaryLoad = services.getServerManager().getLoad(secondaryHost);
+      ServerMetrics tertiaryLoad = provider.getLoad(tertiaryHost);
+      ServerMetrics secondaryLoad = provider.getLoad(secondaryHost);
       if (secondaryLoad.getRegionMetrics().size() < tertiaryLoad.getRegionMetrics().size()) {
         s = secondaryHost;
       } else {
@@ -314,8 +303,7 @@ public List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
   }
 
   private void roundRobinAssignmentImpl(FavoredNodeAssignmentHelper assignmentHelper,
-    Map<ServerName, List<RegionInfo>> assignmentMap,
-    List<RegionInfo> regions, List<ServerName> servers) throws IOException {
+    Map<ServerName, List<RegionInfo>> assignmentMap, List<RegionInfo> regions) throws IOException {
     Map<RegionInfo, ServerName> primaryRSMap = new HashMap<>();
     // figure the primary RSs
     assignmentHelper.placePrimaryRSAsRoundRobin(assignmentMap, primaryRSMap, regions);
FavoredNodesManager.java
@@ -19,11 +19,11 @@
 
 import static org.apache.hadoop.hbase.ServerName.NON_STARTCODE;
 import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.FAVORED_NODES_NUM;
+import static org.apache.hadoop.hbase.favored.FavoredNodeAssignmentHelper.getDataNodePort;
 import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.PRIMARY;
 import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.SECONDARY;
 import static org.apache.hadoop.hbase.favored.FavoredNodesPlan.Position.TERTIARY;
 
-import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,17 +32,11 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.master.SnapshotOfRegionAssignmentFromMeta;
 import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@@ -61,8 +55,6 @@
 @InterfaceAudience.Private
 public class FavoredNodesManager {
 
-  private static final Logger LOG = LoggerFactory.getLogger(FavoredNodesManager.class);
-
   private final FavoredNodesPlan globalFavoredNodesAssignmentPlan;
   private final Map<ServerName, List<RegionInfo>> primaryRSToRegionMap;
   private final Map<ServerName, List<RegionInfo>> secondaryRSToRegionMap;
@@ -83,30 +75,21 @@ public FavoredNodesManager(ClusterInfoProvider provider) {
     this.teritiaryRSToRegionMap = new HashMap<>();
   }
 
-  public synchronized void initialize(SnapshotOfRegionAssignmentFromMeta snapshot) {
+  public void initializeFromMeta() throws IOException {
+    SnapshotOfRegionAssignmentFromMeta snapshot =
+      new SnapshotOfRegionAssignmentFromMeta(provider.getConnection());
+    snapshot.initialize();
     // Add snapshot to structures made on creation. Current structures may have picked
     // up data between construction and the scan of meta needed before this method
-    // is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time"
-    this.globalFavoredNodesAssignmentPlan.
-      updateFavoredNodesMap(snapshot.getExistingAssignmentPlan());
-    primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap());
-    secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap());
-    teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap());
-    datanodeDataTransferPort= getDataNodePort();
-  }
-
-  @RestrictedApi(explanation = "Should only be called in tests", link = "",
-    allowedOnPath = ".*(/src/test/.*|FavoredNodesManager).java")
-  public int getDataNodePort() {
-    HdfsConfiguration.init();
-
-    Configuration dnConf = new HdfsConfiguration(provider.getConfiguration());
-
-    int dnPort = NetUtils.createSocketAddr(
-      dnConf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
-        DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT)).getPort();
-    LOG.debug("Loaded default datanode port for FN: " + datanodeDataTransferPort);
-    return dnPort;
+    // is called. See HBASE-23737 "[Flakey Tests] TestFavoredNodeTableImport fails 30% of the time"
+    synchronized (this) {
+      this.globalFavoredNodesAssignmentPlan
+        .updateFavoredNodesMap(snapshot.getExistingAssignmentPlan());
+      primaryRSToRegionMap.putAll(snapshot.getPrimaryToRegionInfoMap());
+      secondaryRSToRegionMap.putAll(snapshot.getSecondaryToRegionInfoMap());
+      teritiaryRSToRegionMap.putAll(snapshot.getTertiaryToRegionInfoMap());
+      datanodeDataTransferPort = getDataNodePort(provider.getConfiguration());
+    }
   }
 
   public synchronized List<ServerName> getFavoredNodes(RegionInfo regionInfo) {
FavoredNodesPromoter.java
@@ -37,4 +37,6 @@ void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo [] mergeP
     throws IOException;
 
   List<ServerName> getFavoredNodes(RegionInfo regionInfo);
+
+  void setFavoredNodesManager(FavoredNodesManager fnm);
 }
ClusterInfoProvider.java
@@ -60,6 +60,11 @@ public interface ClusterInfoProvider extends ConfigurationObserver {
    */
   List<RegionInfo> getAssignedRegions();
 
+  /**
+   * Unassign the given region.
+   */
+  void unassign(RegionInfo regionInfo) throws IOException;
+
   /**
    * Get the table descriptor for the given table.
    */
@@ -83,6 +88,11 @@ HDFSBlocksDistribution computeHDFSBlocksDistribution(Configuration conf,
    */
   boolean hasRegionReplica(Collection<RegionInfo> regions) throws IOException;
 
+  /**
+   * Returns a copy of the internal list of online servers.
+   */
+  List<ServerName> getOnlineServersList();
+
   /**
    * Returns a copy of the internal list of online servers matched by the given {@code filter}.
    */
@@ -110,4 +120,9 @@ List<ServerName> getOnlineServersListWithPredicator(List<ServerName> servers,
    * Record the given balancer rejection.
    */
   void recordBalancerRejection(Supplier<BalancerRejection> rejection);
+
+  /**
+   * Returns server metrics of the given server if serverName is known else null
+   */
+  ServerMetrics getLoad(ServerName serverName);
 }
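Taken together, the ClusterInfoProvider additions above are what let FavoredNodeLoadBalancer and FavoredNodesManager drop their MasterServices dependency. A hedged wiring sketch follows, using only the signatures visible in this diff; the class and method names (FavoredNodeWiring, createBalancer) are hypothetical, and the package names are assumed from the org.apache.hadoop.hbase.favored and org.apache.hadoop.hbase.master.balancer imports shown earlier:

import java.io.IOException;
import org.apache.hadoop.hbase.favored.FavoredNodeLoadBalancer;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.master.balancer.ClusterInfoProvider;

// Hypothetical sketch, not code from this commit.
public final class FavoredNodeWiring {
  static FavoredNodeLoadBalancer createBalancer(ClusterInfoProvider provider) throws IOException {
    // The manager now needs only a ClusterInfoProvider ...
    FavoredNodesManager fnm = new FavoredNodesManager(provider);
    // ... and loads the existing assignment plan itself through provider.getConnection().
    fnm.initializeFromMeta();
    FavoredNodeLoadBalancer balancer = new FavoredNodeLoadBalancer();
    // Replaces the old setMasterServices()/initialize() handshake.
    balancer.setFavoredNodesManager(fnm);
    // Remaining balancer setup (configuration, provider, initialization) is outside this diff.
    return balancer;
  }
}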