Skip to content

Commit

Permalink
HBASE-23078 BaseLoadBalancer should consider region replicas when randomAssignment and roundRobinAssignment
Browse files Browse the repository at this point in the history
  • Loading branch information
infraio committed Sep 26, 2019
1 parent f0dddd1 commit e8de3bd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.master.balancer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -1263,7 +1264,7 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
return assignments;
}

Cluster cluster = createCluster(servers, regions, false);
Cluster cluster = createCluster(servers, regions);
List<RegionInfo> unassignedRegions = new ArrayList<>();

roundRobinAssignment(cluster, regions, unassignedRegions,
Expand Down Expand Up @@ -1319,8 +1320,22 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
return assignments;
}

protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions,
boolean hasRegionReplica) {
protected Cluster createCluster(List<ServerName> servers, Collection<RegionInfo> regions)
throws HBaseIOException {
boolean hasRegionReplica = false;
try {
for (RegionInfo regionInfo : regions) {
TableName tableName = regionInfo.getTable();
if (services != null &&
services.getTableDescriptors().get(tableName).getRegionReplication() > 1) {
hasRegionReplica = true;
break;
}
}
} catch (IOException ioe) {
throw new HBaseIOException(ioe);
}

// Get the snapshot of the current assignments for the regions in question, and then create
// a cluster out of it. Note that we might have replicas already assigned to some servers
// earlier. So we want to get the snapshot to see those assignments, but this will only contain
Expand Down Expand Up @@ -1380,7 +1395,7 @@ public ServerName randomAssignment(RegionInfo regionInfo, List<ServerName> serve
final List<ServerName> finalServers = idleServers.isEmpty() ?
servers : idleServers;
List<RegionInfo> regions = Lists.newArrayList(regionInfo);
Cluster cluster = createCluster(finalServers, regions, false);
Cluster cluster = createCluster(finalServers, regions);
return randomAssignment(cluster, regionInfo, finalServers);
}

Expand Down Expand Up @@ -1452,21 +1467,9 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server

int numRandomAssignments = 0;
int numRetainedAssigments = 0;
boolean hasRegionReplica = false;
for (Map.Entry<RegionInfo, ServerName> entry : regions.entrySet()) {
RegionInfo region = entry.getKey();
ServerName oldServerName = entry.getValue();
// In the current set of regions even if one has region replica let us go with
// getting the entire snapshot
if (this.services != null) { // for tests
AssignmentManager am = this.services.getAssignmentManager();
if (am != null) {
RegionStates states = am.getRegionStates();
if (!hasRegionReplica && states != null && states.isReplicaAvailableForRegion(region)) {
hasRegionReplica = true;
}
}
}
List<ServerName> localServers = new ArrayList<>();
if (oldServerName != null) {
localServers = serversByHostname.get(oldServerName.getHostnameLowerCase());
Expand Down Expand Up @@ -1506,7 +1509,7 @@ public Map<ServerName, List<RegionInfo>> retainAssignment(Map<RegionInfo, Server

// If servers from prior assignment aren't present, then lets do randomAssignment on regions.
if (randomAssignRegions.size() > 0) {
Cluster cluster = createCluster(servers, regions.keySet(), hasRegionReplica);
Cluster cluster = createCluster(servers, regions.keySet());
for (Map.Entry<ServerName, List<RegionInfo>> entry : assignments.entrySet()) {
ServerName sn = entry.getKey();
for (RegionInfo region : entry.getValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
Expand All @@ -37,7 +39,6 @@
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand All @@ -64,31 +65,27 @@ public class TestRegionReplicasWithRestartScenarios {

private static final int NB_SERVERS = 3;
private Table table;
private TableName tableName;

private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;

@BeforeClass
public static void beforeClass() throws Exception {
// Reduce the hdfs block size and prefetch to trigger the file-link reopen
// when the file is moved to archive (e.g. compaction)
HTU.getConfiguration().setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 8192);
HTU.getConfiguration().setInt(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 1);
HTU.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 128 * 1024 * 1024);
HTU.getConfiguration().setInt(">hbase.master.wait.on.regionservers.mintostart", 3);
HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", NB_SERVERS);
HTU.startMiniCluster(NB_SERVERS);
}

@Before
public void before() throws IOException {
TableName tableName = TableName.valueOf(this.name.getMethodName());
// Create table then get the single region for our new table.
this.table = createTableDirectlyFromHTD(tableName);
this.tableName = TableName.valueOf(this.name.getMethodName());
this.table = createTableDirectlyFromHTD(this.tableName);
}

// Per-test teardown: close the client-side Table handle opened in before(),
// then delete the test table so the next test method starts from a clean
// cluster state (table name is derived from the test method name).
@After
public void after() throws IOException {
this.table.close();
HTU.deleteTable(this.tableName);
}

private static Table createTableDirectlyFromHTD(final TableName tableName) throws IOException {
Expand Down Expand Up @@ -125,6 +122,20 @@ private HRegionServer getTertiaryRS() {

// Baseline case: immediately after table creation, verify the region
// replicas are distributed across the region servers. assertReplicaDistributed
// asserts (via checkDuplicates) that no server hosts two replicas of the
// same region; presumably it also checks primary/secondary placement —
// NOTE(review): helper body is only partially visible here, confirm.
@Test
public void testRegionReplicasCreated() throws Exception {
assertReplicaDistributed();
}

// Restart scenario for HBASE-23078: stop one region server (forcing its
// regions to be reassigned), start a replacement server, and verify the
// replica distribution invariant still holds after reassignment — i.e. the
// balancer considered region replicas during random/round-robin assignment.
@Test
public void testWhenRestart() throws Exception {
ServerName serverName = getRS().getServerName();
HTU.getHBaseCluster().stopRegionServer(serverName);
// Wait (up to 60s) until the stopped server is fully gone before starting
// a new one, so the reassignment actually happens.
HTU.getHBaseCluster().waitForRegionServerToStop(serverName, 60000);
HTU.getHBaseCluster().startRegionServerAndWait(60000);
// Ensure all regions of the test table are back online before asserting.
HTU.waitTableAvailable(this.tableName);
assertReplicaDistributed();
}

private void assertReplicaDistributed() throws Exception {
Collection<HRegion> onlineRegions = getRS().getOnlineRegionsLocalContext();
boolean res = checkDuplicates(onlineRegions);
assertFalse(res);
Expand All @@ -150,7 +161,7 @@ private boolean checkDuplicates(Collection<HRegion> onlineRegions3) throws Excep
RegionReplicaUtil.getRegionInfoForDefaultReplica(actualRegion.getRegionInfo()))) {
i++;
if (i > 1) {
LOG.info("Duplicate found " + actualRegion.getRegionInfo() + " " +
LOG.warn("Duplicate found {} and {}", actualRegion.getRegionInfo(),
region.getRegionInfo());
assertTrue(Bytes.equals(region.getRegionInfo().getStartKey(),
actualRegion.getRegionInfo().getStartKey()));
Expand Down

0 comments on commit e8de3bd

Please sign in to comment.