Skip to content

Commit

Permalink
HBASE-23659 BaseLoadBalancer#wouldLowerAvailability should consider r…
Browse files Browse the repository at this point in the history
…egion replicas (apache#1001)

Signed-off-by: stack <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
infraio committed Jan 10, 2020
1 parent ce7c559 commit b01fb05
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -746,10 +746,20 @@ boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
int server = serversToIndex.get(serverName.getHostAndPort());
int region = regionsToIndex.get(regionInfo);

// Region replicas for same region should better assign to different servers
for (int i : regionsPerServer[server]) {
RegionInfo otherRegionInfo = regions[i];
if (RegionReplicaUtil.isReplicasForSameRegion(regionInfo, otherRegionInfo)) {
return true;
}
}

int primary = regionIndexToPrimaryIndex[region];
if (primary == -1) {
return false;
}
// there is a subset relation for server < host < rack
// check server first

if (contains(primariesOfRegionsPerServer[server], primary)) {
// check for whether there are other servers that we can place this region
for (int i = 0; i < primariesOfRegionsPerServer.length; i++) {
Expand All @@ -761,7 +771,8 @@ boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
}

// check host
if (multiServersPerHost) { // these arrays would only be allocated if we have more than one server per host
if (multiServersPerHost) {
// these arrays would only be allocated if we have more than one server per host
int host = serverIndexToHostIndex[server];
if (contains(primariesOfRegionsPerHost[host], primary)) {
// check for whether there are other hosts that we can place this region
Expand All @@ -787,6 +798,7 @@ boolean wouldLowerAvailability(RegionInfo regionInfo, ServerName serverName) {
return false; // there is not a better rack to place this
}
}

return false;
}

Expand Down Expand Up @@ -1267,58 +1279,7 @@ public Map<ServerName, List<RegionInfo>> roundRobinAssignment(List<RegionInfo> r
}

Cluster cluster = createCluster(servers, regions);
List<RegionInfo> unassignedRegions = new ArrayList<>();

roundRobinAssignment(cluster, regions, unassignedRegions,
servers, assignments);

List<RegionInfo> lastFewRegions = new ArrayList<>();
// assign the remaining by going through the list and try to assign to servers one-by-one
int serverIdx = RANDOM.nextInt(numServers);
OUTER : for (RegionInfo region : unassignedRegions) {
boolean assigned = false;
INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
ServerName serverName = servers.get((j + serverIdx) % numServers);
if (!cluster.wouldLowerAvailability(region, serverName)) {
List<RegionInfo> serverRegions =
assignments.computeIfAbsent(serverName, k -> new ArrayList<>());
if (!RegionReplicaUtil.isDefaultReplica(region.getReplicaId())) {
// if the region is not a default replica
// check if the assignments map has the other replica region on this server
for (RegionInfo hri : serverRegions) {
if (RegionReplicaUtil.isReplicasForSameRegion(region, hri)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping the server, " + serverName
+ " , got the same server for the region " + region);
}
// do not allow this case. The unassignedRegions we got because the
// replica region in this list was not assigned because of lower availablity issue.
// So when we assign here we should ensure that as far as possible the server being
// selected does not have the server where the replica region was not assigned.
continue INNER; // continue the inner loop, ie go to the next server
}
}
}
serverRegions.add(region);
cluster.doAssignRegion(region, serverName);
serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
assigned = true;
break;
}
}
if (!assigned) {
lastFewRegions.add(region);
}
}
// just sprinkle the rest of the regions on random regionservers. The balanceCluster will
// make it optimal later. we can end up with this if numReplicas > numServers.
for (RegionInfo region : lastFewRegions) {
int i = RANDOM.nextInt(numServers);
ServerName server = servers.get(i);
List<RegionInfo> serverRegions = assignments.computeIfAbsent(server, k -> new ArrayList<>());
serverRegions.add(region);
cluster.doAssignRegion(region, server);
}
roundRobinAssignment(cluster, regions, servers, assignments);
return assignments;
}

Expand Down Expand Up @@ -1611,9 +1572,8 @@ private ServerName randomAssignment(Cluster cluster, RegionInfo regionInfo,
* Round robin a list of regions to a list of servers
*/
private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
List<RegionInfo> unassignedRegions, List<ServerName> servers,
Map<ServerName, List<RegionInfo>> assignments) {

List<ServerName> servers, Map<ServerName, List<RegionInfo>> assignments) {
List<RegionInfo> unassignedRegions = new ArrayList<>();
int numServers = servers.size();
int numRegions = regions.size();
int max = (int) Math.ceil((float) numRegions / numServers);
Expand All @@ -1622,7 +1582,6 @@ private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
serverIdx = RANDOM.nextInt(numServers);
}
int regionIdx = 0;

for (int j = 0; j < numServers; j++) {
ServerName server = servers.get((j + serverIdx) % numServers);
List<RegionInfo> serverRegions = new ArrayList<>(max);
Expand All @@ -1638,6 +1597,37 @@ private void roundRobinAssignment(Cluster cluster, List<RegionInfo> regions,
assignments.put(server, serverRegions);
regionIdx++;
}


List<RegionInfo> lastFewRegions = new ArrayList<>();
// assign the remaining by going through the list and try to assign to servers one-by-one
serverIdx = RANDOM.nextInt(numServers);
OUTER : for (RegionInfo region : unassignedRegions) {
boolean assigned = false;
INNER : for (int j = 0; j < numServers; j++) { // try all servers one by one
ServerName server = servers.get((j + serverIdx) % numServers);
if (cluster.wouldLowerAvailability(region, server)) {
continue INNER;
} else {
assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
cluster.doAssignRegion(region, server);
serverIdx = (j + serverIdx + 1) % numServers; //remain from next server
assigned = true;
break;
}
}
if (!assigned) {
lastFewRegions.add(region);
}
}
// just sprinkle the rest of the regions on random regionservers. The balanceCluster will
// make it optimal later. we can end up with this if numReplicas > numServers.
for (RegionInfo region : lastFewRegions) {
int i = RANDOM.nextInt(numServers);
ServerName server = servers.get(i);
assignments.computeIfAbsent(server, k -> new ArrayList<>()).add(region);
cluster.doAssignRegion(region, server);
}
}

protected Map<ServerName, List<RegionInfo>> getRegionAssignmentsByServer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
Expand All @@ -31,8 +34,10 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.JVMClusterUtil;

final class RegionReplicaTestHelper {
public final class RegionReplicaTestHelper {

private RegionReplicaTestHelper() {
}
Expand Down Expand Up @@ -156,4 +161,32 @@ static void testLocator(HBaseTestingUtility util, TableName tableName, Locator l
assertEquals(newServerName2,
locator.getRegionLocations(tableName, 2, false).getRegionLocation(2).getServerName());
}

public static void assertReplicaDistributed(HBaseTestingUtility util, Table t)
throws IOException {
if (t.getDescriptor().getRegionReplication() <= 1) {
return;
}
List<RegionInfo> regionInfos = new ArrayList<>();
for (JVMClusterUtil.RegionServerThread rs : util.getMiniHBaseCluster()
.getRegionServerThreads()) {
regionInfos.clear();
for (Region r : rs.getRegionServer().getRegions(t.getName())) {
if (contains(regionInfos, r.getRegionInfo())) {
fail("Replica regions should be assigned to different region servers");
} else {
regionInfos.add(r.getRegionInfo());
}
}
}
}

private static boolean contains(List<RegionInfo> regionInfos, RegionInfo regionInfo) {
for (RegionInfo info : regionInfos) {
if (RegionReplicaUtil.isReplicasForSameRegion(info, regionInfo)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.hadoop.hbase.master.assignment;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -29,10 +26,9 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionReplicaTestHelper;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
Expand Down Expand Up @@ -107,11 +103,10 @@ public void testRegionReplicaSplitRegionAssignment() throws Exception {
List<RegionInfo> regions = new ArrayList<RegionInfo>();
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
for (Region r : rs.getRegionServer().getRegions(table.getName())) {
System.out.println("the region before split is is " + r.getRegionInfo()
+ rs.getRegionServer().getServerName());
regions.add(r.getRegionInfo());
}
}
// There are 6 regions before split, 9 regions after split.
HTU.getAdmin().split(table.getName(), Bytes.toBytes(1));
int count = 0;
while (true) {
Expand All @@ -125,33 +120,7 @@ public void testRegionReplicaSplitRegionAssignment() throws Exception {
}
count = 0;
}
List<ServerName> newRegionLocations = new ArrayList<ServerName>();
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
RegionInfo prevInfo = null;
for (Region r : rs.getRegionServer().getRegions(table.getName())) {
if (!regions.contains(r.getRegionInfo())
&& !RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())) {
LOG.info("The region is " + r.getRegionInfo() + " the location is "
+ rs.getRegionServer().getServerName());
if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())
&& newRegionLocations.contains(rs.getRegionServer().getServerName())
&& prevInfo != null
&& Bytes.equals(prevInfo.getStartKey(), r.getRegionInfo().getStartKey())
&& Bytes.equals(prevInfo.getEndKey(), r.getRegionInfo().getEndKey())) {
fail("Splitted regions should not be assigned to same region server");
} else {
prevInfo = r.getRegionInfo();
if (!RegionReplicaUtil.isDefaultReplica(r.getRegionInfo())
&& !newRegionLocations.contains(rs.getRegionServer().getServerName())) {
newRegionLocations.add(rs.getRegionServer().getServerName());
}
}
}
}
}
// since we assign the daughter regions in round robin fashion, both the daugther region
// replicas will be assigned to two unique servers.
assertEquals("The new regions should be assigned to 3 unique servers ", 3,
newRegionLocations.size());

RegionReplicaTestHelper.assertReplicaDistributed(HTU, table);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
Expand All @@ -32,14 +29,12 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionReplicaTestHelper;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignmentTestingUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
Expand Down Expand Up @@ -131,7 +126,7 @@ protected void testRecoveryAndDoubleExecution(boolean carryingMeta, boolean doub
long procId = getSCPProcId(procExec);
ProcedureTestingUtility.waitProcedure(procExec, procId);
}
assertReplicaDistributed(t);
RegionReplicaTestHelper.assertReplicaDistributed(util, t);
assertEquals(count, util.countRows(t));
assertEquals(checksum, util.checksumRows(t));
}
Expand All @@ -142,36 +137,6 @@ protected long getSCPProcId(ProcedureExecutor<?> procExec) {
return procExec.getActiveProcIds().stream().mapToLong(Long::longValue).min().getAsLong();
}

private void assertReplicaDistributed(Table t) throws IOException {
if (t.getDescriptor().getRegionReplication() <= 1) {
return;
}
// Assert all data came back.
List<RegionInfo> regionInfos = new ArrayList<>();
for (RegionServerThread rs : this.util.getMiniHBaseCluster().getRegionServerThreads()) {
regionInfos.clear();
for (Region r : rs.getRegionServer().getRegions(t.getName())) {
LOG.info("The region is " + r.getRegionInfo() + " the location is " +
rs.getRegionServer().getServerName());
if (contains(regionInfos, r.getRegionInfo())) {
LOG.error("Am exiting");
fail("Replica regions should be assigned to different region servers");
} else {
regionInfos.add(r.getRegionInfo());
}
}
}
}

private boolean contains(List<RegionInfo> regionInfos, RegionInfo regionInfo) {
for (RegionInfo info : regionInfos) {
if (RegionReplicaUtil.isReplicasForSameRegion(info, regionInfo)) {
return true;
}
}
return false;
}

protected Table createTable(final TableName tableName) throws IOException {
final Table t = this.util.createTable(tableName, HBaseTestingUtility.COLUMNS,
HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE, getRegionReplication());
Expand Down

0 comments on commit b01fb05

Please sign in to comment.