diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
index 8cc7ab72148e..5fbc3ac09b94 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupAdminServer.java
@@ -27,7 +27,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
+
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -54,15 +57,27 @@
@InterfaceAudience.Private
public class RSGroupAdminServer implements RSGroupAdmin {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminServer.class);
- public static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE = "should keep at least " +
+ static final String KEEP_ONE_SERVER_IN_DEFAULT_ERROR_MESSAGE = "should keep at least " +
"one server in 'default' RSGroup.";
private MasterServices master;
private final RSGroupInfoManager rsGroupInfoManager;
+  /** Config key for the maximum number of retries of a failed region move. */
+  // Package private for testing.
+  static final String FAILED_MOVE_MAX_RETRY = "hbase.rsgroup.move.max.retry";
+
+  /** Default value used when {@link #FAILED_MOVE_MAX_RETRY} is not configured. */
+  // Package private for testing.
+  static final int DEFAULT_MAX_RETRY_VALUE = 50;
+
+  private final int moveMaxRetry; // set once in the constructor from the master configuration
+
public RSGroupAdminServer(MasterServices master, RSGroupInfoManager rsGroupInfoManager) {
this.master = master;
this.rsGroupInfoManager = rsGroupInfoManager;
+    this.moveMaxRetry = master.getConfiguration().getInt(FAILED_MOVE_MAX_RETRY,
+      DEFAULT_MAX_RETRY_VALUE);
}
@Override
@@ -202,20 +217,77 @@ private void checkServersAndTables(Set
servers, Set tables,
* @throws IOException if moving the server and tables fail
*/
private void moveServerRegionsFromGroup(Set<Address> servers, String targetGroupName)
- throws IOException {
+      throws IOException {
+    moveRegionsBetweenGroups(servers, targetGroupName,
+      rs -> getRegions(rs), // candidate regions: whatever getRegions reports for the server
+      info -> { // true when the region's table belongs to the target group (no move needed)
+        try {
+          RSGroupInfo group = getRSGroupInfo(targetGroupName);
+          return group.containsTable(info.getTable());
+        } catch (IOException e) {
+          LOG.warn("Failed to get RSGroupInfo of {}", targetGroupName, e);
+          return false; // membership unknown: report "not in group" so the region is moved
+        }
+      },
+      Address::getHostname);
+  }
+
+  /**
+   * Moves the regions of the given tables that are not hosted on a server of the target group.
+   * Disabled tables contribute no regions; a region with no known server is treated as misplaced.
+   * @param tables the tables that will move to new group
+   * @param targetGroupName the target group name
+   * @throws IOException if moving the region fails
+   */
+  private void moveTableRegionsToGroup(Set<TableName> tables, String targetGroupName)
+      throws IOException {
+    moveRegionsBetweenGroups(tables, targetGroupName,
+      table -> {
+        if (master.getAssignmentManager().isTableDisabled(table)) {
+          return new ArrayList<>(); // nothing to move for a disabled table
+        }
+        return master.getAssignmentManager().getRegionStates().getRegionsOfTable(table);
+      },
+      info -> { // true when the region already sits on a server of the target group
+        try {
+          RSGroupInfo group = getRSGroupInfo(targetGroupName);
+          ServerName sn =
+            master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(info);
+          return sn != null && group.containsServer(sn.getAddress()); // null sn: avoid NPE
+        } catch (IOException e) {
+          LOG.warn("Failed to get RSGroupInfo of {}", targetGroupName, e);
+          return false; // membership unknown: report "not in group" so the region is moved
+        }
+      },
+      table -> table.getNameWithNamespaceInclAsString());
+  }
+
+ private void moveRegionsBetweenGroups(Set regionsOwners, String targetGroupName,
+ Function> getRegionsInfo, Function validation,
+ Function getOwnerName) throws IOException {
boolean hasRegionsToMove;
- RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
- Set allSevers = new HashSet<>(servers);
+ int retry = 0;
+ Set allOwners = new HashSet<>(regionsOwners);
+ Set failedRegions = new HashSet<>();
+ IOException toThrow = null;
do {
hasRegionsToMove = false;
- for (Iterator iter = allSevers.iterator(); iter.hasNext();) {
- Address rs = iter.next();
+ for (Iterator iter = allOwners.iterator(); iter.hasNext(); ) {
+ T owner = iter.next();
// Get regions that are associated with this server and filter regions by group tables.
- for (RegionInfo region : getRegions(rs)) {
- if (!targetGrp.containsTable(region.getTable())) {
- LOG.info("Moving server region {}, which do not belong to RSGroup {}",
+ for (RegionInfo region : getRegionsInfo.apply(owner)) {
+ if (!validation.apply(region)) {
+ LOG.info("Moving region {}, which do not belong to RSGroup {}",
region.getShortNameToLog(), targetGroupName);
- this.master.getAssignmentManager().move(region);
+ try {
+ this.master.getAssignmentManager().move(region);
+ failedRegions.remove(region.getRegionNameAsString());
+ } catch (IOException ioe) {
+ LOG.debug("Move region {} from group failed, will retry, current retry time is {}",
+ region.getShortNameToLog(), retry, ioe);
+ toThrow = ioe;
+ failedRegions.add(region.getRegionNameAsString());
+ }
if (master.getAssignmentManager().getRegionStates().
getRegionState(region).isFailedOpen()) {
continue;
@@ -225,44 +297,30 @@ private void moveServerRegionsFromGroup(Set servers, String targetGroup
}
if (!hasRegionsToMove) {
- LOG.info("Server {} has no more regions to move for RSGroup", rs.getHostname());
+ LOG.info("No more regions to move from {} to RSGroup", getOwnerName.apply(owner));
iter.remove();
}
}
+
+ retry++;
try {
rsGroupInfoManager.wait(1000);
} catch (InterruptedException e) {
LOG.warn("Sleep interrupted", e);
Thread.currentThread().interrupt();
}
- } while (hasRegionsToMove);
- }
-
- /**
- * Moves regions of tables which are not on target group servers.
- *
- * @param tables the tables that will move to new group
- * @param targetGroupName the target group name
- * @throws IOException if moving the region fails
- */
- private void moveTableRegionsToGroup(Set tables, String targetGroupName)
- throws IOException {
- RSGroupInfo targetGrp = getRSGroupInfo(targetGroupName);
- for (TableName table : tables) {
- if (master.getAssignmentManager().isTableDisabled(table)) {
- LOG.debug("Skipping move regions because the table {} is disabled", table);
- continue;
- }
- LOG.info("Moving region(s) for table {} to RSGroup {}", table, targetGroupName);
- for (RegionInfo region : master.getAssignmentManager().getRegionStates()
- .getRegionsOfTable(table)) {
- ServerName sn =
- master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(region);
- if (!targetGrp.containsServer(sn.getAddress())) {
- LOG.info("Moving region {} to RSGroup {}", region.getShortNameToLog(), targetGroupName);
- master.getAssignmentManager().move(region);
- }
- }
+ } while (hasRegionsToMove && retry <= moveMaxRetry);
+
+    // Either the retry limit has been reached or there are no more regions to move.
+ if (hasRegionsToMove) {
+      // Log the regions that failed to move so that they can be handled later.
+ String msg = String
+ .format("move regions for group %s failed, failed regions: %s", targetGroupName,
+ failedRegions);
+ LOG.error(msg);
+ throw new DoNotRetryIOException(
+ msg + ", just record the last failed region's cause, more details in server log",
+ toThrow);
}
}
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java
index d18bb669df41..d9c1b10cb6a2 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsAdmin2.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.rsgroup;
+import static org.apache.hadoop.hbase.rsgroup.RSGroupAdminServer.DEFAULT_MAX_RETRY_VALUE;
+import static org.apache.hadoop.hbase.util.Threads.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -29,6 +31,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
@@ -36,9 +40,12 @@
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.constraint.ConstraintException;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.net.Address;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -52,7 +59,7 @@
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
-@Category({ MediumTests.class })
+@Category({ LargeTests.class })
public class TestRSGroupsAdmin2 extends TestRSGroupsBase {
@ClassRule
@@ -459,4 +466,210 @@ public boolean evaluate() throws Exception {
Assert.assertEquals(null, rsGroupAdmin.getRSGroupInfo(fooGroup.getName()));
}
+  @Test
+  public void testFailedMoveBeforeRetryExhaustedWhenMoveServer() throws Exception {
+    String groupName = getGroupName(name.getMethodName());
+    rsGroupAdmin.addRSGroup(groupName);
+    final RSGroupInfo newGroup = rsGroupAdmin.getRSGroupInfo(groupName);
+    Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
+      10);
+
+    // Start a thread that restores the SPLITTING region to OPEN while the move is retrying.
+    final ServerName movedServer = gotPair.getFirst();
+    final RegionStateNode rsn = gotPair.getSecond();
+    AtomicBoolean changed = new AtomicBoolean(false);
+    Thread t1 = recoverRegionStateThread(movedServer,
+      server -> master.getAssignmentManager().getRegionsOnServer(server), rsn, changed);
+    t1.start();
+
+    // Move the source server into the new group on a second thread.
+    Thread t2 = new Thread(() -> {
+      LOG.info("thread2 start running, to move regions");
+      try {
+        rsGroupAdmin.moveServers(Sets.newHashSet(movedServer.getAddress()), newGroup.getName());
+      } catch (IOException e) {
+        LOG.error("move server error", e);
+      }
+    });
+    t2.start();
+
+    t1.join();
+    t2.join();
+
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() {
+        if (changed.get()) { // only meaningful once the region state has been recovered
+          return master.getAssignmentManager().getRegionsOnServer(movedServer).size() == 0 && !rsn
+            .getRegionLocation().equals(movedServer);
+        }
+        return false;
+      }
+    });
+  }
+
+  @Test
+  public void testFailedMoveBeforeRetryExhaustedWhenMoveTable() throws Exception {
+    final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
+    Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
+      5);
+
+    // Move the table into the new group on a second thread.
+    Thread t2 = new Thread(() -> {
+      LOG.info("thread2 start running, to move regions");
+      try {
+        rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName());
+      } catch (IOException e) {
+        LOG.error("move table error", e);
+      }
+    });
+    t2.start();
+
+    // Start a thread that restores the SPLITTING region to OPEN while the move is retrying.
+    final ServerName ss = gotPair.getFirst();
+    final RegionStateNode rsn = gotPair.getSecond();
+    AtomicBoolean changed = new AtomicBoolean(false);
+
+    Thread t1 = recoverRegionStateThread(ss, server -> {
+      List<RegionInfo> regions = master.getAssignmentManager().getRegionsOnServer(server);
+      List<RegionInfo> tableRegions = new ArrayList<>(); // only the test table's regions count
+      for (RegionInfo regionInfo : regions) {
+        if (regionInfo.getTable().equals(tableName)) {
+          tableRegions.add(regionInfo);
+        }
+      }
+      return tableRegions;
+    }, rsn, changed);
+    t1.start();
+
+    t1.join();
+    t2.join();
+
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() {
+        if (changed.get()) { // only meaningful once the region state has been recovered
+          boolean serverHasTableRegions = false;
+          for (RegionInfo regionInfo : master.getAssignmentManager().getRegionsOnServer(ss)) {
+            if (regionInfo.getTable().equals(tableName)) {
+              serverHasTableRegions = true;
+              break;
+            }
+          }
+          return !serverHasTableRegions && !rsn.getRegionLocation().equals(ss);
+        }
+        return false;
+      }
+    });
+  }
+
+  private <T> Thread recoverRegionStateThread(T owner, Function<T, List<RegionInfo>> getRegions,
+      RegionStateNode rsn, AtomicBoolean changed) {
+    return new Thread(() -> {
+      LOG.info("thread1 start running, will recover region state");
+      long current = System.currentTimeMillis();
+      // Wait until the only region left on the owner is the one whose state we changed, then
+      // recover its state. The wait budget follows the max retry count: every region except the
+      // failed one is moved in one retry, and the mover sleeps 1s before the next retry.
+      while (System.currentTimeMillis() - current <= DEFAULT_MAX_RETRY_VALUE * 1000) {
+        List<RegionInfo> regions = getRegions.apply(owner);
+        LOG.debug("server table region size is:{}", regions.size());
+        assert regions.size() >= 1;
+        // Exactly one region left means the move hit an exception caused by the artificial
+        // region state, so it is now safe to put that region back to OPEN.
+        if (regions.size() == 1) {
+          assertEquals(regions.get(0).getRegionNameAsString(),
+            rsn.getRegionInfo().getRegionNameAsString());
+          rsn.setState(RegionState.State.OPEN);
+          LOG.info("set region {} state OPEN", rsn.getRegionInfo().getRegionNameAsString());
+          changed.set(true); // signals the test's wait predicate that recovery happened
+          break;
+        }
+        sleep(5000);
+      }
+    });
+  }
+
+  @Test
+  public void testFailedMoveWhenMoveServer() throws Exception {
+    String groupName = getGroupName(name.getMethodName());
+    rsGroupAdmin.addRSGroup(groupName);
+    final RSGroupInfo newGroup = rsGroupAdmin.getRSGroupInfo(groupName);
+    Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
+      10);
+    try {
+      rsGroupAdmin.moveServers(Sets.newHashSet(gotPair.getFirst().getAddress()),
+        newGroup.getName());
+      fail("should get IOException when retry exhausted but there still exists failed moved "
+        + "regions");
+    } catch (IOException e) { // the error message must name the region that could not be moved
+      assertTrue(e.getMessage().contains(
+        gotPair.getSecond().getRegionInfo().getRegionNameAsString()));
+    }
+  }
+
+  @Test
+  public void testFailedMoveWhenMoveTable() throws Exception {
+    final RSGroupInfo newGroup = addGroup(getGroupName(name.getMethodName()), 1);
+    Pair<ServerName, RegionStateNode> gotPair = createTableWithRegionSplitting(newGroup,
+      5);
+    try {
+      rsGroupAdmin.moveTables(Sets.newHashSet(tableName), newGroup.getName());
+      fail("should get IOException when retry exhausted but there still exists failed moved "
+        + "regions");
+    } catch (IOException e) { // the error message must name the region that could not be moved
+      assertTrue(e.getMessage().contains(
+        gotPair.getSecond().getRegionInfo().getRegionNameAsString()));
+    }
+  }
+
+  private Pair<ServerName, RegionStateNode> createTableWithRegionSplitting(RSGroupInfo rsGroupInfo,
+      int tableRegionCount) throws Exception {
+    final byte[] familyNameBytes = Bytes.toBytes("f");
+    // All the regions created below will be assigned to the default group.
+    TEST_UTIL.createMultiRegionTable(tableName, familyNameBytes, tableRegionCount);
+    TEST_UTIL.waitFor(WAIT_TIMEOUT, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        List<?> regions = getTableRegionMap().get(tableName);
+        if (regions == null) {
+          return false;
+        }
+        return regions.size() >= tableRegionCount; // reuse the null-checked lookup
+      }
+    });
+
+    return randomlySetOneRegionStateToSplitting(rsGroupInfo);
+  }
+
+  /**
+   * Picks one region of the test table and sets its state to SPLITTING so that moving it fails.
+   * @param newGroup target group
+   * @return source server of region, and region state
+   * @throws IOException if reading the assignments or the region info fails
+   */
+  private Pair<ServerName, RegionStateNode> randomlySetOneRegionStateToSplitting(
+      RSGroupInfo newGroup) throws IOException {
+    // Pick a server outside the target group that hosts more than one region of the table,
+    // and take its first region as the one whose state is changed.
+    Map<ServerName, List<String>> assignMap = getTableServerRegionMap().get(tableName);
+    String rregion = null;
+    ServerName toMoveServer = null;
+    for (Map.Entry<ServerName, List<String>> entry : assignMap.entrySet()) {
+      rregion = entry.getValue().size() > 1 &&
+        !newGroup.containsServer(entry.getKey().getAddress()) ? entry.getValue().get(0) : null;
+      if (rregion != null) {
+        toMoveServer = entry.getKey();
+        break;
+      }
+    }
+    Assert.assertNotNull(toMoveServer); // fails even when JVM assertions are disabled
+    RegionInfo ri = TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().
+      getRegionInfo(Bytes.toBytesBinary(rregion));
+    RegionStateNode rsn =
+      TEST_UTIL.getMiniHBaseCluster().getMaster().getAssignmentManager().getRegionStates()
+        .getRegionStateNode(ri);
+    rsn.setState(RegionState.State.SPLITTING);
+    return new Pair<>(toMoveServer, rsn);
+  }
}
diff --git a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
index 3d8dd167330e..c5520cf11f1c 100644
--- a/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
+++ b/hbase-rsgroup/src/test/java/org/apache/hadoop/hbase/rsgroup/TestRSGroupsBase.java
@@ -100,11 +100,13 @@ public static void setUpTestBeforeClass() throws Exception {
RSGroupBasedLoadBalancer.class.getName());
TEST_UTIL.getConfiguration().set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
RSGroupAdminEndpoint.class.getName() + "," + CPMasterObserver.class.getName());
- TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
TEST_UTIL.getConfiguration().setInt(
ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART,
NUM_SLAVES_BASE - 1);
TEST_UTIL.getConfiguration().setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true);
+ TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 100000);
+
+ TEST_UTIL.startMiniCluster(NUM_SLAVES_BASE - 1);
initialize();
}