Skip to content

Commit

Permalink
HBASE-27551 Add config options to delay assignment to retain last reg…
Browse files Browse the repository at this point in the history
…ion location

Change-Id: I365c8232adf5149fc2b220089d2525597d868e02
  • Loading branch information
Wellington Chevreuil committed Jan 5, 2023
1 parent 3f1087f commit dd11028
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,10 @@ private void checkIsDead(final ServerName serverName, final String what)
* Assumes onlineServers is locked.
* @return ServerName with matching hostname and port.
*/
private ServerName findServerWithSameHostnamePortWithLock(final ServerName serverName) {
ServerName end =
ServerName.valueOf(serverName.getHostname(), serverName.getPort(), Long.MAX_VALUE);
public ServerName findServerWithSameHostnamePortWithLock(
final ServerName serverName) {
ServerName end = ServerName.valueOf(serverName.getHostname(), serverName.getPort(),
Long.MAX_VALUE);

ServerName r = onlineServers.lowerKey(end);
if (r != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.master.MetricsAssignmentManager;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.procedure.AbstractStateMachineRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
Expand Down Expand Up @@ -107,6 +108,18 @@ public class TransitRegionStateProcedure

private static final Logger LOG = LoggerFactory.getLogger(TransitRegionStateProcedure.class);

/**
 * Configuration key for the boolean flag that enables forced region retainment. When it is
 * true and a region location has been chosen in {@code queueAssign}, the procedure waits for a
 * server with the same hostname and port to be online before the assignment proceeds.
 */
public static final String FORCE_REGION_RETAINMENT = "hbase.master.scp.retain.assignment.force";

/** Default for {@link #FORCE_REGION_RETAINMENT}: forced retainment is disabled. */
public static final boolean DEFAULT_FORCE_REGION_RETAINMENT = false;

/**
 * Configuration key for the length of each individual wait, in milliseconds, between two
 * consecutive checks for the original server being back online.
 */
public static final String FORCE_REGION_RETAINMENT_WAIT = "hbase.master.scp.retain.assignment.force.wait";

/** Default for {@link #FORCE_REGION_RETAINMENT_WAIT}, in milliseconds. */
public static final long DEFAULT_FORCE_REGION_RETAINMENT_WAIT = 500;

/**
 * Configuration key for the maximum number of wait-and-check rounds performed before giving up
 * on the original server coming back online.
 */
public static final String FORCE_REGION_RETAINMENT_RETRIES = "hbase.master.scp.retain.assignment.force.retries";

/**
 * Default for {@link #FORCE_REGION_RETAINMENT_RETRIES}. Combined with the default wait of
 * 500 ms this bounds the total wait at roughly 600 * 500 ms = 5 minutes.
 */
public static final long DEFAULT_FORCE_REGION_RETAINMENT_RETRIES = 600;

private TransitionType type;

private RegionStateTransitionState initialState;
Expand All @@ -126,6 +139,14 @@ public class TransitRegionStateProcedure

private boolean isSplit;

// Whether assignment should wait for the region's previous server to come back online; read
// from FORCE_REGION_RETAINMENT in the MasterProcedureEnv-based constructor.
// NOTE(review): this and the three fields below are only populated by that constructor in the
// code visible here; confirm what values they hold on instances created via the no-arg
// constructor.
private boolean forceRegionRetainment;

// Used to look up an online server by hostname and port while waiting for the original server.
private ServerManager serverManager;

// Length of a single wait between online checks, in milliseconds (FORCE_REGION_RETAINMENT_WAIT).
private long forceRegionRetainmentWait;

// Maximum number of wait-and-check rounds (FORCE_REGION_RETAINMENT_RETRIES).
private long forceRegionRetainmentRetries;

public TransitRegionStateProcedure() {
}

Expand Down Expand Up @@ -163,6 +184,17 @@ protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
}
evictCache =
env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE);

forceRegionRetainment = env.getMasterConfiguration().
getBoolean(FORCE_REGION_RETAINMENT, DEFAULT_FORCE_REGION_RETAINMENT);

forceRegionRetainmentWait = env.getMasterConfiguration().
getLong(FORCE_REGION_RETAINMENT_WAIT, DEFAULT_FORCE_REGION_RETAINMENT_WAIT);

forceRegionRetainmentRetries = env.getMasterConfiguration().
getLong(FORCE_REGION_RETAINMENT_RETRIES, DEFAULT_FORCE_REGION_RETAINMENT_RETRIES);

serverManager = env.getMasterServices().getServerManager();
}

protected TransitRegionStateProcedure(MasterProcedureEnv env, RegionInfo hri,
Expand All @@ -188,6 +220,26 @@ protected boolean waitInitialized(MasterProcedureEnv env) {
return am.waitMetaLoaded(this) || am.waitMetaAssigned(this, getRegion());
}

/**
 * Waits for a server with the same hostname and port as {@code lastHost} to be online, so the
 * region can be assigned back to the location it was last hosted on.
 * <p>
 * The server manager is polled once up front and then once after every wait of
 * {@code forceRegionRetainmentWait} milliseconds, for at most
 * {@code forceRegionRetainmentRetries} rounds. The method returns normally whether or not the
 * server came back; the outcome is only logged.
 * <p>
 * Note that this blocks the calling thread for the whole duration (up to
 * {@code forceRegionRetainmentRetries * forceRegionRetainmentWait} ms). A configured wait of
 * {@code 0} makes {@link Object#wait(long)} block until notified rather than time out.
 * @param lastHost the server the region was last hosted on
 * @throws ProcedureSuspendedException if the thread is interrupted while waiting
 */
private void checkAndWaitForOriginalServer(ServerName lastHost)
  throws ProcedureSuspendedException {
  boolean isOnline = serverManager.findServerWithSameHostnamePortWithLock(lastHost) != null;
  long retries = 0;
  while (!isOnline && retries < forceRegionRetainmentRetries) {
    try {
      synchronized (this) {
        wait(forceRegionRetainmentWait);
      }
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt status is not restored and no wake-up is registered here
      // before suspending; confirm this is what the procedure framework expects.
      throw new ProcedureSuspendedException();
    }
    retries++;
    isOnline = serverManager.findServerWithSameHostnamePortWithLock(lastHost) != null;
  }
  // Elapsed time is (number of waits) x (length of one wait). Multiplying by the retry limit,
  // as was done previously, reported a meaningless figure.
  LOG.info("{} is true. We waited {} ms for host {} to come back online. "
    + "Did host come back online? {}", FORCE_REGION_RETAINMENT,
    (retries * forceRegionRetainmentWait), lastHost, isOnline);
}

private void queueAssign(MasterProcedureEnv env, RegionStateNode regionNode)
throws ProcedureSuspendedException {
boolean retain = false;
Expand All @@ -200,9 +252,15 @@ private void queueAssign(MasterProcedureEnv env, RegionStateNode regionNode)
regionNode.setRegionLocation(assignCandidate);
} else if (regionNode.getLastHost() != null) {
retain = true;
LOG.info("Setting lastHost as the region location {}", regionNode.getLastHost());
LOG.info("Setting lastHost {} as the location for region {}",
regionNode.getLastHost(), regionNode.getRegionInfo().getEncodedName());
regionNode.setRegionLocation(regionNode.getLastHost());
}
if (regionNode.getRegionLocation() != null && forceRegionRetainment) {
LOG.warn("{} is set to true. This may delay regions re-assignment "
+ "upon RegionServers crashes or restarts.", FORCE_REGION_RETAINMENT);
checkAndWaitForOriginalServer(regionNode.getRegionLocation());
}
}
LOG.info("Starting {}; {}; forceNewPlan={}, retain={}", this, regionNode.toShortString(),
forceNewPlan, retain);
Expand Down Expand Up @@ -373,7 +431,7 @@ protected Flow executeFromState(MasterProcedureEnv env, RegionStateTransitionSta
return Flow.NO_MORE_STATE;
}
}
queueAssign(env, regionNode);
queueAssign(env, regionNode);
return Flow.HAS_MORE_STATE;
case REGION_STATE_TRANSITION_OPEN:
openRegion(env, regionNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@
*/
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure.DEFAULT_FORCE_REGION_RETAINMENT_WAIT;
import static org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure.FORCE_REGION_RETAINMENT;
import static org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure.FORCE_REGION_RETAINMENT_WAIT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -228,6 +233,84 @@ public void testRetainAssignmentOnSingleRSRestart() throws Exception {
}
}

/**
 * Checks forced retainment across a single RS restart: with {@code FORCE_REGION_RETAINMENT}
 * enabled, the regions of a stopped region server must stay in transition instead of being
 * moved elsewhere, and must end up on the same address once a server is started again on the
 * same port.
 */
@Test
public void testForceRetainAssignment() throws Exception {
  UTIL.getConfiguration().setBoolean(FORCE_REGION_RETAINMENT, true);
  UTIL.getConfiguration().setLong(FORCE_REGION_RETAINMENT_WAIT, 2000);
  setupCluster();
  HMaster master = UTIL.getMiniHBaseCluster().getMaster();
  SingleProcessHBaseCluster cluster = UTIL.getHBaseCluster();
  List<JVMClusterUtil.RegionServerThread> threads = cluster.getLiveRegionServerThreads();
  assertEquals(NUM_OF_RS, threads.size());

  // Remember the port of every live region server.
  int[] rsPorts = new int[NUM_OF_RS];
  for (int idx = 0; idx < rsPorts.length; idx++) {
    JVMClusterUtil.RegionServerThread rsThread = threads.get(idx);
    rsPorts[idx] = rsThread.getRegionServer().getServerName().getPort();
  }

  // SnapshotOfRegionAssignmentFromMeta is not strictly required; it is used here because AM used
  // to rely on it to load all user region placements.
  SnapshotOfRegionAssignmentFromMeta snapshot =
    new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
  snapshot.initialize();
  Map<RegionInfo, ServerName> regionToRegionServerMap = snapshot.getRegionToRegionServerMap();
  // Every region must currently sit on one of the known ports.
  for (ServerName hostingServer : regionToRegionServerMap.values()) {
    boolean portKnown = false;
    for (int port : rsPorts) {
      if (hostingServer.getPort() == port) {
        portKnown = true;
        break;
      }
    }
    assertTrue(portKnown);
  }

  // Pick the first region server as the one to restart.
  ServerName deadRS = threads.get(0).getRegionServer().getServerName();
  List<RegionInfo> deadRSRegions = snapshot.getRegionServerToRegionMap().get(deadRS);
  LOG.info("\n\nStopping {} server", deadRS);
  cluster.stopRegionServer(deadRS);

  LOG.info("\n\nSleeping a bit");
  Thread.sleep(1000);
  // None of the regions previously hosted by the stopped server may have been reassigned yet,
  // i.e. all of them must still be in transition.
  List<RegionInfo> rits = new ArrayList<>();
  cluster.getMaster().getAssignmentManager().getRegionsInTransition().stream()
    .map(r -> r.getRegionInfo()).forEach(rits::add);
  for (RegionInfo region : deadRSRegions) {
    assertTrue(rits.contains(region));
  }

  LOG.info("\n\nStarting region server {} second time with the same port", deadRS);
  cluster.getConf().setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 3);
  cluster.getConf().setInt(HConstants.REGIONSERVER_PORT, deadRS.getPort());
  cluster.startRegionServer();

  ensureServersWithSamePort(master, rsPorts);

  // Wait until every table is available and nothing is left in transition.
  for (TableName table : TABLES) {
    UTIL.waitTableAvailable(table);
  }
  UTIL.waitUntilNoRegionsInTransition(60000);

  snapshot = new SnapshotOfRegionAssignmentFromMeta(master.getConnection());
  snapshot.initialize();
  Map<RegionInfo, ServerName> newRegionToRegionServerMap = snapshot.getRegionToRegionServerMap();
  assertEquals(regionToRegionServerMap.size(), newRegionToRegionServerMap.size());
  for (Map.Entry<RegionInfo, ServerName> placement : newRegionToRegionServerMap.entrySet()) {
    RegionInfo region = placement.getKey();
    ServerName oldServer = regionToRegionServerMap.get(region);
    ServerName currentServer = placement.getValue();
    LOG.info(
      "Key=" + region + " oldServer=" + oldServer + ", currentServer=" + currentServer);
    assertEquals(region.toString(), oldServer.getAddress(), currentServer.getAddress());

    if (deadRS.getPort() != oldServer.getPort()) {
      // Servers that were never restarted keep their start code.
      assertEquals(oldServer.getStartcode(), currentServer.getStartcode());
    } else {
      // The restarted RS comes back with a different start code.
      assertNotEquals(oldServer.getStartcode(), currentServer.getStartcode());
    }
  }
}

private void setupCluster() throws Exception, IOException, InterruptedException {
// Set Zookeeper based connection registry since we will stop master and start a new master
// without populating the underlying config for the connection.
Expand Down

0 comments on commit dd11028

Please sign in to comment.