From 06d13fa56e95d9048821f70e5dede8959c1b5405 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 21 Apr 2021 10:25:15 +0800 Subject: [PATCH 1/2] HBASE-25793 Move BaseLoadBalancer.Cluster to a separated file --- .../master/balancer/AssignRegionAction.java | 54 + .../hbase/master/balancer/BalanceAction.java | 55 + .../master/balancer/BalancerClusterState.java | 866 ++++++++++++++++ .../master/balancer/MoveRegionAction.java | 57 ++ .../master/balancer/SwapRegionsAction.java | 62 ++ .../master/balancer/BaseLoadBalancer.java | 944 +----------------- .../master/balancer/CandidateGenerator.java | 28 +- .../balancer/FavoredStochasticBalancer.java | 24 +- .../HeterogeneousRegionCountCostFunction.java | 2 +- .../balancer/LoadCandidateGenerator.java | 6 +- .../LocalityBasedCandidateGenerator.java | 16 +- .../RegionReplicaCandidateGenerator.java | 4 +- .../master/balancer/SimpleLoadBalancer.java | 3 +- .../balancer/StochasticLoadBalancer.java | 62 +- .../master/balancer/BalancerTestBase.java | 4 +- .../master/balancer/TestBaseLoadBalancer.java | 27 +- .../TestFavoredStochasticBalancerPickers.java | 20 +- .../balancer/TestStochasticLoadBalancer.java | 15 +- ...ochasticLoadBalancerHeterogeneousCost.java | 10 +- ...stStochasticLoadBalancerRegionReplica.java | 19 +- 20 files changed, 1226 insertions(+), 1052 deletions(-) create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java create mode 100644 hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java new file mode 100644 index 000000000000..a5440d6c6afc --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/AssignRegionAction.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class AssignRegionAction extends BalanceAction { + private final int region; + private final int server; + + public AssignRegionAction(int region, int server) { + super(Type.ASSIGN_REGION); + this.region = region; + this.server = server; + } + + public int getRegion() { + return region; + } + + public int getServer() { + return server; + } + + @Override + public BalanceAction undoAction() { + // TODO implement this. This action is not being used by the StochasticLB for now + // in case it uses it, we should implement this function. + throw new UnsupportedOperationException(HConstants.NOT_IMPLEMENTED); + } + + @Override + public String toString() { + return getType() + ": " + region + ":" + server; + } + +} \ No newline at end of file diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java new file mode 100644 index 000000000000..9158e353bb79 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalanceAction.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * An action to move or swap a region + */ +@InterfaceAudience.Private +abstract class BalanceAction { + enum Type { + ASSIGN_REGION, MOVE_REGION, SWAP_REGIONS, NULL, + } + + static final BalanceAction NULL_ACTION = new BalanceAction(Type.NULL) { + }; + + private final Type type; + + BalanceAction(Type type) { + this.type = type; + } + + /** + * Returns an Action which would undo this action + */ + BalanceAction undoAction() { + return this; + } + + Type getType() { + return type; + } + + @Override + public String toString() { + return type + ":"; + } +} \ No newline at end of file diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java new file mode 100644 index 000000000000..ed10c19c3917 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -0,0 +1,866 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.hadoop.hbase.HDFSBlocksDistribution; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.master.RackManager; +import org.apache.hadoop.hbase.net.Address; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An efficient array based implementation similar to ClusterState for keeping the status of the + * cluster in terms of region assignment and distribution. LoadBalancers, such as + * StochasticLoadBalancer uses this Cluster object because of hundreds of thousands of hashmap + * manipulations are very costly, which is why this class uses mostly indexes and arrays. + *

+ * BalancerClusterState tracks a list of unassigned regions, region assignments, and the server + * topology in terms of server names, hostnames and racks. + */ +@InterfaceAudience.Private +class BalancerClusterState { + + private static final Logger LOG = LoggerFactory.getLogger(BalancerClusterState.class); + + ServerName[] servers; + // ServerName uniquely identifies a region server. multiple RS can run on the same host + String[] hosts; + String[] racks; + boolean multiServersPerHost = false; // whether or not any host has more than one server + + ArrayList tables; + RegionInfo[] regions; + Deque[] regionLoads; + private RegionHDFSBlockLocationFinder regionFinder; + + int[][] regionLocations; // regionIndex -> list of serverIndex sorted by locality + + int[] serverIndexToHostIndex; // serverIndex -> host index + int[] serverIndexToRackIndex; // serverIndex -> rack index + + int[][] regionsPerServer; // serverIndex -> region list + int[] serverIndexToRegionsOffset; // serverIndex -> offset of region list + int[][] regionsPerHost; // hostIndex -> list of regions + int[][] regionsPerRack; // rackIndex -> region list + int[][] primariesOfRegionsPerServer; // serverIndex -> sorted list of regions by primary region + // index + int[][] primariesOfRegionsPerHost; // hostIndex -> sorted list of regions by primary region index + int[][] primariesOfRegionsPerRack; // rackIndex -> sorted list of regions by primary region index + + int[][] serversPerHost; // hostIndex -> list of server indexes + int[][] serversPerRack; // rackIndex -> list of server indexes + int[] regionIndexToServerIndex; // regionIndex -> serverIndex + int[] initialRegionIndexToServerIndex; // regionIndex -> serverIndex (initial cluster state) + int[] regionIndexToTableIndex; // regionIndex -> tableIndex + int[][] numRegionsPerServerPerTable; // serverIndex -> tableIndex -> # regions + int[] numMaxRegionsPerTable; // tableIndex -> max number of regions in a single RS + int[] regionIndexToPrimaryIndex; // regionIndex -> regionIndex of the primary + boolean hasRegionReplicas = false; // whether there is regions with replicas + + Integer[] serverIndicesSortedByRegionCount; + Integer[] serverIndicesSortedByLocality; + + Map serversToIndex; + Map hostsToIndex; + Map racksToIndex; + Map tablesToIndex; + Map regionsToIndex; + float[] localityPerServer; + + int numServers; + int numHosts; + int numRacks; + int numTables; + int numRegions; + + int numMovedRegions = 0; // num moved regions from the initial configuration + Map> clusterState; + + private final RackManager rackManager; + // Maps region -> rackIndex -> locality of region on rack + private float[][] rackLocalities; + // Maps localityType -> region -> [server|rack]Index with highest locality + private int[][] regionsToMostLocalEntities; + + static class DefaultRackManager extends RackManager { + @Override + public String getRack(ServerName server) { + return UNKNOWN_RACK; + } + } + + BalancerClusterState(Map> clusterState, + Map> loads, RegionHDFSBlockLocationFinder regionFinder, + RackManager rackManager) { + this(null, clusterState, loads, regionFinder, rackManager); + } + + @SuppressWarnings("unchecked") + BalancerClusterState(Collection unassignedRegions, + Map> clusterState, Map> loads, + RegionHDFSBlockLocationFinder regionFinder, RackManager rackManager) { + if (unassignedRegions == null) { + unassignedRegions = Collections.emptyList(); + } + + serversToIndex = new HashMap<>(); + hostsToIndex = new HashMap<>(); + racksToIndex = new HashMap<>(); + tablesToIndex = new HashMap<>(); + + // TODO: We should get the list of tables from master + tables = new ArrayList<>(); + this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); + + numRegions = 0; + + List> serversPerHostList = new ArrayList<>(); + List> serversPerRackList = new ArrayList<>(); + this.clusterState = clusterState; + this.regionFinder = regionFinder; + + // Use servername and port as there can be dead servers in this list. We want everything with + // a matching hostname and port to have the same index. + for (ServerName sn : clusterState.keySet()) { + if (sn == null) { + LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + + "skipping; unassigned regions?"); + if (LOG.isTraceEnabled()) { + LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); + } + continue; + } + if (serversToIndex.get(sn.getAddress()) == null) { + serversToIndex.put(sn.getAddress(), numServers++); + } + if (!hostsToIndex.containsKey(sn.getHostname())) { + hostsToIndex.put(sn.getHostname(), numHosts++); + serversPerHostList.add(new ArrayList<>(1)); + } + + int serverIndex = serversToIndex.get(sn.getAddress()); + int hostIndex = hostsToIndex.get(sn.getHostname()); + serversPerHostList.get(hostIndex).add(serverIndex); + + String rack = this.rackManager.getRack(sn); + if (!racksToIndex.containsKey(rack)) { + racksToIndex.put(rack, numRacks++); + serversPerRackList.add(new ArrayList<>()); + } + int rackIndex = racksToIndex.get(rack); + serversPerRackList.get(rackIndex).add(serverIndex); + } + + // Count how many regions there are. + for (Map.Entry> entry : clusterState.entrySet()) { + numRegions += entry.getValue().size(); + } + numRegions += unassignedRegions.size(); + + regionsToIndex = new HashMap<>(numRegions); + servers = new ServerName[numServers]; + serversPerHost = new int[numHosts][]; + serversPerRack = new int[numRacks][]; + regions = new RegionInfo[numRegions]; + regionIndexToServerIndex = new int[numRegions]; + initialRegionIndexToServerIndex = new int[numRegions]; + regionIndexToTableIndex = new int[numRegions]; + regionIndexToPrimaryIndex = new int[numRegions]; + regionLoads = new Deque[numRegions]; + + regionLocations = new int[numRegions][]; + serverIndicesSortedByRegionCount = new Integer[numServers]; + serverIndicesSortedByLocality = new Integer[numServers]; + localityPerServer = new float[numServers]; + + serverIndexToHostIndex = new int[numServers]; + serverIndexToRackIndex = new int[numServers]; + regionsPerServer = new int[numServers][]; + serverIndexToRegionsOffset = new int[numServers]; + regionsPerHost = new int[numHosts][]; + regionsPerRack = new int[numRacks][]; + primariesOfRegionsPerServer = new int[numServers][]; + primariesOfRegionsPerHost = new int[numHosts][]; + primariesOfRegionsPerRack = new int[numRacks][]; + + int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; + + for (Entry> entry : clusterState.entrySet()) { + if (entry.getKey() == null) { + LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); + continue; + } + int serverIndex = serversToIndex.get(entry.getKey().getAddress()); + + // keep the servername if this is the first server name for this hostname + // or this servername has the newest startcode. + if (servers[serverIndex] == null || + servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) { + servers[serverIndex] = entry.getKey(); + } + + if (regionsPerServer[serverIndex] != null) { + // there is another server with the same hostAndPort in ClusterState. + // allocate the array for the total size + regionsPerServer[serverIndex] = + new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; + } else { + regionsPerServer[serverIndex] = new int[entry.getValue().size()]; + } + primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length]; + serverIndicesSortedByRegionCount[serverIndex] = serverIndex; + serverIndicesSortedByLocality[serverIndex] = serverIndex; + } + + hosts = new String[numHosts]; + for (Entry entry : hostsToIndex.entrySet()) { + hosts[entry.getValue()] = entry.getKey(); + } + racks = new String[numRacks]; + for (Entry entry : racksToIndex.entrySet()) { + racks[entry.getValue()] = entry.getKey(); + } + + for (Entry> entry : clusterState.entrySet()) { + int serverIndex = serversToIndex.get(entry.getKey().getAddress()); + regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; + + int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); + serverIndexToHostIndex[serverIndex] = hostIndex; + + int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); + serverIndexToRackIndex[serverIndex] = rackIndex; + + for (RegionInfo region : entry.getValue()) { + registerRegion(region, regionIndex, serverIndex, loads, regionFinder); + regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; + regionIndex++; + } + serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; + } + + for (RegionInfo region : unassignedRegions) { + registerRegion(region, regionIndex, -1, loads, regionFinder); + regionIndex++; + } + + for (int i = 0; i < serversPerHostList.size(); i++) { + serversPerHost[i] = new int[serversPerHostList.get(i).size()]; + for (int j = 0; j < serversPerHost[i].length; j++) { + serversPerHost[i][j] = serversPerHostList.get(i).get(j); + } + if (serversPerHost[i].length > 1) { + multiServersPerHost = true; + } + } + + for (int i = 0; i < serversPerRackList.size(); i++) { + serversPerRack[i] = new int[serversPerRackList.get(i).size()]; + for (int j = 0; j < serversPerRack[i].length; j++) { + serversPerRack[i][j] = serversPerRackList.get(i).get(j); + } + } + + numTables = tables.size(); + numRegionsPerServerPerTable = new int[numServers][numTables]; + + for (int i = 0; i < numServers; i++) { + for (int j = 0; j < numTables; j++) { + numRegionsPerServerPerTable[i][j] = 0; + } + } + + for (int i = 0; i < regionIndexToServerIndex.length; i++) { + if (regionIndexToServerIndex[i] >= 0) { + numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; + } + } + + numMaxRegionsPerTable = new int[numTables]; + for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { + for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) { + if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; + } + } + } + + for (int i = 0; i < regions.length; i++) { + RegionInfo info = regions[i]; + if (RegionReplicaUtil.isDefaultReplica(info)) { + regionIndexToPrimaryIndex[i] = i; + } else { + hasRegionReplicas = true; + RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); + regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); + } + } + + for (int i = 0; i < regionsPerServer.length; i++) { + primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length]; + for (int j = 0; j < regionsPerServer[i].length; j++) { + int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; + primariesOfRegionsPerServer[i][j] = primaryIndex; + } + // sort the regions by primaries. + Arrays.sort(primariesOfRegionsPerServer[i]); + } + + // compute regionsPerHost + if (multiServersPerHost) { + for (int i = 0; i < serversPerHost.length; i++) { + int numRegionsPerHost = 0; + for (int j = 0; j < serversPerHost[i].length; j++) { + numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length; + } + regionsPerHost[i] = new int[numRegionsPerHost]; + primariesOfRegionsPerHost[i] = new int[numRegionsPerHost]; + } + for (int i = 0; i < serversPerHost.length; i++) { + int numRegionPerHostIndex = 0; + for (int j = 0; j < serversPerHost[i].length; j++) { + for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) { + int region = regionsPerServer[serversPerHost[i][j]][k]; + regionsPerHost[i][numRegionPerHostIndex] = region; + int primaryIndex = regionIndexToPrimaryIndex[region]; + primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex; + numRegionPerHostIndex++; + } + } + // sort the regions by primaries. + Arrays.sort(primariesOfRegionsPerHost[i]); + } + } + + // compute regionsPerRack + if (numRacks > 1) { + for (int i = 0; i < serversPerRack.length; i++) { + int numRegionsPerRack = 0; + for (int j = 0; j < serversPerRack[i].length; j++) { + numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length; + } + regionsPerRack[i] = new int[numRegionsPerRack]; + primariesOfRegionsPerRack[i] = new int[numRegionsPerRack]; + } + + for (int i = 0; i < serversPerRack.length; i++) { + int numRegionPerRackIndex = 0; + for (int j = 0; j < serversPerRack[i].length; j++) { + for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) { + int region = regionsPerServer[serversPerRack[i][j]][k]; + regionsPerRack[i][numRegionPerRackIndex] = region; + int primaryIndex = regionIndexToPrimaryIndex[region]; + primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex; + numRegionPerRackIndex++; + } + } + // sort the regions by primaries. + Arrays.sort(primariesOfRegionsPerRack[i]); + } + } + } + + /** Helper for Cluster constructor to handle a region */ + private void registerRegion(RegionInfo region, int regionIndex, int serverIndex, + Map> loads, RegionHDFSBlockLocationFinder regionFinder) { + String tableName = region.getTable().getNameAsString(); + if (!tablesToIndex.containsKey(tableName)) { + tables.add(tableName); + tablesToIndex.put(tableName, tablesToIndex.size()); + } + int tableIndex = tablesToIndex.get(tableName); + + regionsToIndex.put(region, regionIndex); + regions[regionIndex] = region; + regionIndexToServerIndex[regionIndex] = serverIndex; + initialRegionIndexToServerIndex[regionIndex] = serverIndex; + regionIndexToTableIndex[regionIndex] = tableIndex; + + // region load + if (loads != null) { + Deque rl = loads.get(region.getRegionNameAsString()); + // That could have failed if the RegionLoad is using the other regionName + if (rl == null) { + // Try getting the region load using encoded name. + rl = loads.get(region.getEncodedName()); + } + regionLoads[regionIndex] = rl; + } + + if (regionFinder != null) { + // region location + List loc = regionFinder.getTopBlockLocations(region); + regionLocations[regionIndex] = new int[loc.size()]; + for (int i = 0; i < loc.size(); i++) { + regionLocations[regionIndex][i] = loc.get(i) == null ? -1 : + (serversToIndex.get(loc.get(i).getAddress()) == null ? -1 : + serversToIndex.get(loc.get(i).getAddress())); + } + } + } + + /** + * Returns true iff a given server has less regions than the balanced amount + */ + public boolean serverHasTooFewRegions(int server) { + int minLoad = this.numRegions / numServers; + int numRegions = getNumRegions(server); + return numRegions < minLoad; + } + + /** + * Retrieves and lazily initializes a field storing the locality of every region/server + * combination + */ + public float[][] getOrComputeRackLocalities() { + if (rackLocalities == null || regionsToMostLocalEntities == null) { + computeCachedLocalities(); + } + return rackLocalities; + } + + /** + * Lazily initializes and retrieves a mapping of region -> server for which region has the highest + * the locality + */ + public int[] getOrComputeRegionsToMostLocalEntities(BalancerClusterState.LocalityType type) { + if (rackLocalities == null || regionsToMostLocalEntities == null) { + computeCachedLocalities(); + } + return regionsToMostLocalEntities[type.ordinal()]; + } + + /** + * Looks up locality from cache of localities. Will create cache if it does not already exist. + */ + public float getOrComputeLocality(int region, int entity, + BalancerClusterState.LocalityType type) { + switch (type) { + case SERVER: + return getLocalityOfRegion(region, entity); + case RACK: + return getOrComputeRackLocalities()[region][entity]; + default: + throw new IllegalArgumentException("Unsupported LocalityType: " + type); + } + } + + /** + * Returns locality weighted by region size in MB. Will create locality cache if it does not + * already exist. + */ + public double getOrComputeWeightedLocality(int region, int server, + BalancerClusterState.LocalityType type) { + return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); + } + + /** + * Returns the size in MB from the most recent RegionLoad for region + */ + public int getRegionSizeMB(int region) { + Deque load = regionLoads[region]; + // This means regions have no actual data on disk + if (load == null) { + return 0; + } + return regionLoads[region].getLast().getStorefileSizeMB(); + } + + /** + * Computes and caches the locality for each region/rack combinations, as well as storing a + * mapping of region -> server and region -> rack such that server and rack have the highest + * locality for region + */ + private void computeCachedLocalities() { + rackLocalities = new float[numRegions][numRacks]; + regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; + + // Compute localities and find most local server per region + for (int region = 0; region < numRegions; region++) { + int serverWithBestLocality = 0; + float bestLocalityForRegion = 0; + for (int server = 0; server < numServers; server++) { + // Aggregate per-rack locality + float locality = getLocalityOfRegion(region, server); + int rack = serverIndexToRackIndex[server]; + int numServersInRack = serversPerRack[rack].length; + rackLocalities[region][rack] += locality / numServersInRack; + + if (locality > bestLocalityForRegion) { + serverWithBestLocality = server; + bestLocalityForRegion = locality; + } + } + regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; + + // Find most local rack per region + int rackWithBestLocality = 0; + float bestRackLocalityForRegion = 0.0f; + for (int rack = 0; rack < numRacks; rack++) { + float rackLocality = rackLocalities[region][rack]; + if (rackLocality > bestRackLocalityForRegion) { + bestRackLocalityForRegion = rackLocality; + rackWithBestLocality = rack; + } + } + regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; + } + + } + + /** + * Maps region index to rack index + */ + public int getRackForRegion(int region) { + return serverIndexToRackIndex[regionIndexToServerIndex[region]]; + } + + enum LocalityType { + SERVER, RACK + } + + public void doAction(BalanceAction action) { + switch (action.getType()) { + case NULL: + break; + case ASSIGN_REGION: + // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings + assert action instanceof AssignRegionAction : action.getClass(); + AssignRegionAction ar = (AssignRegionAction) action; + regionsPerServer[ar.getServer()] = + addRegion(regionsPerServer[ar.getServer()], ar.getRegion()); + regionMoved(ar.getRegion(), -1, ar.getServer()); + break; + case MOVE_REGION: + assert action instanceof MoveRegionAction : action.getClass(); + MoveRegionAction mra = (MoveRegionAction) action; + regionsPerServer[mra.getFromServer()] = + removeRegion(regionsPerServer[mra.getFromServer()], mra.getRegion()); + regionsPerServer[mra.getToServer()] = + addRegion(regionsPerServer[mra.getToServer()], mra.getRegion()); + regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); + break; + case SWAP_REGIONS: + assert action instanceof SwapRegionsAction : action.getClass(); + SwapRegionsAction a = (SwapRegionsAction) action; + regionsPerServer[a.getFromServer()] = + replaceRegion(regionsPerServer[a.getFromServer()], a.getFromRegion(), a.getToRegion()); + regionsPerServer[a.getToServer()] = + replaceRegion(regionsPerServer[a.getToServer()], a.getToRegion(), a.getFromRegion()); + regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); + regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); + break; + default: + throw new RuntimeException("Uknown action:" + action.getType()); + } + } + + /** + * Return true if the placement of region on server would lower the availability of the region in + * question + * @return true or false + */ + boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { + if (!serversToIndex.containsKey(serverName.getAddress())) { + return false; // safeguard against race between cluster.servers and servers from LB method + // args + } + int server = serversToIndex.get(serverName.getAddress()); + int region = regionsToIndex.get(regionInfo); + + // Region replicas for same region should better assign to different servers + for (int i : regionsPerServer[server]) { + RegionInfo otherRegionInfo = regions[i]; + if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { + return true; + } + } + + int primary = regionIndexToPrimaryIndex[region]; + if (primary == -1) { + return false; + } + // there is a subset relation for server < host < rack + // check server first + if (contains(primariesOfRegionsPerServer[server], primary)) { + // check for whether there are other servers that we can place this region + for (int i = 0; i < primariesOfRegionsPerServer.length; i++) { + if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) { + return true; // meaning there is a better server + } + } + return false; // there is not a better server to place this + } + + // check host + if (multiServersPerHost) { + // these arrays would only be allocated if we have more than one server per host + int host = serverIndexToHostIndex[server]; + if (contains(primariesOfRegionsPerHost[host], primary)) { + // check for whether there are other hosts that we can place this region + for (int i = 0; i < primariesOfRegionsPerHost.length; i++) { + if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) { + return true; // meaning there is a better host + } + } + return false; // there is not a better host to place this + } + } + + // check rack + if (numRacks > 1) { + int rack = serverIndexToRackIndex[server]; + if (contains(primariesOfRegionsPerRack[rack], primary)) { + // check for whether there are other racks that we can place this region + for (int i = 0; i < primariesOfRegionsPerRack.length; i++) { + if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) { + return true; // meaning there is a better rack + } + } + return false; // there is not a better rack to place this + } + } + + return false; + } + + void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { + if (!serversToIndex.containsKey(serverName.getAddress())) { + return; + } + int server = serversToIndex.get(serverName.getAddress()); + int region = regionsToIndex.get(regionInfo); + doAction(new AssignRegionAction(region, server)); + } + + void regionMoved(int region, int oldServer, int newServer) { + regionIndexToServerIndex[region] = newServer; + if (initialRegionIndexToServerIndex[region] == newServer) { + numMovedRegions--; // region moved back to original location + } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { + numMovedRegions++; // region moved from original location + } + int tableIndex = regionIndexToTableIndex[region]; + if (oldServer >= 0) { + numRegionsPerServerPerTable[oldServer][tableIndex]--; + } + numRegionsPerServerPerTable[newServer][tableIndex]++; + + // check whether this caused maxRegionsPerTable in the new Server to be updated + if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex]; + } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + + 1) == numMaxRegionsPerTable[tableIndex]) { + // recompute maxRegionsPerTable since the previous value was coming from the old server + numMaxRegionsPerTable[tableIndex] = 0; + for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { + if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; + } + } + } + + // update for servers + int primary = regionIndexToPrimaryIndex[region]; + if (oldServer >= 0) { + primariesOfRegionsPerServer[oldServer] = + removeRegion(primariesOfRegionsPerServer[oldServer], primary); + } + primariesOfRegionsPerServer[newServer] = + addRegionSorted(primariesOfRegionsPerServer[newServer], primary); + + // update for hosts + if (multiServersPerHost) { + int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1; + int newHost = serverIndexToHostIndex[newServer]; + if (newHost != oldHost) { + regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region); + primariesOfRegionsPerHost[newHost] = + addRegionSorted(primariesOfRegionsPerHost[newHost], primary); + if (oldHost >= 0) { + regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region); + primariesOfRegionsPerHost[oldHost] = + removeRegion(primariesOfRegionsPerHost[oldHost], primary); // will still be sorted + } + } + } + + // update for racks + if (numRacks > 1) { + int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1; + int newRack = serverIndexToRackIndex[newServer]; + if (newRack != oldRack) { + regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region); + primariesOfRegionsPerRack[newRack] = + addRegionSorted(primariesOfRegionsPerRack[newRack], primary); + if (oldRack >= 0) { + regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region); + primariesOfRegionsPerRack[oldRack] = + removeRegion(primariesOfRegionsPerRack[oldRack], primary); // will still be sorted + } + } + } + } + + int[] removeRegion(int[] regions, int regionIndex) { + // TODO: this maybe costly. Consider using linked lists + int[] newRegions = new int[regions.length - 1]; + int i = 0; + for (i = 0; i < regions.length; i++) { + if (regions[i] == regionIndex) { + break; + } + newRegions[i] = regions[i]; + } + System.arraycopy(regions, i + 1, newRegions, i, newRegions.length - i); + return newRegions; + } + + int[] addRegion(int[] regions, int regionIndex) { + int[] newRegions = new int[regions.length + 1]; + System.arraycopy(regions, 0, newRegions, 0, regions.length); + newRegions[newRegions.length - 1] = regionIndex; + return newRegions; + } + + int[] addRegionSorted(int[] regions, int regionIndex) { + int[] newRegions = new int[regions.length + 1]; + int i = 0; + for (i = 0; i < regions.length; i++) { // find the index to insert + if (regions[i] > regionIndex) { + break; + } + } + System.arraycopy(regions, 0, newRegions, 0, i); // copy first half + System.arraycopy(regions, i, newRegions, i + 1, regions.length - i); // copy second half + newRegions[i] = regionIndex; + + return newRegions; + } + + int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { + int i = 0; + for (i = 0; i < regions.length; i++) { + if (regions[i] == regionIndex) { + regions[i] = newRegionIndex; + break; + } + } + return regions; + } + + void sortServersByRegionCount() { + Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); + } + + int getNumRegions(int server) { + return regionsPerServer[server].length; + } + + boolean contains(int[] arr, int val) { + return Arrays.binarySearch(arr, val) >= 0; + } + + private Comparator numRegionsComparator = Comparator.comparingInt(this::getNumRegions); + + int getLowestLocalityRegionOnServer(int serverIndex) { + if (regionFinder != null) { + float lowestLocality = 1.0f; + int lowestLocalityRegionIndex = -1; + if (regionsPerServer[serverIndex].length == 0) { + // No regions on that region server + return -1; + } + for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { + int regionIndex = regionsPerServer[serverIndex][j]; + HDFSBlocksDistribution distribution = + regionFinder.getBlockDistribution(regions[regionIndex]); + float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); + // skip empty region + if (distribution.getUniqueBlocksTotalWeight() == 0) { + continue; + } + if (locality < lowestLocality) { + lowestLocality = locality; + lowestLocalityRegionIndex = j; + } + } + if (lowestLocalityRegionIndex == -1) { + return -1; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Lowest locality region is " + + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] + .getRegionNameAsString() + + " with locality " + lowestLocality + " and its region server contains " + + regionsPerServer[serverIndex].length + " regions"); + } + return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; + } else { + return -1; + } + } + + float getLocalityOfRegion(int region, int server) { + if (regionFinder != null) { + HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); + return distribution.getBlockLocalityIndex(servers[server].getHostname()); + } else { + return 0f; + } + } + + void setNumRegions(int numRegions) { + this.numRegions = numRegions; + } + + void setNumMovedRegions(int numMovedRegions) { + this.numMovedRegions = numMovedRegions; + } + + @Override + public String toString() { + StringBuilder desc = new StringBuilder("Cluster={servers=["); + for (ServerName sn : servers) { + desc.append(sn.getAddress().toString()).append(", "); + } + desc.append("], serverIndicesSortedByRegionCount=") + .append(Arrays.toString(serverIndicesSortedByRegionCount)).append(", regionsPerServer=") + .append(Arrays.deepToString(regionsPerServer)); + + desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable)) + .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) + .append(", numTables=").append(numTables).append(", numMovedRegions=").append(numMovedRegions) + .append('}'); + return desc.toString(); + } +} \ No newline at end of file diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java new file mode 100644 index 000000000000..f73fada18759 --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/MoveRegionAction.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class MoveRegionAction extends BalanceAction { + private final int region; + private final int fromServer; + private final int toServer; + + public MoveRegionAction(int region, int fromServer, int toServer) { + super(Type.MOVE_REGION); + this.fromServer = fromServer; + this.region = region; + this.toServer = toServer; + } + + public int getRegion() { + return region; + } + + public int getFromServer() { + return fromServer; + } + + public int getToServer() { + return toServer; + } + + @Override + public BalanceAction undoAction() { + return new MoveRegionAction(region, toServer, fromServer); + } + + @Override + public String toString() { + return getType() + ": " + region + ":" + fromServer + " -> " + toServer; + } +} \ No newline at end of file diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java new file mode 100644 index 000000000000..9963fe0f6c0d --- /dev/null +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/SwapRegionsAction.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.balancer; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class SwapRegionsAction extends BalanceAction { + private final int fromServer; + private final int fromRegion; + private final int toServer; + private final int toRegion; + + SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { + super(Type.SWAP_REGIONS); + this.fromServer = fromServer; + this.fromRegion = fromRegion; + this.toServer = toServer; + this.toRegion = toRegion; + } + + public int getFromServer() { + return fromServer; + } + + public int getFromRegion() { + return fromRegion; + } + + public int getToServer() { + return toServer; + } + + public int getToRegion() { + return toRegion; + } + + @Override + public BalanceAction undoAction() { + return new SwapRegionsAction(fromServer, toRegion, toServer, fromRegion); + } + + @Override + public String toString() { + return getType() + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 0de4b21b0416..4374de1ed549 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -21,40 +20,31 @@ import edu.umd.cs.findbugs.annotations.NonNull; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; -import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Random; import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Predicate; -import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.ServerMetrics; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; -import org.apache.hadoop.hbase.net.Address; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +71,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { protected static final int MIN_SERVER_BALANCE = 2; private volatile boolean stopped = false; - private static final List EMPTY_REGION_LIST = Collections.emptyList(); + static final List EMPTY_REGION_LIST = Collections.emptyList(); static final Predicate IDLE_SERVER_PREDICATOR = load -> load.getRegionMetrics().isEmpty(); @@ -90,13 +80,6 @@ public abstract class BaseLoadBalancer implements LoadBalancer { protected boolean useRegionFinder; protected boolean isByTable = false; - private static class DefaultRackManager extends RackManager { - @Override - public String getRack(ServerName server) { - return UNKNOWN_RACK; - } - } - /** * The constructor that uses the basic MetricsBalancer */ @@ -121,912 +104,13 @@ private void createRegionFinder() { } } - /** - * An efficient array based implementation similar to ClusterState for keeping - * the status of the cluster in terms of region assignment and distribution. - * LoadBalancers, such as StochasticLoadBalancer uses this Cluster object because of - * hundreds of thousands of hashmap manipulations are very costly, which is why this - * class uses mostly indexes and arrays. - * - * Cluster tracks a list of unassigned regions, region assignments, and the server - * topology in terms of server names, hostnames and racks. - */ - protected static class Cluster { - ServerName[] servers; - String[] hosts; // ServerName uniquely identifies a region server. multiple RS can run on the same host - String[] racks; - boolean multiServersPerHost = false; // whether or not any host has more than one server - - ArrayList tables; - RegionInfo[] regions; - Deque[] regionLoads; - private RegionHDFSBlockLocationFinder regionFinder; - - int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality - - int[] serverIndexToHostIndex; //serverIndex -> host index - int[] serverIndexToRackIndex; //serverIndex -> rack index - - int[][] regionsPerServer; //serverIndex -> region list - int[] serverIndexToRegionsOffset; //serverIndex -> offset of region list - int[][] regionsPerHost; //hostIndex -> list of regions - int[][] regionsPerRack; //rackIndex -> region list - int[][] primariesOfRegionsPerServer; //serverIndex -> sorted list of regions by primary region index - int[][] primariesOfRegionsPerHost; //hostIndex -> sorted list of regions by primary region index - int[][] primariesOfRegionsPerRack; //rackIndex -> sorted list of regions by primary region index - - int[][] serversPerHost; //hostIndex -> list of server indexes - int[][] serversPerRack; //rackIndex -> list of server indexes - int[] regionIndexToServerIndex; //regionIndex -> serverIndex - int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) - int[] regionIndexToTableIndex; //regionIndex -> tableIndex - int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions - int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS - int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary - boolean hasRegionReplicas = false; //whether there is regions with replicas - - Integer[] serverIndicesSortedByRegionCount; - Integer[] serverIndicesSortedByLocality; - - Map serversToIndex; - Map hostsToIndex; - Map racksToIndex; - Map tablesToIndex; - Map regionsToIndex; - float[] localityPerServer; - - int numServers; - int numHosts; - int numRacks; - int numTables; - int numRegions; - - int numMovedRegions = 0; //num moved regions from the initial configuration - Map> clusterState; - - protected final RackManager rackManager; - // Maps region -> rackIndex -> locality of region on rack - private float[][] rackLocalities; - // Maps localityType -> region -> [server|rack]Index with highest locality - private int[][] regionsToMostLocalEntities; - - protected Cluster( - Map> clusterState, - Map> loads, - RegionHDFSBlockLocationFinder regionFinder, - RackManager rackManager) { - this(null, clusterState, loads, regionFinder, rackManager); - } - - @SuppressWarnings("unchecked") - protected Cluster( - Collection unassignedRegions, - Map> clusterState, - Map> loads, - RegionHDFSBlockLocationFinder regionFinder, - RackManager rackManager) { - - if (unassignedRegions == null) { - unassignedRegions = EMPTY_REGION_LIST; - } - - serversToIndex = new HashMap<>(); - hostsToIndex = new HashMap<>(); - racksToIndex = new HashMap<>(); - tablesToIndex = new HashMap<>(); - - //TODO: We should get the list of tables from master - tables = new ArrayList<>(); - this.rackManager = rackManager != null ? rackManager : new DefaultRackManager(); - - numRegions = 0; - - List> serversPerHostList = new ArrayList<>(); - List> serversPerRackList = new ArrayList<>(); - this.clusterState = clusterState; - this.regionFinder = regionFinder; - - // Use servername and port as there can be dead servers in this list. We want everything with - // a matching hostname and port to have the same index. - for (ServerName sn : clusterState.keySet()) { - if (sn == null) { - LOG.warn("TODO: Enable TRACE on BaseLoadBalancer. Empty servername); " + - "skipping; unassigned regions?"); - if (LOG.isTraceEnabled()) { - LOG.trace("EMPTY SERVERNAME " + clusterState.toString()); - } - continue; - } - if (serversToIndex.get(sn.getAddress()) == null) { - serversToIndex.put(sn.getAddress(), numServers++); - } - if (!hostsToIndex.containsKey(sn.getHostname())) { - hostsToIndex.put(sn.getHostname(), numHosts++); - serversPerHostList.add(new ArrayList<>(1)); - } - - int serverIndex = serversToIndex.get(sn.getAddress()); - int hostIndex = hostsToIndex.get(sn.getHostname()); - serversPerHostList.get(hostIndex).add(serverIndex); - - String rack = this.rackManager.getRack(sn); - if (!racksToIndex.containsKey(rack)) { - racksToIndex.put(rack, numRacks++); - serversPerRackList.add(new ArrayList<>()); - } - int rackIndex = racksToIndex.get(rack); - serversPerRackList.get(rackIndex).add(serverIndex); - } - - // Count how many regions there are. - for (Entry> entry : clusterState.entrySet()) { - numRegions += entry.getValue().size(); - } - numRegions += unassignedRegions.size(); - - regionsToIndex = new HashMap<>(numRegions); - servers = new ServerName[numServers]; - serversPerHost = new int[numHosts][]; - serversPerRack = new int[numRacks][]; - regions = new RegionInfo[numRegions]; - regionIndexToServerIndex = new int[numRegions]; - initialRegionIndexToServerIndex = new int[numRegions]; - regionIndexToTableIndex = new int[numRegions]; - regionIndexToPrimaryIndex = new int[numRegions]; - regionLoads = new Deque[numRegions]; - - regionLocations = new int[numRegions][]; - serverIndicesSortedByRegionCount = new Integer[numServers]; - serverIndicesSortedByLocality = new Integer[numServers]; - localityPerServer = new float[numServers]; - - serverIndexToHostIndex = new int[numServers]; - serverIndexToRackIndex = new int[numServers]; - regionsPerServer = new int[numServers][]; - serverIndexToRegionsOffset = new int[numServers]; - regionsPerHost = new int[numHosts][]; - regionsPerRack = new int[numRacks][]; - primariesOfRegionsPerServer = new int[numServers][]; - primariesOfRegionsPerHost = new int[numHosts][]; - primariesOfRegionsPerRack = new int[numRacks][]; - - int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; - - for (Entry> entry : clusterState.entrySet()) { - if (entry.getKey() == null) { - LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); - continue; - } - int serverIndex = serversToIndex.get(entry.getKey().getAddress()); - - // keep the servername if this is the first server name for this hostname - // or this servername has the newest startcode. - if (servers[serverIndex] == null || - servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) { - servers[serverIndex] = entry.getKey(); - } - - if (regionsPerServer[serverIndex] != null) { - // there is another server with the same hostAndPort in ClusterState. - // allocate the array for the total size - regionsPerServer[serverIndex] = new int[entry.getValue().size() + regionsPerServer[serverIndex].length]; - } else { - regionsPerServer[serverIndex] = new int[entry.getValue().size()]; - } - primariesOfRegionsPerServer[serverIndex] = new int[regionsPerServer[serverIndex].length]; - serverIndicesSortedByRegionCount[serverIndex] = serverIndex; - serverIndicesSortedByLocality[serverIndex] = serverIndex; - } - - hosts = new String[numHosts]; - for (Entry entry : hostsToIndex.entrySet()) { - hosts[entry.getValue()] = entry.getKey(); - } - racks = new String[numRacks]; - for (Entry entry : racksToIndex.entrySet()) { - racks[entry.getValue()] = entry.getKey(); - } - - for (Entry> entry : clusterState.entrySet()) { - int serverIndex = serversToIndex.get(entry.getKey().getAddress()); - regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; - - int hostIndex = hostsToIndex.get(entry.getKey().getHostname()); - serverIndexToHostIndex[serverIndex] = hostIndex; - - int rackIndex = racksToIndex.get(this.rackManager.getRack(entry.getKey())); - serverIndexToRackIndex[serverIndex] = rackIndex; - - for (RegionInfo region : entry.getValue()) { - registerRegion(region, regionIndex, serverIndex, loads, regionFinder); - regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex; - regionIndex++; - } - serverIndexToRegionsOffset[serverIndex] = regionPerServerIndex; - } - - for (RegionInfo region : unassignedRegions) { - registerRegion(region, regionIndex, -1, loads, regionFinder); - regionIndex++; - } - - for (int i = 0; i < serversPerHostList.size(); i++) { - serversPerHost[i] = new int[serversPerHostList.get(i).size()]; - for (int j = 0; j < serversPerHost[i].length; j++) { - serversPerHost[i][j] = serversPerHostList.get(i).get(j); - } - if (serversPerHost[i].length > 1) { - multiServersPerHost = true; - } - } - - for (int i = 0; i < serversPerRackList.size(); i++) { - serversPerRack[i] = new int[serversPerRackList.get(i).size()]; - for (int j = 0; j < serversPerRack[i].length; j++) { - serversPerRack[i][j] = serversPerRackList.get(i).get(j); - } - } - - numTables = tables.size(); - numRegionsPerServerPerTable = new int[numServers][numTables]; - - for (int i = 0; i < numServers; i++) { - for (int j = 0; j < numTables; j++) { - numRegionsPerServerPerTable[i][j] = 0; - } - } - - for (int i=0; i < regionIndexToServerIndex.length; i++) { - if (regionIndexToServerIndex[i] >= 0) { - numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; - } - } - - numMaxRegionsPerTable = new int[numTables]; - for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { - for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) { - if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; - } - } - } - - for (int i = 0; i < regions.length; i ++) { - RegionInfo info = regions[i]; - if (RegionReplicaUtil.isDefaultReplica(info)) { - regionIndexToPrimaryIndex[i] = i; - } else { - hasRegionReplicas = true; - RegionInfo primaryInfo = RegionReplicaUtil.getRegionInfoForDefaultReplica(info); - regionIndexToPrimaryIndex[i] = regionsToIndex.getOrDefault(primaryInfo, -1); - } - } - - for (int i = 0; i < regionsPerServer.length; i++) { - primariesOfRegionsPerServer[i] = new int[regionsPerServer[i].length]; - for (int j = 0; j < regionsPerServer[i].length; j++) { - int primaryIndex = regionIndexToPrimaryIndex[regionsPerServer[i][j]]; - primariesOfRegionsPerServer[i][j] = primaryIndex; - } - // sort the regions by primaries. - Arrays.sort(primariesOfRegionsPerServer[i]); - } - - // compute regionsPerHost - if (multiServersPerHost) { - for (int i = 0 ; i < serversPerHost.length; i++) { - int numRegionsPerHost = 0; - for (int j = 0; j < serversPerHost[i].length; j++) { - numRegionsPerHost += regionsPerServer[serversPerHost[i][j]].length; - } - regionsPerHost[i] = new int[numRegionsPerHost]; - primariesOfRegionsPerHost[i] = new int[numRegionsPerHost]; - } - for (int i = 0 ; i < serversPerHost.length; i++) { - int numRegionPerHostIndex = 0; - for (int j = 0; j < serversPerHost[i].length; j++) { - for (int k = 0; k < regionsPerServer[serversPerHost[i][j]].length; k++) { - int region = regionsPerServer[serversPerHost[i][j]][k]; - regionsPerHost[i][numRegionPerHostIndex] = region; - int primaryIndex = regionIndexToPrimaryIndex[region]; - primariesOfRegionsPerHost[i][numRegionPerHostIndex] = primaryIndex; - numRegionPerHostIndex++; - } - } - // sort the regions by primaries. - Arrays.sort(primariesOfRegionsPerHost[i]); - } - } - - // compute regionsPerRack - if (numRacks > 1) { - for (int i = 0 ; i < serversPerRack.length; i++) { - int numRegionsPerRack = 0; - for (int j = 0; j < serversPerRack[i].length; j++) { - numRegionsPerRack += regionsPerServer[serversPerRack[i][j]].length; - } - regionsPerRack[i] = new int[numRegionsPerRack]; - primariesOfRegionsPerRack[i] = new int[numRegionsPerRack]; - } - - for (int i = 0 ; i < serversPerRack.length; i++) { - int numRegionPerRackIndex = 0; - for (int j = 0; j < serversPerRack[i].length; j++) { - for (int k = 0; k < regionsPerServer[serversPerRack[i][j]].length; k++) { - int region = regionsPerServer[serversPerRack[i][j]][k]; - regionsPerRack[i][numRegionPerRackIndex] = region; - int primaryIndex = regionIndexToPrimaryIndex[region]; - primariesOfRegionsPerRack[i][numRegionPerRackIndex] = primaryIndex; - numRegionPerRackIndex++; - } - } - // sort the regions by primaries. - Arrays.sort(primariesOfRegionsPerRack[i]); - } - } - } - - /** Helper for Cluster constructor to handle a region */ - private void registerRegion(RegionInfo region, int regionIndex, - int serverIndex, Map> loads, - RegionHDFSBlockLocationFinder regionFinder) { - String tableName = region.getTable().getNameAsString(); - if (!tablesToIndex.containsKey(tableName)) { - tables.add(tableName); - tablesToIndex.put(tableName, tablesToIndex.size()); - } - int tableIndex = tablesToIndex.get(tableName); - - regionsToIndex.put(region, regionIndex); - regions[regionIndex] = region; - regionIndexToServerIndex[regionIndex] = serverIndex; - initialRegionIndexToServerIndex[regionIndex] = serverIndex; - regionIndexToTableIndex[regionIndex] = tableIndex; - - // region load - if (loads != null) { - Deque rl = loads.get(region.getRegionNameAsString()); - // That could have failed if the RegionLoad is using the other regionName - if (rl == null) { - // Try getting the region load using encoded name. - rl = loads.get(region.getEncodedName()); - } - regionLoads[regionIndex] = rl; - } - - if (regionFinder != null) { - // region location - List loc = regionFinder.getTopBlockLocations(region); - regionLocations[regionIndex] = new int[loc.size()]; - for (int i = 0; i < loc.size(); i++) { - regionLocations[regionIndex][i] = loc.get(i) == null ? -1 - : (serversToIndex.get(loc.get(i).getAddress()) == null ? -1 - : serversToIndex.get(loc.get(i).getAddress())); - } - } - } - - /** - * Returns true iff a given server has less regions than the balanced amount - */ - public boolean serverHasTooFewRegions(int server) { - int minLoad = this.numRegions / numServers; - int numRegions = getNumRegions(server); - return numRegions < minLoad; - } - - /** - * Retrieves and lazily initializes a field storing the locality of - * every region/server combination - */ - public float[][] getOrComputeRackLocalities() { - if (rackLocalities == null || regionsToMostLocalEntities == null) { - computeCachedLocalities(); - } - return rackLocalities; - } - - /** - * Lazily initializes and retrieves a mapping of region -> server for which region has - * the highest the locality - */ - public int[] getOrComputeRegionsToMostLocalEntities(LocalityType type) { - if (rackLocalities == null || regionsToMostLocalEntities == null) { - computeCachedLocalities(); - } - return regionsToMostLocalEntities[type.ordinal()]; - } - - /** - * Looks up locality from cache of localities. Will create cache if it does - * not already exist. - */ - public float getOrComputeLocality(int region, int entity, LocalityType type) { - switch (type) { - case SERVER: - return getLocalityOfRegion(region, entity); - case RACK: - return getOrComputeRackLocalities()[region][entity]; - default: - throw new IllegalArgumentException("Unsupported LocalityType: " + type); - } - } - - /** - * Returns locality weighted by region size in MB. Will create locality cache - * if it does not already exist. - */ - public double getOrComputeWeightedLocality(int region, int server, LocalityType type) { - return getRegionSizeMB(region) * getOrComputeLocality(region, server, type); - } - - /** - * Returns the size in MB from the most recent RegionLoad for region - */ - public int getRegionSizeMB(int region) { - Deque load = regionLoads[region]; - // This means regions have no actual data on disk - if (load == null) { - return 0; - } - return regionLoads[region].getLast().getStorefileSizeMB(); - } - - /** - * Computes and caches the locality for each region/rack combinations, - * as well as storing a mapping of region -> server and region -> rack such that server - * and rack have the highest locality for region - */ - private void computeCachedLocalities() { - rackLocalities = new float[numRegions][numRacks]; - regionsToMostLocalEntities = new int[LocalityType.values().length][numRegions]; - - // Compute localities and find most local server per region - for (int region = 0; region < numRegions; region++) { - int serverWithBestLocality = 0; - float bestLocalityForRegion = 0; - for (int server = 0; server < numServers; server++) { - // Aggregate per-rack locality - float locality = getLocalityOfRegion(region, server); - int rack = serverIndexToRackIndex[server]; - int numServersInRack = serversPerRack[rack].length; - rackLocalities[region][rack] += locality / numServersInRack; - - if (locality > bestLocalityForRegion) { - serverWithBestLocality = server; - bestLocalityForRegion = locality; - } - } - regionsToMostLocalEntities[LocalityType.SERVER.ordinal()][region] = serverWithBestLocality; - - // Find most local rack per region - int rackWithBestLocality = 0; - float bestRackLocalityForRegion = 0.0f; - for (int rack = 0; rack < numRacks; rack++) { - float rackLocality = rackLocalities[region][rack]; - if (rackLocality > bestRackLocalityForRegion) { - bestRackLocalityForRegion = rackLocality; - rackWithBestLocality = rack; - } - } - regionsToMostLocalEntities[LocalityType.RACK.ordinal()][region] = rackWithBestLocality; - } - - } - - /** - * Maps region index to rack index - */ - public int getRackForRegion(int region) { - return serverIndexToRackIndex[regionIndexToServerIndex[region]]; - } - - enum LocalityType { - SERVER, - RACK - } - - /** An action to move or swap a region */ - public static class Action { - public enum Type { - ASSIGN_REGION, - MOVE_REGION, - SWAP_REGIONS, - NULL, - } - - public Type type; - public Action (Type type) {this.type = type;} - /** Returns an Action which would undo this action */ - public Action undoAction() { return this; } - @Override - public String toString() { return type + ":";} - } - - public static class AssignRegionAction extends Action { - public int region; - public int server; - public AssignRegionAction(int region, int server) { - super(Type.ASSIGN_REGION); - this.region = region; - this.server = server; - } - @Override - public Action undoAction() { - // TODO implement this. This action is not being used by the StochasticLB for now - // in case it uses it, we should implement this function. - throw new NotImplementedException(HConstants.NOT_IMPLEMENTED); - } - @Override - public String toString() { - return type + ": " + region + ":" + server; - } - } - - public static class MoveRegionAction extends Action { - public int region; - public int fromServer; - public int toServer; - - public MoveRegionAction(int region, int fromServer, int toServer) { - super(Type.MOVE_REGION); - this.fromServer = fromServer; - this.region = region; - this.toServer = toServer; - } - @Override - public Action undoAction() { - return new MoveRegionAction (region, toServer, fromServer); - } - @Override - public String toString() { - return type + ": " + region + ":" + fromServer + " -> " + toServer; - } - } - - public static class SwapRegionsAction extends Action { - public int fromServer; - public int fromRegion; - public int toServer; - public int toRegion; - public SwapRegionsAction(int fromServer, int fromRegion, int toServer, int toRegion) { - super(Type.SWAP_REGIONS); - this.fromServer = fromServer; - this.fromRegion = fromRegion; - this.toServer = toServer; - this.toRegion = toRegion; - } - @Override - public Action undoAction() { - return new SwapRegionsAction (fromServer, toRegion, toServer, fromRegion); - } - @Override - public String toString() { - return type + ": " + fromRegion + ":" + fromServer + " <-> " + toRegion + ":" + toServer; - } - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NM_FIELD_NAMING_CONVENTION", - justification="Mistake. Too disruptive to change now") - public static final Action NullAction = new Action(Type.NULL); - - public void doAction(Action action) { - switch (action.type) { - case NULL: break; - case ASSIGN_REGION: - // FindBugs: Having the assert quietens FB BC_UNCONFIRMED_CAST warnings - assert action instanceof AssignRegionAction: action.getClass(); - AssignRegionAction ar = (AssignRegionAction) action; - regionsPerServer[ar.server] = addRegion(regionsPerServer[ar.server], ar.region); - regionMoved(ar.region, -1, ar.server); - break; - case MOVE_REGION: - assert action instanceof MoveRegionAction: action.getClass(); - MoveRegionAction mra = (MoveRegionAction) action; - regionsPerServer[mra.fromServer] = removeRegion(regionsPerServer[mra.fromServer], mra.region); - regionsPerServer[mra.toServer] = addRegion(regionsPerServer[mra.toServer], mra.region); - regionMoved(mra.region, mra.fromServer, mra.toServer); - break; - case SWAP_REGIONS: - assert action instanceof SwapRegionsAction: action.getClass(); - SwapRegionsAction a = (SwapRegionsAction) action; - regionsPerServer[a.fromServer] = replaceRegion(regionsPerServer[a.fromServer], a.fromRegion, a.toRegion); - regionsPerServer[a.toServer] = replaceRegion(regionsPerServer[a.toServer], a.toRegion, a.fromRegion); - regionMoved(a.fromRegion, a.fromServer, a.toServer); - regionMoved(a.toRegion, a.toServer, a.fromServer); - break; - default: - throw new RuntimeException("Uknown action:" + action.type); - } - } - - /** - * Return true if the placement of region on server would lower the availability - * of the region in question - * @return true or false - */ - boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) { - if (!serversToIndex.containsKey(serverName.getAddress())) { - return false; // safeguard against race between cluster.servers and servers from LB method args - } - int server = serversToIndex.get(serverName.getAddress()); - int region = regionsToIndex.get(regionInfo); - - // Region replicas for same region should better assign to different servers - for (int i : regionsPerServer[server]) { - RegionInfo otherRegionInfo = regions[i]; - if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) { - return true; - } - } - - int primary = regionIndexToPrimaryIndex[region]; - if (primary == -1) { - return false; - } - // there is a subset relation for server < host < rack - // check server first - if (contains(primariesOfRegionsPerServer[server], primary)) { - // check for whether there are other servers that we can place this region - for (int i = 0; i < primariesOfRegionsPerServer.length; i++) { - if (i != server && !contains(primariesOfRegionsPerServer[i], primary)) { - return true; // meaning there is a better server - } - } - return false; // there is not a better server to place this - } - - // check host - if (multiServersPerHost) { - // these arrays would only be allocated if we have more than one server per host - int host = serverIndexToHostIndex[server]; - if (contains(primariesOfRegionsPerHost[host], primary)) { - // check for whether there are other hosts that we can place this region - for (int i = 0; i < primariesOfRegionsPerHost.length; i++) { - if (i != host && !contains(primariesOfRegionsPerHost[i], primary)) { - return true; // meaning there is a better host - } - } - return false; // there is not a better host to place this - } - } - - // check rack - if (numRacks > 1) { - int rack = serverIndexToRackIndex[server]; - if (contains(primariesOfRegionsPerRack[rack], primary)) { - // check for whether there are other racks that we can place this region - for (int i = 0; i < primariesOfRegionsPerRack.length; i++) { - if (i != rack && !contains(primariesOfRegionsPerRack[i], primary)) { - return true; // meaning there is a better rack - } - } - return false; // there is not a better rack to place this - } - } - - return false; - } - - void doAssignRegion(RegionInfo regionInfo, ServerName serverName) { - if (!serversToIndex.containsKey(serverName.getAddress())) { - return; - } - int server = serversToIndex.get(serverName.getAddress()); - int region = regionsToIndex.get(regionInfo); - doAction(new AssignRegionAction(region, server)); - } - - void regionMoved(int region, int oldServer, int newServer) { - regionIndexToServerIndex[region] = newServer; - if (initialRegionIndexToServerIndex[region] == newServer) { - numMovedRegions--; //region moved back to original location - } else if (oldServer >= 0 && initialRegionIndexToServerIndex[region] == oldServer) { - numMovedRegions++; //region moved from original location - } - int tableIndex = regionIndexToTableIndex[region]; - if (oldServer >= 0) { - numRegionsPerServerPerTable[oldServer][tableIndex]--; - } - numRegionsPerServerPerTable[newServer][tableIndex]++; - - //check whether this caused maxRegionsPerTable in the new Server to be updated - if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex]; - } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1) - == numMaxRegionsPerTable[tableIndex]) { - //recompute maxRegionsPerTable since the previous value was coming from the old server - numMaxRegionsPerTable[tableIndex] = 0; - for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { - if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; - } - } - } - - // update for servers - int primary = regionIndexToPrimaryIndex[region]; - if (oldServer >= 0) { - primariesOfRegionsPerServer[oldServer] = removeRegion( - primariesOfRegionsPerServer[oldServer], primary); - } - primariesOfRegionsPerServer[newServer] = addRegionSorted( - primariesOfRegionsPerServer[newServer], primary); - - // update for hosts - if (multiServersPerHost) { - int oldHost = oldServer >= 0 ? serverIndexToHostIndex[oldServer] : -1; - int newHost = serverIndexToHostIndex[newServer]; - if (newHost != oldHost) { - regionsPerHost[newHost] = addRegion(regionsPerHost[newHost], region); - primariesOfRegionsPerHost[newHost] = addRegionSorted(primariesOfRegionsPerHost[newHost], primary); - if (oldHost >= 0) { - regionsPerHost[oldHost] = removeRegion(regionsPerHost[oldHost], region); - primariesOfRegionsPerHost[oldHost] = removeRegion( - primariesOfRegionsPerHost[oldHost], primary); // will still be sorted - } - } - } - - // update for racks - if (numRacks > 1) { - int oldRack = oldServer >= 0 ? serverIndexToRackIndex[oldServer] : -1; - int newRack = serverIndexToRackIndex[newServer]; - if (newRack != oldRack) { - regionsPerRack[newRack] = addRegion(regionsPerRack[newRack], region); - primariesOfRegionsPerRack[newRack] = addRegionSorted(primariesOfRegionsPerRack[newRack], primary); - if (oldRack >= 0) { - regionsPerRack[oldRack] = removeRegion(regionsPerRack[oldRack], region); - primariesOfRegionsPerRack[oldRack] = removeRegion( - primariesOfRegionsPerRack[oldRack], primary); // will still be sorted - } - } - } - } - - int[] removeRegion(int[] regions, int regionIndex) { - //TODO: this maybe costly. Consider using linked lists - int[] newRegions = new int[regions.length - 1]; - int i = 0; - for (i = 0; i < regions.length; i++) { - if (regions[i] == regionIndex) { - break; - } - newRegions[i] = regions[i]; - } - System.arraycopy(regions, i+1, newRegions, i, newRegions.length - i); - return newRegions; - } - - int[] addRegion(int[] regions, int regionIndex) { - int[] newRegions = new int[regions.length + 1]; - System.arraycopy(regions, 0, newRegions, 0, regions.length); - newRegions[newRegions.length - 1] = regionIndex; - return newRegions; - } - - int[] addRegionSorted(int[] regions, int regionIndex) { - int[] newRegions = new int[regions.length + 1]; - int i = 0; - for (i = 0; i < regions.length; i++) { // find the index to insert - if (regions[i] > regionIndex) { - break; - } - } - System.arraycopy(regions, 0, newRegions, 0, i); // copy first half - System.arraycopy(regions, i, newRegions, i+1, regions.length - i); // copy second half - newRegions[i] = regionIndex; - - return newRegions; - } - - int[] replaceRegion(int[] regions, int regionIndex, int newRegionIndex) { - int i = 0; - for (i = 0; i < regions.length; i++) { - if (regions[i] == regionIndex) { - regions[i] = newRegionIndex; - break; - } - } - return regions; - } - - void sortServersByRegionCount() { - Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator); - } - - int getNumRegions(int server) { - return regionsPerServer[server].length; - } - - boolean contains(int[] arr, int val) { - return Arrays.binarySearch(arr, val) >= 0; - } - - private Comparator numRegionsComparator = Comparator.comparingInt(this::getNumRegions); - - int getLowestLocalityRegionOnServer(int serverIndex) { - if (regionFinder != null) { - float lowestLocality = 1.0f; - int lowestLocalityRegionIndex = -1; - if (regionsPerServer[serverIndex].length == 0) { - // No regions on that region server - return -1; - } - for (int j = 0; j < regionsPerServer[serverIndex].length; j++) { - int regionIndex = regionsPerServer[serverIndex][j]; - HDFSBlocksDistribution distribution = regionFinder - .getBlockDistribution(regions[regionIndex]); - float locality = distribution.getBlockLocalityIndex(servers[serverIndex].getHostname()); - // skip empty region - if (distribution.getUniqueBlocksTotalWeight() == 0) { - continue; - } - if (locality < lowestLocality) { - lowestLocality = locality; - lowestLocalityRegionIndex = j; - } - } - if (lowestLocalityRegionIndex == -1) { - return -1; - } - if (LOG.isTraceEnabled()) { - LOG.trace("Lowest locality region is " - + regions[regionsPerServer[serverIndex][lowestLocalityRegionIndex]] - .getRegionNameAsString() + " with locality " + lowestLocality - + " and its region server contains " + regionsPerServer[serverIndex].length - + " regions"); - } - return regionsPerServer[serverIndex][lowestLocalityRegionIndex]; - } else { - return -1; - } - } - - float getLocalityOfRegion(int region, int server) { - if (regionFinder != null) { - HDFSBlocksDistribution distribution = regionFinder.getBlockDistribution(regions[region]); - return distribution.getBlockLocalityIndex(servers[server].getHostname()); - } else { - return 0f; - } - } - - protected void setNumRegions(int numRegions) { - this.numRegions = numRegions; - } - - protected void setNumMovedRegions(int numMovedRegions) { - this.numMovedRegions = numMovedRegions; - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SBSC_USE_STRINGBUFFER_CONCATENATION", - justification="Not important but should be fixed") - @Override - public String toString() { - StringBuilder desc = new StringBuilder("Cluster={servers=["); - for(ServerName sn:servers) { - desc.append(sn.getAddress().toString()).append(", "); - } - desc.append("], serverIndicesSortedByRegionCount=") - .append(Arrays.toString(serverIndicesSortedByRegionCount)) - .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer)); - - desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable)) - .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) - .append(", numTables=").append(numTables).append(", numMovedRegions=") - .append(numMovedRegions).append('}'); - return desc.toString(); - } - } - // slop for regions protected float slop; // overallSlop to control simpleLoadBalancer's cluster level threshold protected float overallSlop; protected Configuration config = HBaseConfiguration.create(); protected RackManager rackManager; - private static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class); + static final Logger LOG = LoggerFactory.getLogger(BaseLoadBalancer.class); protected MetricsBalancer metricsBalancer = null; protected ClusterMetrics clusterStatus = null; protected ServerName masterServerName; @@ -1102,7 +186,7 @@ public void setRackManager(RackManager rackManager) { this.rackManager = rackManager; } - protected boolean needsBalance(TableName tableName, Cluster c) { + protected boolean needsBalance(TableName tableName, BalancerClusterState c) { ClusterLoadState cs = new ClusterLoadState(c.clusterState); if (cs.getNumServers() < MIN_SERVER_BALANCE) { if (LOG.isDebugEnabled()) { @@ -1143,11 +227,11 @@ protected boolean needsBalance(TableName tableName, Cluster c) { * @param c Cluster information * @return whether region replicas are currently co-located */ - protected boolean areSomeRegionReplicasColocated(Cluster c) { + protected boolean areSomeRegionReplicasColocated(BalancerClusterState c) { return false; } - protected final boolean idleRegionServerExist(Cluster c){ + protected final boolean idleRegionServerExist(BalancerClusterState c){ boolean isServerExistsWithMoreRegions = false; boolean isServerExistsWithZeroRegions = false; for (int[] serverList: c.regionsPerServer){ @@ -1198,14 +282,14 @@ public Map> roundRobinAssignment(List r return Collections.singletonMap(servers.get(0), new ArrayList<>(regions)); } - Cluster cluster = createCluster(servers, regions); + BalancerClusterState cluster = createCluster(servers, regions); Map> assignments = new HashMap<>(); roundRobinAssignment(cluster, regions, servers, assignments); return assignments; } - protected Cluster createCluster(List servers, Collection regions) - throws HBaseIOException { + protected BalancerClusterState createCluster(List servers, + Collection regions) throws HBaseIOException { boolean hasRegionReplica = false; try { if (services != null && services.getTableDescriptors() != null) { @@ -1239,7 +323,7 @@ protected Cluster createCluster(List servers, Collection clusterState.put(server, EMPTY_REGION_LIST); } } - return new Cluster(regions, clusterState, null, this.regionFinder, + return new BalancerClusterState(regions, clusterState, null, this.regionFinder, rackManager); } @@ -1270,7 +354,7 @@ public ServerName randomAssignment(RegionInfo regionInfo, List serve final List finalServers = idleServers.isEmpty() ? servers : idleServers; List regions = Lists.newArrayList(regionInfo); - Cluster cluster = createCluster(finalServers, regions); + BalancerClusterState cluster = createCluster(finalServers, regions); return randomAssignment(cluster, regionInfo, finalServers); } @@ -1373,7 +457,7 @@ public Map> retainAssignment(Map 0) { - Cluster cluster = createCluster(servers, regions.keySet()); + BalancerClusterState cluster = createCluster(servers, regions.keySet()); for (Map.Entry> entry : assignments.entrySet()) { ServerName sn = entry.getKey(); for (RegionInfo region : entry.getValue()) { @@ -1434,7 +518,7 @@ public void updateBalancerStatus(boolean status) { /** * Used to assign a single region to a random server. */ - private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo, + private ServerName randomAssignment(BalancerClusterState cluster, RegionInfo regionInfo, List servers) { int numServers = servers.size(); // servers is not null, numServers > 1 ServerName sn = null; @@ -1471,7 +555,7 @@ private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo, /** * Round robin a list of regions to a list of servers */ - private void roundRobinAssignment(Cluster cluster, List regions, + private void roundRobinAssignment(BalancerClusterState cluster, List regions, List servers, Map> assignments) { Random rand = ThreadLocalRandom.current(); List unassignedRegions = new ArrayList<>(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java index ee610a9431b0..06b5623d1b5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CandidateGenerator.java @@ -28,7 +28,7 @@ @InterfaceAudience.Private abstract class CandidateGenerator { - abstract BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster); + abstract BalanceAction generate(BalancerClusterState cluster); /** * From a list of regions pick a random one. Null can be returned which @@ -42,7 +42,7 @@ abstract class CandidateGenerator { * @return a random {@link RegionInfo} or null if an asymmetrical move is * suggested. */ - int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server, + int pickRandomRegion(BalancerClusterState cluster, int server, double chanceOfNoSwap) { // Check to see if this is just a move. if (cluster.regionsPerServer[server].length == 0 @@ -54,7 +54,7 @@ int pickRandomRegion(BaseLoadBalancer.Cluster cluster, int server, return cluster.regionsPerServer[server][rand]; } - int pickRandomServer(BaseLoadBalancer.Cluster cluster) { + int pickRandomServer(BalancerClusterState cluster) { if (cluster.numServers < 1) { return -1; } @@ -62,7 +62,7 @@ int pickRandomServer(BaseLoadBalancer.Cluster cluster) { return StochasticLoadBalancer.RANDOM.nextInt(cluster.numServers); } - int pickRandomRack(BaseLoadBalancer.Cluster cluster) { + int pickRandomRack(BalancerClusterState cluster) { if (cluster.numRacks < 1) { return -1; } @@ -70,7 +70,7 @@ int pickRandomRack(BaseLoadBalancer.Cluster cluster) { return StochasticLoadBalancer.RANDOM.nextInt(cluster.numRacks); } - int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) { + int pickOtherRandomServer(BalancerClusterState cluster, int serverIndex) { if (cluster.numServers < 2) { return -1; } @@ -82,7 +82,7 @@ int pickOtherRandomServer(BaseLoadBalancer.Cluster cluster, int serverIndex) { } } - int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) { + int pickOtherRandomRack(BalancerClusterState cluster, int rackIndex) { if (cluster.numRacks < 2) { return -1; } @@ -94,10 +94,10 @@ int pickOtherRandomRack(BaseLoadBalancer.Cluster cluster, int rackIndex) { } } - BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster, + BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) { if (thisServer < 0 || otherServer < 0) { - return BaseLoadBalancer.Cluster.NullAction; + return BalanceAction.NULL_ACTION; } // Decide who is most likely to need another region @@ -114,20 +114,20 @@ BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster clust return getAction(thisServer, thisRegion, otherServer, otherRegion); } - protected BaseLoadBalancer.Cluster.Action getAction(int fromServer, int fromRegion, + protected BalanceAction getAction(int fromServer, int fromRegion, int toServer, int toRegion) { if (fromServer < 0 || toServer < 0) { - return BaseLoadBalancer.Cluster.NullAction; + return BalanceAction.NULL_ACTION; } if (fromRegion >= 0 && toRegion >= 0) { - return new BaseLoadBalancer.Cluster.SwapRegionsAction(fromServer, fromRegion, + return new SwapRegionsAction(fromServer, fromRegion, toServer, toRegion); } else if (fromRegion >= 0) { - return new BaseLoadBalancer.Cluster.MoveRegionAction(fromRegion, fromServer, toServer); + return new MoveRegionAction(fromRegion, fromServer, toServer); } else if (toRegion >= 0) { - return new BaseLoadBalancer.Cluster.MoveRegionAction(toRegion, toServer, fromServer); + return new MoveRegionAction(toRegion, toServer, fromServer); } else { - return BaseLoadBalancer.Cluster.NullAction; + return BalanceAction.NULL_ACTION; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java index 493675c96287..feed66c17139 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/FavoredStochasticBalancer.java @@ -525,13 +525,13 @@ public void generateFavoredNodesForMergedRegion(RegionInfo merged, RegionInfo [] private class FavoredNodeLocalityPicker extends CandidateGenerator { @Override - protected Cluster.Action generate(Cluster cluster) { + protected BalanceAction generate(BalancerClusterState cluster) { int thisServer = pickRandomServer(cluster); int thisRegion; if (thisServer == -1) { LOG.trace("Could not pick lowest local region server"); - return Cluster.NullAction; + return BalanceAction.NULL_ACTION; } else { // Pick lowest local region on this server thisRegion = pickLowestLocalRegionOnServer(cluster, thisServer); @@ -541,7 +541,7 @@ protected Cluster.Action generate(Cluster cluster) { LOG.trace("Could not pick lowest local region even when region server held " + cluster.regionsPerServer[thisServer].length + " regions"); } - return Cluster.NullAction; + return BalanceAction.NULL_ACTION; } RegionInfo hri = cluster.regions[thisRegion]; @@ -553,7 +553,7 @@ protected Cluster.Action generate(Cluster cluster) { } else { // No FN, ignore LOG.trace("Ignoring, no favored nodes for region: " + hri); - return Cluster.NullAction; + return BalanceAction.NULL_ACTION; } } else { // Pick other favored node with the highest locality @@ -562,7 +562,7 @@ protected Cluster.Action generate(Cluster cluster) { return getAction(thisServer, thisRegion, otherServer, -1); } - private int getDifferentFavoredNode(Cluster cluster, List favoredNodes, + private int getDifferentFavoredNode(BalancerClusterState cluster, List favoredNodes, int currentServer) { List fnIndex = new ArrayList<>(); for (ServerName sn : favoredNodes) { @@ -584,7 +584,7 @@ private int getDifferentFavoredNode(Cluster cluster, List favoredNod return highestLocalRSIndex; } - private int pickLowestLocalRegionOnServer(Cluster cluster, int server) { + private int pickLowestLocalRegionOnServer(BalancerClusterState cluster, int server) { return cluster.getLowestLocalityRegionOnServer(server); } } @@ -596,7 +596,7 @@ private int pickLowestLocalRegionOnServer(Cluster cluster, int server) { class FavoredNodeLoadPicker extends CandidateGenerator { @Override - Cluster.Action generate(Cluster cluster) { + BalanceAction generate(BalancerClusterState cluster) { cluster.sortServersByRegionCount(); int thisServer = pickMostLoadedServer(cluster); int thisRegion = pickRandomRegion(cluster, thisServer, 0); @@ -607,7 +607,7 @@ Cluster.Action generate(Cluster cluster) { if (!FavoredNodesManager.isFavoredNodeApplicable(hri)) { otherServer = pickLeastLoadedServer(cluster, thisServer); } else { - return Cluster.NullAction; + return BalanceAction.NULL_ACTION; } } else { otherServer = pickLeastLoadedFNServer(cluster, favoredNodes, thisServer); @@ -615,7 +615,7 @@ Cluster.Action generate(Cluster cluster) { return getAction(thisServer, thisRegion, otherServer, -1); } - private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { + private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; int index; for (index = 0; index < servers.length ; index++) { @@ -626,8 +626,8 @@ private int pickLeastLoadedServer(final Cluster cluster, int thisServer) { return servers[index]; } - private int pickLeastLoadedFNServer(final Cluster cluster, List favoredNodes, - int currentServerIndex) { + private int pickLeastLoadedFNServer(final BalancerClusterState cluster, + List favoredNodes, int currentServerIndex) { List fnIndex = new ArrayList<>(); for (ServerName sn : favoredNodes) { if (cluster.serversToIndex.containsKey(sn.getAddress())) { @@ -648,7 +648,7 @@ private int pickLeastLoadedFNServer(final Cluster cluster, List favo return leastLoadedFN; } - private int pickMostLoadedServer(final Cluster cluster) { + private int pickMostLoadedServer(final BalancerClusterState cluster) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; int index; for (index = servers.length - 1; index > 0 ; index--) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/HeterogeneousRegionCountCostFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/HeterogeneousRegionCountCostFunction.java index 4b58b5d0d126..72d8db9c7d47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/HeterogeneousRegionCountCostFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/HeterogeneousRegionCountCostFunction.java @@ -120,7 +120,7 @@ public class HeterogeneousRegionCountCostFunction extends StochasticLoadBalancer * any costly calculation. */ @Override - void init(final BaseLoadBalancer.Cluster cluster) { + void init(final BalancerClusterState cluster) { this.cluster = cluster; this.loadRules(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java index d60065feeb03..595e1857e251 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java @@ -24,14 +24,14 @@ class LoadCandidateGenerator extends CandidateGenerator { @Override - BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + BalanceAction generate(BalancerClusterState cluster) { cluster.sortServersByRegionCount(); int thisServer = pickMostLoadedServer(cluster, -1); int otherServer = pickLeastLoadedServer(cluster, thisServer); return pickRandomRegions(cluster, thisServer, otherServer); } - private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) { + private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; int index = 0; @@ -44,7 +44,7 @@ private int pickLeastLoadedServer(final BaseLoadBalancer.Cluster cluster, int th return servers[index]; } - private int pickMostLoadedServer(final BaseLoadBalancer.Cluster cluster, int thisServer) { + private int pickMostLoadedServer(final BalancerClusterState cluster, int thisServer) { Integer[] servers = cluster.serverIndicesSortedByRegionCount; int index = servers.length - 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java index 70b4f73984ad..9da884f40d0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/LocalityBasedCandidateGenerator.java @@ -26,7 +26,7 @@ class LocalityBasedCandidateGenerator extends CandidateGenerator { @Override - BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + BalanceAction generate(BalancerClusterState cluster) { // iterate through regions until you find one that is not on ideal host // start from a random point to avoid always balance the regions in front if (cluster.numRegions > 0) { @@ -35,20 +35,20 @@ BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { int region = (startIndex + i) % cluster.numRegions; int currentServer = cluster.regionIndexToServerIndex[region]; if (currentServer != cluster.getOrComputeRegionsToMostLocalEntities( - BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]) { - Optional potential = tryMoveOrSwap(cluster, + BalancerClusterState.LocalityType.SERVER)[region]) { + Optional potential = tryMoveOrSwap(cluster, currentServer, region, cluster.getOrComputeRegionsToMostLocalEntities( - BaseLoadBalancer.Cluster.LocalityType.SERVER)[region]); + BalancerClusterState.LocalityType.SERVER)[region]); if (potential.isPresent()) { return potential.get(); } } } } - return BaseLoadBalancer.Cluster.NullAction; + return BalanceAction.NULL_ACTION; } - private Optional tryMoveOrSwap(BaseLoadBalancer.Cluster cluster, + private Optional tryMoveOrSwap(BalancerClusterState cluster, int fromServer, int fromRegion, int toServer) { // Try move first. We know apriori fromRegion has the highest locality on toServer if (cluster.serverHasTooFewRegions(toServer)) { @@ -74,8 +74,8 @@ private Optional tryMoveOrSwap(BaseLoadBalancer return Optional.empty(); } - private double getWeightedLocality(BaseLoadBalancer.Cluster cluster, int region, int server) { + private double getWeightedLocality(BalancerClusterState cluster, int region, int server) { return cluster.getOrComputeWeightedLocality(region, server, - BaseLoadBalancer.Cluster.LocalityType.SERVER); + BalancerClusterState.LocalityType.SERVER); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java index 0a878fdf600b..4badd4d2902d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/RegionReplicaCandidateGenerator.java @@ -83,10 +83,10 @@ int selectCoHostedRegionPerGroup(int[] primariesOfRegionsPerGroup, int[] regions } @Override - BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + BalanceAction generate(BalancerClusterState cluster) { int serverIndex = pickRandomServer(cluster); if (cluster.numServers <= 1 || serverIndex == -1) { - return BaseLoadBalancer.Cluster.NullAction; + return BalanceAction.NULL_ACTION; } int regionIndex = selectCoHostedRegionPerGroup( diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java index 6ec60249de1e..8a9f019a0148 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java @@ -256,7 +256,8 @@ public List balanceTable(TableName tableName, // construct a Cluster object with clusterMap and rest of the // argument as defaults - Cluster c = new Cluster(loadOfOneTable, null, this.regionFinder, this.rackManager); + BalancerClusterState c = + new BalancerClusterState(loadOfOneTable, null, this.regionFinder, this.rackManager); if (!this.needsBalance(tableName, c) && !this.overallNeedsBalance()) { return null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 63795be2be0a..430c2dc26d8b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -39,12 +39,7 @@ import org.apache.hadoop.hbase.client.BalancerDecision; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.AssignRegionAction; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.LocalityType; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction; +import org.apache.hadoop.hbase.master.balancer.BalancerClusterState.LocalityType; import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; @@ -309,7 +304,7 @@ public void updateMetricsSize(int size) { } @Override - protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) { + protected synchronized boolean areSomeRegionReplicasColocated(BalancerClusterState c) { regionReplicaHostCostFunction.init(c); if (regionReplicaHostCostFunction.cost() > 0) { return true; @@ -322,7 +317,7 @@ protected synchronized boolean areSomeRegionReplicasColocated(Cluster c) { } @Override - protected boolean needsBalance(TableName tableName, Cluster cluster) { + protected boolean needsBalance(TableName tableName, BalancerClusterState cluster) { ClusterLoadState cs = new ClusterLoadState(cluster.clusterState); if (cs.getNumServers() < MIN_SERVER_BALANCE) { if (LOG.isDebugEnabled()) { @@ -369,7 +364,7 @@ protected boolean needsBalance(TableName tableName, Cluster cluster) { return !balanced; } - Cluster.Action nextAction(Cluster cluster) { + BalanceAction nextAction(BalancerClusterState cluster) { return candidateGenerators.get(RANDOM.nextInt(candidateGenerators.size())) .generate(cluster); } @@ -394,7 +389,8 @@ public synchronized List balanceTable(TableName tableName, Map balanceTable(TableName tableName, Map balanceTable(TableName tableName, Map createRegionPlans(Cluster cluster) { + private List createRegionPlans(BalancerClusterState cluster) { List plans = new LinkedList<>(); for (int regionIndex = 0; regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) { @@ -627,13 +623,13 @@ private synchronized void updateRegionLoad() { } } - protected void initCosts(Cluster cluster) { + protected void initCosts(BalancerClusterState cluster) { for (CostFunction c:costFunctions) { c.init(cluster); } } - protected void updateCostsWithAction(Cluster cluster, Action action) { + protected void updateCostsWithAction(BalancerClusterState cluster, BalanceAction action) { for (CostFunction c : costFunctions) { c.postAction(action); } @@ -662,7 +658,7 @@ public String[] getCostFunctionNames() { * @return a double of a cost associated with the proposed cluster state. This cost is an * aggregate of all individual cost functions. */ - protected double computeCost(Cluster cluster, double previousCost) { + protected double computeCost(BalancerClusterState cluster, double previousCost) { double total = 0; for (int i = 0; i < costFunctions.size(); i++) { @@ -690,7 +686,7 @@ protected double computeCost(Cluster cluster, double previousCost) { static class RandomCandidateGenerator extends CandidateGenerator { @Override - Cluster.Action generate(Cluster cluster) { + BalanceAction generate(BalancerClusterState cluster) { int thisServer = pickRandomServer(cluster); @@ -707,7 +703,7 @@ Cluster.Action generate(Cluster cluster) { */ static class RegionReplicaRackCandidateGenerator extends RegionReplicaCandidateGenerator { @Override - Cluster.Action generate(Cluster cluster) { + BalanceAction generate(BalancerClusterState cluster) { int rackIndex = pickRandomRack(cluster); if (cluster.numRacks <= 1 || rackIndex == -1) { return super.generate(cluster); @@ -741,7 +737,7 @@ public abstract static class CostFunction { private float multiplier = 0; - protected Cluster cluster; + protected BalancerClusterState cluster; public CostFunction(Configuration c) { } @@ -760,7 +756,7 @@ void setMultiplier(float m) { /** Called once per LB invocation to give the cost function * to initialize it's state, and perform any costly calculation. */ - void init(Cluster cluster) { + void init(BalancerClusterState cluster) { this.cluster = cluster; } @@ -768,24 +764,24 @@ void init(Cluster cluster) { * an opportunity to update it's state. postAction() is always * called at least once before cost() is called with the cluster * that this action is performed on. */ - void postAction(Action action) { - switch (action.type) { + void postAction(BalanceAction action) { + switch (action.getType()) { case NULL: break; case ASSIGN_REGION: AssignRegionAction ar = (AssignRegionAction) action; - regionMoved(ar.region, -1, ar.server); + regionMoved(ar.getRegion(), -1, ar.getServer()); break; case MOVE_REGION: MoveRegionAction mra = (MoveRegionAction) action; - regionMoved(mra.region, mra.fromServer, mra.toServer); + regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); break; case SWAP_REGIONS: SwapRegionsAction a = (SwapRegionsAction) action; - regionMoved(a.fromRegion, a.fromServer, a.toServer); - regionMoved(a.toRegion, a.toServer, a.fromServer); + regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); + regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); break; default: - throw new RuntimeException("Uknown action:" + action.type); + throw new RuntimeException("Uknown action:" + action.getType()); } } @@ -936,7 +932,7 @@ static class RegionCountSkewCostFunction extends CostFunction { } @Override - void init(Cluster cluster) { + void init(BalancerClusterState cluster) { super.init(cluster); LOG.debug("{} sees a total of {} servers and {} regions.", getClass().getSimpleName(), cluster.numServers, cluster.numRegions); @@ -1060,7 +1056,7 @@ static abstract class LocalityBasedCostFunction extends CostFunction { abstract int regionIndexToEntityIndex(int region); @Override - void init(Cluster cluster) { + void init(BalancerClusterState cluster) { super.init(cluster); locality = 0.0; bestLocality = 0.0; @@ -1310,7 +1306,7 @@ public RegionReplicaHostCostFunction(Configuration conf) { } @Override - void init(Cluster cluster) { + void init(BalancerClusterState cluster) { super.init(cluster); // max cost is the case where every region replica is hosted together regardless of host maxCost = cluster.numHosts > 1 ? getMaxCost(cluster) : 0; @@ -1323,7 +1319,7 @@ void init(Cluster cluster) { } } - long getMaxCost(Cluster cluster) { + long getMaxCost(BalancerClusterState cluster) { if (!cluster.hasRegionReplicas) { return 0; // short circuit } @@ -1422,7 +1418,7 @@ public RegionReplicaRackCostFunction(Configuration conf) { } @Override - void init(Cluster cluster) { + void init(BalancerClusterState cluster) { this.cluster = cluster; if (cluster.numRacks <= 1) { maxCost = 0; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java index 685088fed2c9..8585b5c069a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java @@ -376,8 +376,8 @@ protected TreeMap> mockClusterServers(int[] mockClu return mockClusterServers(mockCluster, -1); } - protected BaseLoadBalancer.Cluster mockCluster(int[] mockCluster) { - return new BaseLoadBalancer.Cluster( + protected BalancerClusterState mockCluster(int[] mockCluster) { + return new BalancerClusterState( mockClusterServers(mockCluster, -1), null, null, null); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index b6446fd6df10..c31cdaf448ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -47,8 +47,6 @@ import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegionAction; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -301,7 +299,7 @@ public void testRegionAvailability() throws Exception { // cluster is created (constructor code) would make sure the indices of // the servers are in the order in which it is inserted in the clusterState // map (linkedhashmap is important). A similar thing applies to the region lists - Cluster cluster = new Cluster(clusterState, null, null, rackManager); + BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, rackManager); // check whether a move of region1 from servers[0] to servers[1] would lower // the availability of region1 assertTrue(cluster.wouldLowerAvailability(hri1, servers[1])); @@ -318,7 +316,7 @@ public void testRegionAvailability() throws Exception { // now lets have servers[1] host replica_of_region2 list1.add(RegionReplicaUtil.getRegionInfoForReplica(hri3, 1)); // create a new clusterState with the above change - cluster = new Cluster(clusterState, null, null, rackManager); + cluster = new BalancerClusterState(clusterState, null, null, rackManager); // now check whether a move of a replica from servers[0] to servers[1] would lower // the availability of region2 assertTrue(cluster.wouldLowerAvailability(hri3, servers[1])); @@ -330,14 +328,14 @@ public void testRegionAvailability() throws Exception { clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2 clusterState.put(servers[10], new ArrayList<>()); //servers[10], rack3 hosts no region // create a cluster with the above clusterState - cluster = new Cluster(clusterState, null, null, rackManager); + cluster = new BalancerClusterState(clusterState, null, null, rackManager); // check whether a move of region1 from servers[0],rack1 to servers[6],rack2 would // lower the availability assertTrue(cluster.wouldLowerAvailability(hri1, servers[0])); // now create a cluster without the rack manager - cluster = new Cluster(clusterState, null, null, null); + cluster = new BalancerClusterState(clusterState, null, null, null); // now repeat check whether a move of region1 from servers[0] to servers[6] would // lower the availability assertTrue(!cluster.wouldLowerAvailability(hri1, servers[6])); @@ -375,7 +373,7 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception { // cluster is created (constructor code) would make sure the indices of // the servers are in the order in which it is inserted in the clusterState // map (linkedhashmap is important). - Cluster cluster = new Cluster(clusterState, null, null, rackManager); + BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, rackManager); // check whether moving region1 from servers[1] to servers[2] would lower availability assertTrue(!cluster.wouldLowerAvailability(hri1, servers[2])); @@ -397,7 +395,7 @@ public void testRegionAvailabilityWithRegionMoves() throws Exception { clusterState.put(servers[6], list2); //servers[6], rack2 hosts region2 clusterState.put(servers[12], list3); //servers[12], rack3 hosts replica_of_region2 // create a cluster with the above clusterState - cluster = new Cluster(clusterState, null, null, rackManager); + cluster = new BalancerClusterState(clusterState, null, null, rackManager); // check whether a move of replica_of_region2 from servers[12],rack3 to servers[0],rack1 would // lower the availability assertTrue(!cluster.wouldLowerAvailability(hri4, servers[0])); @@ -479,7 +477,7 @@ public void testClusterServersWithSameHostPort() { assignRegions(regions, oldServers, clusterState); // should not throw exception: - BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, null, null); + BalancerClusterState cluster = new BalancerClusterState(clusterState, null, null, null); assertEquals(101 + 9, cluster.numRegions); assertEquals(10, cluster.numServers); // only 10 servers because they share the same host + port @@ -525,12 +523,15 @@ public void testClusterRegionLocations() { Lists.newArrayList(servers.get(0), servers.get(1))); when(locationFinder.getTopBlockLocations(regions.get(42))).thenReturn( Lists.newArrayList(servers.get(4), servers.get(9), servers.get(5))); - when(locationFinder.getTopBlockLocations(regions.get(43))).thenReturn( - Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); // this server does not exists in clusterStatus + // this server does not exists in clusterStatus + when(locationFinder.getTopBlockLocations(regions.get(43))) + .thenReturn(Lists.newArrayList(ServerName.valueOf("foo", 0, 0))); - BaseLoadBalancer.Cluster cluster = new Cluster(clusterState, null, locationFinder, null); + BalancerClusterState cluster = + new BalancerClusterState(clusterState, null, locationFinder, null); - int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); // this is ok, it is just a test + // this is ok, it is just a test + int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1)); int r10 = ArrayUtils.indexOf(cluster.regions, regions.get(10)); int r42 = ArrayUtils.indexOf(cluster.regions, regions.get(42)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java index b750344fa5b1..7c695219b064 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestFavoredStochasticBalancerPickers.java @@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.RackManager; import org.apache.hadoop.hbase.master.assignment.RegionStates; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; @@ -184,9 +183,10 @@ public boolean evaluate() throws Exception { regionFinder.setConf(conf); regionFinder.setClusterInfoProvider( new MasterClusterInfoProvider(TEST_UTIL.getMiniHBaseCluster().getMaster())); - Cluster cluster = new Cluster(serverAssignments, null, regionFinder, new RackManager(conf)); + BalancerClusterState cluster = + new BalancerClusterState(serverAssignments, null, regionFinder, new RackManager(conf)); LoadOnlyFavoredStochasticBalancer balancer = (LoadOnlyFavoredStochasticBalancer) TEST_UTIL - .getMiniHBaseCluster().getMaster().getLoadBalancer().getInternalBalancer(); + .getMiniHBaseCluster().getMaster().getLoadBalancer().getInternalBalancer(); cluster.sortServersByRegionCount(); Integer[] servers = cluster.serverIndicesSortedByRegionCount; @@ -204,13 +204,13 @@ public boolean evaluate() throws Exception { if (userRegionPicked) { break; } else { - Cluster.Action action = loadPicker.generate(cluster); - if (action.type == Cluster.Action.Type.MOVE_REGION) { - Cluster.MoveRegionAction moveRegionAction = (Cluster.MoveRegionAction) action; - RegionInfo region = cluster.regions[moveRegionAction.region]; - assertNotEquals(-1, moveRegionAction.toServer); - ServerName destinationServer = cluster.servers[moveRegionAction.toServer]; - assertEquals(cluster.servers[moveRegionAction.fromServer], mostLoadedServer); + BalanceAction action = loadPicker.generate(cluster); + if (action.getType() == BalanceAction.Type.MOVE_REGION) { + MoveRegionAction moveRegionAction = (MoveRegionAction) action; + RegionInfo region = cluster.regions[moveRegionAction.getRegion()]; + assertNotEquals(-1, moveRegionAction.getToServer()); + ServerName destinationServer = cluster.servers[moveRegionAction.getToServer()]; + assertEquals(cluster.servers[moveRegionAction.getFromServer()], mostLoadedServer); if (!region.getTable().isSystemTable()) { List favNodes = fnm.getFavoredNodes(region); assertTrue(favNodes.contains( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java index e702c5fd07a5..885c17686d65 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java @@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.MockNoopMasterServices; import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.ServerLocalityCostFunction; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -295,7 +294,7 @@ public void testMoveCostMultiplier() throws Exception { Configuration conf = HBaseConfiguration.create(); StochasticLoadBalancer.CostFunction costFunction = new StochasticLoadBalancer.MoveCostFunction(conf); - BaseLoadBalancer.Cluster cluster = mockCluster(clusterStateMocks[0]); + BalancerClusterState cluster = mockCluster(clusterStateMocks[0]); costFunction.init(cluster); costFunction.cost(); assertEquals(StochasticLoadBalancer.MoveCostFunction.DEFAULT_MOVE_COST, @@ -322,7 +321,7 @@ public void testMoveCost() throws Exception { StochasticLoadBalancer.CostFunction costFunction = new StochasticLoadBalancer.MoveCostFunction(conf); for (int[] mockCluster : clusterStateMocks) { - BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); + BalancerClusterState cluster = mockCluster(mockCluster); costFunction.init(cluster); double cost = costFunction.cost(); assertEquals(0.0f, cost, 0.001); @@ -385,14 +384,14 @@ public void testCostAfterUndoAction() { final int runs = 10; loadBalancer.setConf(conf); for (int[] mockCluster : clusterStateMocks) { - BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); + BalancerClusterState cluster = mockCluster(mockCluster); loadBalancer.initCosts(cluster); for (int i = 0; i != runs; ++i) { final double expectedCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE); - Cluster.Action action = loadBalancer.nextAction(cluster); + BalanceAction action = loadBalancer.nextAction(cluster); cluster.doAction(action); loadBalancer.updateCostsWithAction(cluster, action); - Cluster.Action undoAction = action.undoAction(); + BalanceAction undoAction = action.undoAction(); cluster.doAction(undoAction); loadBalancer.updateCostsWithAction(cluster, undoAction); final double actualCost = loadBalancer.computeCost(cluster, Double.MAX_VALUE); @@ -407,7 +406,7 @@ public void testTableSkewCost() { StochasticLoadBalancer.CostFunction costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf); for (int[] mockCluster : clusterStateMocks) { - BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); + BalancerClusterState cluster = mockCluster(mockCluster); costFunction.init(cluster); double cost = costFunction.cost(); assertTrue(cost >= 0); @@ -548,7 +547,7 @@ private boolean needsBalanceIdleRegion(int[] cluster){ } // This mock allows us to test the LocalityCostFunction - private class MockCluster extends BaseLoadBalancer.Cluster { + private class MockCluster extends BalancerClusterState { private int[][] localities = null; // [region][server] = percent of blocks diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java index 066e22a9246e..8a2ec848a751 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerHeterogeneousCost.java @@ -197,8 +197,8 @@ protected void testWithCluster(final Map> serverMap final HeterogeneousRegionCountCostFunction cf = new HeterogeneousRegionCountCostFunction(conf); assertNotNull(cf); - BaseLoadBalancer.Cluster cluster = - new BaseLoadBalancer.Cluster(serverMap, null, null, null); + BalancerClusterState cluster = + new BalancerClusterState(serverMap, null, null, null); cf.init(cluster); // checking that we all hosts have a number of regions below their limit @@ -285,10 +285,10 @@ static class FairRandomCandidateGenerator extends StochasticLoadBalancer.RandomCandidateGenerator { @Override - public BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluster cluster, + public BalanceAction pickRandomRegions(BalancerClusterState cluster, int thisServer, int otherServer) { if (thisServer < 0 || otherServer < 0) { - return BaseLoadBalancer.Cluster.NullAction; + return BalanceAction.NULL_ACTION; } int thisRegion = pickRandomRegion(cluster, thisServer, 0.5); @@ -298,7 +298,7 @@ public BaseLoadBalancer.Cluster.Action pickRandomRegions(BaseLoadBalancer.Cluste } @Override - BaseLoadBalancer.Cluster.Action generate(BaseLoadBalancer.Cluster cluster) { + BalanceAction generate(BalancerClusterState cluster) { return super.generate(cluster); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java index 9634f9a5eb77..c922699754e0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplica.java @@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.master.RackManager; -import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.junit.ClassRule; @@ -55,7 +54,7 @@ public void testReplicaCost() { StochasticLoadBalancer.CostFunction costFunction = new StochasticLoadBalancer.RegionReplicaHostCostFunction(conf); for (int[] mockCluster : clusterStateMocks) { - BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster); + BalancerClusterState cluster = mockCluster(mockCluster); costFunction.init(cluster); double cost = costFunction.cost(); assertTrue(cost >= 0); @@ -72,9 +71,9 @@ public void testReplicaCostForReplicas() { int[] servers = new int[] { 3, 3, 3, 3, 3 }; TreeMap> clusterState = mockClusterServers(servers); - BaseLoadBalancer.Cluster cluster; + BalancerClusterState cluster; - cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + cluster = new BalancerClusterState(clusterState, null, null, null); costFunction.init(cluster); double costWithoutReplicas = costFunction.cost(); assertEquals(0, costWithoutReplicas, 0); @@ -84,7 +83,7 @@ public void testReplicaCostForReplicas() { RegionReplicaUtil.getRegionInfoForReplica(clusterState.firstEntry().getValue().get(0), 1); clusterState.lastEntry().getValue().add(replica1); - cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + cluster = new BalancerClusterState(clusterState, null, null, null); costFunction.init(cluster); double costWith1ReplicaDifferentServer = costFunction.cost(); @@ -94,7 +93,7 @@ public void testReplicaCostForReplicas() { RegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(replica1, 2); clusterState.lastEntry().getValue().add(replica2); - cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + cluster = new BalancerClusterState(clusterState, null, null, null); costFunction.init(cluster); double costWith1ReplicaSameServer = costFunction.cost(); @@ -117,7 +116,7 @@ public void testReplicaCostForReplicas() { entry.getValue().add(replica2); it.next().getValue().add(replica3); // 2nd server - cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + cluster = new BalancerClusterState(clusterState, null, null, null); costFunction.init(cluster); double costWith3ReplicasSameServer = costFunction.cost(); @@ -131,7 +130,7 @@ public void testReplicaCostForReplicas() { clusterState.lastEntry().getValue().add(replica2); clusterState.lastEntry().getValue().add(replica3); - cluster = new BaseLoadBalancer.Cluster(clusterState, null, null, null); + cluster = new BalancerClusterState(clusterState, null, null, null); costFunction.init(cluster); double costWith2ReplicasOnTwoServers = costFunction.cost(); @@ -152,7 +151,7 @@ public void testNeedsBalanceForColocatedReplicas() { regions = randomRegions(1); map.put(s2, regions); assertTrue(loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, - new Cluster(map, null, null, null))); + new BalancerClusterState(map, null, null, null))); // check for the case where there are two hosts on the same rack and there are two racks // and both the replicas are on the same rack map.clear(); @@ -165,7 +164,7 @@ public void testNeedsBalanceForColocatedReplicas() { map.put(ServerName.valueOf("host2", 1000, 11111), randomRegions(1)); assertTrue( loadBalancer.needsBalance(HConstants.ENSEMBLE_TABLE_NAME, - new Cluster(map, null, null, new ForTestRackManagerOne()))); + new BalancerClusterState(map, null, null, new ForTestRackManagerOne()))); } @Test From fa3a9ba69d14b048b4d935db8f57de6eccf0192e Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Wed, 21 Apr 2021 11:52:00 +0800 Subject: [PATCH 2/2] fix checkstyle and error prone warnings --- .../master/balancer/BalancerClusterState.java | 25 +++++++------- .../balancer/StochasticLoadBalancer.java | 33 ++++++++++--------- .../master/balancer/TestBaseLoadBalancer.java | 2 +- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java index ed10c19c3917..e90064ebaf2d 100644 --- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java +++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -214,7 +213,7 @@ public String getRack(ServerName server) { int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0; - for (Entry> entry : clusterState.entrySet()) { + for (Map.Entry> entry : clusterState.entrySet()) { if (entry.getKey() == null) { LOG.warn("SERVERNAME IS NULL, skipping " + entry.getValue()); continue; @@ -242,15 +241,15 @@ public String getRack(ServerName server) { } hosts = new String[numHosts]; - for (Entry entry : hostsToIndex.entrySet()) { + for (Map.Entry entry : hostsToIndex.entrySet()) { hosts[entry.getValue()] = entry.getKey(); } racks = new String[numRacks]; - for (Entry entry : racksToIndex.entrySet()) { + for (Map.Entry entry : racksToIndex.entrySet()) { racks[entry.getValue()] = entry.getKey(); } - for (Entry> entry : clusterState.entrySet()) { + for (Map.Entry> entry : clusterState.entrySet()) { int serverIndex = serversToIndex.get(entry.getKey().getAddress()); regionPerServerIndex = serverIndexToRegionsOffset[serverIndex]; @@ -678,16 +677,16 @@ void regionMoved(int region, int oldServer, int newServer) { // check whether this caused maxRegionsPerTable in the new Server to be updated if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex]; - } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + - 1) == numMaxRegionsPerTable[tableIndex]) { - // recompute maxRegionsPerTable since the previous value was coming from the old server - numMaxRegionsPerTable[tableIndex] = 0; - for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { - if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; - } + } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + + 1) == numMaxRegionsPerTable[tableIndex]) { + // recompute maxRegionsPerTable since the previous value was coming from the old server + numMaxRegionsPerTable[tableIndex] = 0; + for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { + if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { + numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; } } + } // update for servers int primary = regionIndexToPrimaryIndex[region]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 430c2dc26d8b..c0ab4d9e9a9f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -766,22 +766,23 @@ void init(BalancerClusterState cluster) { * that this action is performed on. */ void postAction(BalanceAction action) { switch (action.getType()) { - case NULL: break; - case ASSIGN_REGION: - AssignRegionAction ar = (AssignRegionAction) action; - regionMoved(ar.getRegion(), -1, ar.getServer()); - break; - case MOVE_REGION: - MoveRegionAction mra = (MoveRegionAction) action; - regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); - break; - case SWAP_REGIONS: - SwapRegionsAction a = (SwapRegionsAction) action; - regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); - regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); - break; - default: - throw new RuntimeException("Uknown action:" + action.getType()); + case NULL: + break; + case ASSIGN_REGION: + AssignRegionAction ar = (AssignRegionAction) action; + regionMoved(ar.getRegion(), -1, ar.getServer()); + break; + case MOVE_REGION: + MoveRegionAction mra = (MoveRegionAction) action; + regionMoved(mra.getRegion(), mra.getFromServer(), mra.getToServer()); + break; + case SWAP_REGIONS: + SwapRegionsAction a = (SwapRegionsAction) action; + regionMoved(a.getFromRegion(), a.getFromServer(), a.getToServer()); + regionMoved(a.getToRegion(), a.getToServer(), a.getFromServer()); + break; + default: + throw new RuntimeException("Uknown action:" + action.getType()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index c31cdaf448ae..98c0bddb1ce7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -531,7 +531,7 @@ public void testClusterRegionLocations() { new BalancerClusterState(clusterState, null, locationFinder, null); // this is ok, it is just a test - int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); + int r0 = ArrayUtils.indexOf(cluster.regions, regions.get(0)); int r1 = ArrayUtils.indexOf(cluster.regions, regions.get(1)); int r10 = ArrayUtils.indexOf(cluster.regions, regions.get(10)); int r42 = ArrayUtils.indexOf(cluster.regions, regions.get(42));