Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27551 Add config options to delay assignment to retain last region location #4945

Merged
merged 5 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ private void checkIsDead(final ServerName serverName, final String what)
* Assumes onlineServers is locked.
* @return ServerName with matching hostname and port.
*/
private ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
public ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
ServerName end =
ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ public class AssignmentManager {
private static final int DEFAULT_RIT_STUCK_WARNING_THRESHOLD = 60 * 1000;
public static final String UNEXPECTED_STATE_REGION = "Unexpected state for ";

/**
 * Config key for forcing region retainment. When set to true, a TransitRegionStateProcedure that
 * already has a candidate location for the region waits for a server with the same hostname and
 * port to be back online before proceeding with the assignment. Intended to be used together
 * with <b>hbase.master.scp.retain.assignment</b>. Note this may delay region re-assignment upon
 * RegionServer crashes or restarts.
 */
public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

/** Forced region retainment is disabled by default. */
public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

/** The wait time in millis before checking again if the region's previous RS is back online */
public static final String FORCE_REGION_RETAINMENT_WAIT_INTERVAL =
"hbase.master.scp.retain.assignment.force.wait-interval";

/** Default value, in millis, for {@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL}. */
public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL = 50;

/**
 * The number of times to check if the region's previous RS is back online, before giving up and
 * proceeding with assignment on a new RS
 */
public static final String FORCE_REGION_RETAINMENT_RETRIES =
"hbase.master.scp.retain.assignment.force.retries";

/** Default value for {@link #FORCE_REGION_RETAINMENT_RETRIES}. */
public static final int DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;

private final ProcedureEvent<?> metaAssignEvent = new ProcedureEvent<>("meta assign");
private final ProcedureEvent<?> metaLoadEvent = new ProcedureEvent<>("meta load");

Expand Down Expand Up @@ -201,6 +220,12 @@ public class AssignmentManager {

private Thread assignThread;

// Value of FORCE_REGION_RETAINMENT, read from conf once in the constructor.
private final boolean forceRegionRetainment;

// Value of FORCE_REGION_RETAINMENT_WAIT_INTERVAL (millis), read from conf once in the constructor.
private final long forceRegionRetainmentWaitInterval;

// Value of FORCE_REGION_RETAINMENT_RETRIES, read from conf once in the constructor.
private final int forceRegionRetainmentRetries;

/**
 * Convenience constructor: delegates to the three-argument constructor, passing a new
 * {@link RegionStateStore} built from the same {@code master} and {@code masterRegion}.
 */
public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
this(master, masterRegion, new RegionStateStore(master, masterRegion));
}
Expand Down Expand Up @@ -240,6 +265,13 @@ public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
}
minVersionToMoveSysTables =
conf.get(MIN_VERSION_MOVE_SYS_TABLES_CONFIG, DEFAULT_MIN_VERSION_MOVE_SYS_TABLES_CONFIG);

forceRegionRetainment =
conf.getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);
forceRegionRetainmentWaitInterval = conf.getLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL,
DEFAULT_FORCE_REGION_RETAINMENT_WAIT_INTERVAL);
forceRegionRetainmentRetries =
conf.getInt(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);
}

private void mirrorMetaLocations() throws IOException, KeeperException {
Expand Down Expand Up @@ -410,6 +442,18 @@ int getAssignMaxAttempts() {
return assignMaxAttempts;
}

/** Returns whether forced region retainment ({@link #FORCE_REGION_RETAINMENT}) is enabled. */
public boolean isForceRegionRetainment() {
  return this.forceRegionRetainment;
}

/**
 * Returns the configured wait interval, in milliseconds, between checks of whether a region's
 * previous RS is back online ({@link #FORCE_REGION_RETAINMENT_WAIT_INTERVAL}).
 */
public long getForceRegionRetainmentWaitInterval() {
  return this.forceRegionRetainmentWaitInterval;
}

/**
 * Returns the configured number of checks for a region's previous RS to be back online before
 * giving up ({@link #FORCE_REGION_RETAINMENT_RETRIES}).
 */
public int getForceRegionRetainmentRetries() {
  return this.forceRegionRetainmentRetries;
}

/** Returns the value currently held in {@code assignRetryImmediatelyMaxAttempts}. */
int getAssignRetryImmediatelyMaxAttempts() {
  return this.assignRetryImmediatelyMaxAttempts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
import static org.apache.hadoop.hbase.master.LoadBalancer.BOGUS_SERVER_NAME;
import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
Expand All @@ -31,6 +33,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
Expand Down Expand Up @@ -95,6 +98,10 @@
* Notice that, although we allow specify a target server, it just acts as a candidate, we do not
* guarantee that the region will finally be on the target server. If this is important for you, you
* should check whether the region is on the target server after the procedure is finished.
* </p>
* Alternatively, to try to retain assignments, the
* <b>hbase.master.scp.retain.assignment.force</b> option can be used together with
* <b>hbase.master.scp.retain.assignment</b>.
* <p/>
* When you want to schedule a TRSP, please check whether there is still one for this region, and
* the check should be under the RegionStateNode lock. We will remove the TRSP from a
Expand Down Expand Up @@ -126,6 +133,10 @@ public class TransitRegionStateProcedure

private boolean isSplit;

// Retry/backoff state used while waiting for the region's previous host to come back online.
// Only created by initForceRetainmentRetryCounter when force retainment is enabled; otherwise
// it stays null.
private RetryCounter forceRetainmentRetryCounter;

// Sum, in millis, of the backoff periods this procedure was suspended for while waiting for the
// previous host; used for logging only.
private long forceRetainmentTotalWait;

/**
 * No-argument constructor. It performs no initialization, so in particular
 * {@code forceRetainmentRetryCounter} is left null for instances created this way.
 * NOTE(review): presumably used when the procedure is re-instantiated from the procedure store;
 * confirm against the procedure framework.
 */
public TransitRegionStateProcedure() {
}

Expand Down Expand Up @@ -163,6 +174,16 @@ protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
}
evictCache =
env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);
initForceRetainmentRetryCounter(env);
}

/**
 * (Re)creates the retry counter used while waiting for a region's previous host and resets the
 * accumulated wait time. Does nothing when force region retainment is disabled.
 * @param env master procedure environment giving access to the AssignmentManager settings
 */
private void initForceRetainmentRetryCounter(MasterProcedureEnv env) {
  if (!env.getAssignmentManager().isForceRegionRetainment()) {
    return;
  }
  final int maxRetries = env.getAssignmentManager().getForceRegionRetainmentRetries();
  final long waitIntervalMs = env.getAssignmentManager().getForceRegionRetainmentWaitInterval();
  forceRetainmentRetryCounter = new RetryCounter(maxRetries, waitIntervalMs, TimeUnit.MILLISECONDS);
  forceRetainmentTotalWait = 0;
}

protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
Expand All @@ -188,6 +209,31 @@ protected boolean waitInitialized(MasterProcedureEnv env) {
return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, getRegion());
}

private void checkAndWaitForOriginalServer(MasterProcedureEnv env, ServerName lastHost)
throws ProcedureSuspendedException {
ServerManager serverManager = env.getMasterServices().getServerManager();
ServerName newNameForServer = serverManager.findServerWithSameHostnamePortWithLock(lastHost);
boolean isOnline = serverManager.createDestinationServersList().contains(newNameForServer);

if (!isOnline && forceRetainmentRetryCounter.shouldRetry()) {
int backoff =
Math.toIntExact(forceRetainmentRetryCounter.getBackoffTimeAndIncrementAttempts());
forceRetainmentTotalWait += backoff;
LOG.info(
"Suspending the TRSP PID={} for {}ms because {} is true and previous host {} "
+ "for region is not yet online.",
this.getProcId(), backoff, FORCE_REGION_RETAINMENT, lastHost);
setTimeout(backoff);
setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
throw new ProcedureSuspendedException();
}
LOG.info(
Apache9 marked this conversation as resolved.
Show resolved Hide resolved
"{} is true. TRSP PID={} waited {}ms for host {} to come back online. "
+ "Did host come back online? {}",
FORCE_REGION_RETAINMENT, this.getProcId(), forceRetainmentTotalWait, lastHost, isOnline);
initForceRetainmentRetryCounter(env);
}

private void queueAssign(MasterProcedureEnv env, RegionStateNode regionNode)
throws ProcedureSuspendedException {
boolean retain = false;
Expand All @@ -200,9 +246,18 @@ private void queueAssign(MasterProcedureEnv env, RegionStateNode regionNode)
regionNode.setRegionLocation(assignCandidate);
} else if (regionNode.getLastHost() != null) {
retain = true;
LOG.info("Setting lastHost as the region location {}", regionNode.getLastHost());
LOG.info("Setting lastHost {} as the location for region {}", regionNode.getLastHost(),
regionNode.getRegionInfo().getEncodedName());
regionNode.setRegionLocation(regionNode.getLastHost());
}
if (
regionNode.getRegionLocation() != null
&& env.getAssignmentManager().isForceRegionRetainment()
) {
LOG.warn("{} is set to true. This may delay regions re-assignment "
+ "upon RegionServers crashes or restarts.", FORCE_REGION_RETAINMENT);
checkAndWaitForOriginalServer(env, regionNode.getRegionLocation());
}
}
LOG.info("Starting {}; {}; forceNewPlan={}, retain={}", this, regionNode.toShortString(),
forceNewPlan, retain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;
import static org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT_WAIT_INTERVAL;
import static org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -33,7 +36,6 @@
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
Expand Down Expand Up @@ -190,6 +192,7 @@ public void testRetainAssignmentOnSingleRSRestart() throws Exception {
cluster.stopMaster(0);
cluster.waitForMasterToStop(master.getServerName(), 5000);
cluster.stopRegionServer(deadRS);
cluster.waitForRegionServerToStop(deadRS, 5000);

LOG.info("\n\nSleeping a bit");
Thread.sleep(2000);
Expand Down Expand Up @@ -228,13 +231,85 @@ public void testRetainAssignmentOnSingleRSRestart() throws Exception {
}
}

/**
 * Tests that region assignments are retained when forced retainment is enabled: one RS is stopped
 * and a new one is started on the same port, and afterwards every region must be on a server
 * with the same address as before, even when master triggers an SCP.
 */
@Test
public void testForceRetainAssignment() throws Exception {
  UTIL.getConfiguration().setBoolean(FORCE_REGION_RETAINMENT, true);
  UTIL.getConfiguration().setLong(FORCE_REGION_RETAINMENT_WAIT_INTERVAL, 50);
  setupCluster();
  HMaster master = UTIL.getMiniHBaseCluster().getMaster();
  SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
  List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
  assertEquals(NUM_OF_RS, threads.size());
  int[] rsPorts = new int[NUM_OF_RS];
  for (int i = 0; i < NUM_OF_RS; i++) {
    rsPorts[i] = threads.get(i).getRegionServer().getServerName().getPort();
  }

  // We don't have to use SnapshotOfRegionAssignmentFromMeta. We use it here because AM used to
  // use it to load all user region placements
  SnapshotOfRegionAssignmentFromMeta snapshot =
    new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
  snapshot.initialize();
  Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
  // Sanity check: every region must currently be hosted on one of the known RS ports.
  for (ServerName serverName : regionToRegionServerMap.values()) {
    boolean found = false; // Test only, no need to optimize
    for (int k = 0; k < NUM_OF_RS && !found; k++) {
      found = serverName.getPort() == rsPorts[k];
    }
    LOG.info("Server {} has regions? {}", serverName, found);
    assertTrue(found);
  }

  // Server to be restarted
  ServerName deadRS = threads.get(0).getRegionServer().getServerName();
  LOG.info("\n\nStopping {} server", deadRS);
  cluster.stopRegionServer(deadRS);

  LOG.info("\n\nSleeping a bit");
  Thread.sleep(2000);

  LOG.info("\n\nStarting region server {} second time with the same port", deadRS);
  cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
  cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
  cluster.startRegionServer();

  ensureServersWithSamePort(master, rsPorts);

  // Wait till master is initialized and all regions are assigned
  for (TableName table : TABLES) {
    UTIL.waitTableAvailable(table);
  }
  UTIL.waitUntilNoRegionsInTransition(60000);
  snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
  snapshot.initialize();
  Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
  assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
  for (Map.Entry<RegionInfo, ServerName> entry : newRegionToRegionServerMap.entrySet()) {
    ServerName oldServer = regionToRegionServerMap.get(entry.getKey());
    ServerName currentServer = entry.getValue();
    LOG.info("Key={} oldServer={}, currentServer={}", entry.getKey(), oldServer, currentServer);
    // Fail with a clear message instead of a NullPointerException if the region was not present
    // in the snapshot taken before the restart.
    assertTrue("No previous location recorded for " + entry.getKey(), oldServer != null);
    assertEquals(entry.getKey().toString(), oldServer.getAddress(), currentServer.getAddress());

    if (deadRS.getPort() == oldServer.getPort()) {
      // Restarted RS start code won't be the same
      assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
    } else {
      assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
    }
  }
}

private void setupCluster() throws Exception, IOException, InterruptedException {
// Set Zookeeper based connection registry since we will stop master and start a new master
// without populating the underlying config for the connection.
UTIL.getConfiguration().set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
// Enable retain assignment during ServerCrashProcedure
UTIL.getConfiguration().setBoolean(ServerCrashProcedure.MASTER_SCP_RETAIN_ASSIGNMENT, true);
UTIL.getConfiguration().setBoolean(MASTER_SCP_RETAIN_ASSIGNMENT, true);
UTIL.startMiniCluster(StartTestingClusterOption.builder().masterClass(HMasterForTest.class)
.numRegionServers(NUM_OF_RS).build());

Expand Down