Skip to content

Commit

Permalink
HBASE-27551 Add config options to delay assignment to retain last reg…
Browse files Browse the repository at this point in the history
…ion location (#4945)

Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
wchevreuil authored Jan 24, 2023
1 parent 83d450d commit ea4cb64
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ private void checkIsDead(final ServerName serverName, final String what)
* Assumes onlineServers is locked.
* @return ServerName with matching hostname and port.
*/
private ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
ServerName end =
ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ public class AssignmentManager {
private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

/** The wait time in millis before checking again if the region's previous RS is back online */
public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
"hbase.master.scp.retain.assignment.force.wait-interval";

public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

/**
* The number of times to check if the region's previous RS is back online, before giving up and
* proceeding with assignment on a new RS
*/
public static final String FORCE_REGION_RETAINMENT_RETRIES =
"hbase.master.scp.retain.assignment.force.retries";

public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;

private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

Expand Down Expand Up @@ -201,6 +220,12 @@ public class AssignmentManager {

private Thread assignThread;

private final boolean forceRegionRetainment;

private final long forceRegionRetainmentWaitInterval;

private final int forceRegionRetainmentRetries;

public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
this(master, masterRegion, new RegionStateStore(master, masterRegion));
}
Expand Down Expand Up @@ -240,6 +265,13 @@ public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
}
minVersionToMoveSysTables =
conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);

forceRegionRetainment =
conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
forceRegionRetainmentRetries =
conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
}

private void mirrorMetaLocations() throws IOException, KeeperException {
Expand Down Expand Up @@ -410,6 +442,18 @@ int getAssignMaxAttempts() {
return assignMaxAttempts;
}

public boolean isForceRegionRetainment() {
return forceRegionRetainment;
}

public long getForceRegionRetainmentWaitInterval() {
return forceRegionRetainmentWaitInterval;
}

public int getForceRegionRetainmentRetries() {
return forceRegionRetainmentRetries;
}

int getAssignRetryImmediatelyMaxAttempts() {
return assignRetryImmediatelyMaxAttempts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.apache.hadoop.hbase.master.LoadBalancer.BOGUS_SERVER_NAME;
import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -31,6 +33,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
Expand Down Expand Up @@ -95,6 +98,10 @@
* Notice that, although we allow specify a target server, it just acts as a candidate, we do not
* guarantee that the region will finally be on the target server. If this is important for you, you
* should check whether the region is on the target server after the procedure is finished.
* </p>
* Altenatively, for trying retaining assignments, the
* <b>hbase.master.scp.retain.assignment.force</b> option can be used together with
* <b>hbase.master.scp.retain.assignment</b>.
* <p/>
* When you want to schedule a TRSP, please check whether there is still one for this region, and
* the check should be under the RegionStateNode lock. We will remove the TRSP from a
Expand Down Expand Up @@ -126,6 +133,10 @@ public class TransitRegionStateProcedure

private boolean isSplit;

private RetryCounter forceRetainmentRetryCounter;

private long forceRetainmentTotalWait;

public TransitRegionStateProcedure() {
}

Expand Down Expand Up @@ -163,6 +174,16 @@ protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
}
evictCache =
env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
initForceRetainmentRetryCounter(env);
}

private void initForceRetainmentRetryCounter(MasterProcedureEnv env) {
if (env.getAssignmentManager().isForceRegionRetainment()) {
forceRetainmentRetryCounter =
new RetryCounter(env.getAssignmentManager().getForceRegionRetainmentRetries(),
env.getAssignmentManager().getForceRegionRetainmentWaitInterval(), TimeUnit.MILLISECONDS);
forceRetainmentTotalWait = 0;
}
}

protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
Expand All @@ -188,6 +209,31 @@ protected boolean waitInitialized(MasterProcedureEnv env) {
return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, getRegion());
}

private void checkAndWaitForOriginalServer(MasterProcedureEnv env, ServerName lastHost)
throws ProcedureSuspendedException {
ServerManager serverManager = env.getMasterServices().getServerManager();
ServerName newNameForServer = serverManager.findServerWithSameHostnamePortWithLock(lastHost);
boolean isOnline = serverManager.createDestinationServersList().contains(newNameForServer);

if (!isOnline && forceRetainmentRetryCounter.shouldRetry()) {
int backoff =
Math.toIntExact(forceRetainmentRetryCounter.getBackoffTimeAndIncrementAttempts());
forceRetainmentTotalWait += backoff;
LOG.info(
"Suspending the TRSP PID={} for {}ms because {} is true and previous host {} "
+ "for region is not yet online.",
this.getProcId(), backoff, FORCE_REGION_RETAINMENT, lastHost);
setTimeout(backoff);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
throw new ProcedureSuspendedException();
}
LOG.info(
"{} is true. TRSP PID={} waited {}ms for host {} to come back online. "
+ "Did host come back online? {}",
FORCE_REGION_RETAINMENT, this.getProcId(), forceRetainmentTotalWait, lastHost, isOnline);
initForceRetainmentRetryCounter(env);
}

private void queueAssign(MasterProcedureEnv env, RegionStateNode regionNode)
throws ProcedureSuspendedException {
boolean retain = false;
Expand All @@ -200,9 +246,18 @@ private void queueAssign(MasterProcedureEnv env, RegionStateNode regionNode)
regionNode.setRegionLocation(assignCandidate);
} else if (regionNode.getLastHost() != null) {
retain = true;
LOG.info("Setting lastHost as the region location {}", regionNode.getLastHost());
LOG.info("Setting lastHost {} as the location for region {}", regionNode.getLastHost(),
regionNode.getRegionInfo().getEncodedName());
regionNode.setRegionLocation(regionNode.getLastHost());
}
if (
regionNode.getRegionLocation() != null
&& env.getAssignmentManager().isForceRegionRetainment()
) {
LOG.warn("{} is set to true. This may delay regions re-assignment "
+ "upon RegionServers crashes or restarts.", FORCE_REGION_RETAINMENT);
checkAndWaitForOriginalServer(env, regionNode.getRegionLocation());
}
}
LOG.info("Starting {}; {}; forceNewPlan={}, retain={}", this, regionNode.toShortString(),
forceNewPlan, retain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;
import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT_WAIT_INTERVAL;
import static org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -33,7 +36,6 @@
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
Expand Down Expand Up @@ -190,6 +192,7 @@ public void testRetainAssignmentOnSingleRSRestart() throws Exception {
cluster.stopMaster(0);
cluster.waitForMasterToStop(master.getServerName(), 5000);
cluster.stopRegionServer(deadRS);
cluster.waitForRegionServerToStop(deadRS, 5000);

LOG.info("\n\nSleeping a bit");
Thread.sleep(2000);
Expand Down Expand Up @@ -228,13 +231,85 @@ public void testRetainAssignmentOnSingleRSRestart() throws Exception {
}
}

/**
* This tests the force retaining assignments upon an RS restart, even when master triggers an SCP
*/
@Test
public void testForceRetainAssignment() throws Exception {
UTIL.getConfiguration().setBoolean(FORCE_REGION_RETAINMENT, true);
UTIL.getConfiguration().setLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 50);
setupCluster();
HMaster master = UTIL.getMiniHBaseCluster().getMaster();
SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
assertEquals(NUM_OF_RS, threads.size());
int[] rsPorts = new int[NUM_OF_RS];
for (int i = 0; i < NUM_OF_RS; i++) {
rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
}

// We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
// use it to load all user region placements
SnapshotOfRegionAssignmentFromMeta snapshot =
new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
snapshot.initialize();
Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
for (ServerName serverName : regionToRegionServerMap.values()) {
boolean found = false; // Test only, no need to optimize
for (int k = 0; k < NUM_OF_RS && !found; k++) {
found = serverName.getPort() == rsPorts[k];
}
LOG.info("Server {} has regions? {}", serverName, found);
assertTrue(found);
}

// Server to be restarted
ServerName deadRS = threads.get(0).getRegionServer().getServerName();
LOG.info("\n\nStopping {} server", deadRS);
cluster.stopRegionServer(deadRS);

LOG.info("\n\nSleeping a bit");
Thread.sleep(2000);

LOG.info("\n\nStarting region server {} second time with the same port", deadRS);
cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
cluster.startRegionServer();

ensureServersWithSamePort(master, rsPorts);

// Wait till master is initialized and all regions are assigned
for (TableName TABLE : TABLES) {
UTIL.waitTableAvailable(TABLE);
}
UTIL.waitUntilNoRegionsInTransition(60000);
snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
snapshot.initialize();
Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
ServerName currentServer = entry.getValue();
LOG.info(
"Key=" + entry.getKey() + " oldServer=" + oldServer + ", currentServer=" + currentServer);
assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());

if (deadRS.getPort() == oldServer.getPort()) {
// Restarted RS start code wont be same
assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
} else {
assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
}
}
}

private void setupCluster() throws Exception, IOException, InterruptedException {
// Set Zookeeper based connection registry since we will stop master and start a new master
// without populating the underlying config for the connection.
UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
// Enable retain assignment during ServerCrashProcedure
UTIL.getConfiguration().setBoolean(ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT, true);
UTIL.getConfiguration().setBoolean(MASTER_SCP_RETAIN_ASSIGNMENT, true);
UTIL.startMiniCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
.numRegionServers(NUM_OF_RS).build());

Expand Down

0 comments on commit ea4cb64

Please sign in to comment.