Skip to content

Commit

Permalink
HBASE-27430 Should disable replication log cleaner when migrating rep…
Browse files Browse the repository at this point in the history
…lication queue data (#4901)

Signed-off-by: Liangjun He <[email protected]>
  • Loading branch information
Apache9 committed Jan 18, 2023
1 parent ac878a5 commit ff31d25
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -724,11 +724,13 @@ message AssignReplicationQueuesStateData {
}

enum MigrateReplicationQueueFromZkToTableState {
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 5;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER = 1;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE = 2;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER = 3;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE = 4;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING = 5;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER = 6;
MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER = 7;
}

message MigrateReplicationQueueFromZkToTableStateData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hbase.master.replication;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_PEER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_MIGRATE;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
Expand Down Expand Up @@ -111,6 +113,26 @@ private void shutdownExecutorService() {
}
}

private void disableReplicationLogCleaner(MasterProcedureEnv env)
throws ProcedureSuspendedException {
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
// it is not likely that we can reach here as we will schedule this procedure immediately
// after master restarting, where ReplicationLogCleaner should have not started its first run
// yet. But anyway, let's make the code more robust. And it is safe to wait a bit here since
// there will be no data in the new replication queue storage before we execute this procedure
// so ReplicationLogCleaner will quit immediately without doing anything.
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.info(
"Can not disable replication log cleaner, sleep {} secs and retry later",
backoff / 1000));
}
resetRetry();
}

private void enableReplicationLogCleaner(MasterProcedureEnv env) {
env.getReplicationPeerManager().getReplicationLogCleanerBarrier().enable();
}

private void waitUntilNoPeerProcedure(MasterProcedureEnv env) throws ProcedureSuspendedException {
long peerProcCount;
try {
Expand All @@ -136,6 +158,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
MigrateReplicationQueueFromZkToTableState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
switch (state) {
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER:
disableReplicationLogCleaner(env);
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE:
waitUntilNoPeerProcedure(env);
List<ReplicationPeerDescription> peers = env.getReplicationPeerManager().listPeers(null);
Expand All @@ -152,7 +178,8 @@ protected Flow executeFromState(MasterProcedureEnv env,
"failed to delete old replication queue data, sleep {} secs and retry later",
backoff / 1000, e));
}
return Flow.NO_MORE_STATE;
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
return Flow.HAS_MORE_STATE;
}
// here we do not care the peers which have already been disabled, as later we do not need
// to enable them
Expand Down Expand Up @@ -232,6 +259,10 @@ protected Flow executeFromState(MasterProcedureEnv env,
for (String peerId : disabledPeerIds) {
addChildProcedure(new EnablePeerProcedure(peerId));
}
setNextState(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER);
return Flow.HAS_MORE_STATE;
case MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_ENABLE_CLEANER:
enableReplicationLogCleaner(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
Expand Down Expand Up @@ -263,7 +294,19 @@ protected int getStateId(MigrateReplicationQueueFromZkToTableState state) {

@Override
protected MigrateReplicationQueueFromZkToTableState getInitialState() {
return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_PREPARE;
return MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
}

@Override
protected void afterReplay(MasterProcedureEnv env) {
if (getCurrentState() == getInitialState()) {
// do not need to disable log cleaner or acquire lock if we are in the initial state, later
// when executing the procedure we will try to disable and acquire.
return;
}
if (!env.getReplicationPeerManager().getReplicationLogCleanerBarrier().disable()) {
throw new IllegalStateException("can not disable log cleaner, this should not happen");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
*/
package org.apache.hadoop.hbase.master.replication;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER;
import static org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.MigrateReplicationQueueFromZkToTableState.MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -48,6 +51,7 @@
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleanerBarrier;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -102,6 +106,8 @@ public Map<ServerName, ServerMetrics> getOnlineServers() {

@BeforeClass
public static void setupCluster() throws Exception {
// one hour, to make sure it will not run during the test
UTIL.getConfiguration().setInt(HMaster.HBASE_MASTER_CLEANER_INTERVAL, 60 * 60 * 1000);
UTIL.startMiniCluster(
StartTestingClusterOption.builder().masterClass(HMasterForTest.class).build());
}
Expand Down Expand Up @@ -193,8 +199,10 @@ public void testWaitUntilNoPeerProcedure() throws Exception {
UTIL.waitFor(30000, () -> proc.isSuccess());
}

// make sure we will disable replication peers while migrating
// and also tests disable/enable replication log cleaner and wait for region server upgrading
@Test
public void testDisablePeerAndWaitUpgrading() throws Exception {
public void testDisablePeerAndWaitStates() throws Exception {
String peerId = "2";
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(UTIL.getZkCluster().getAddress().toString() + ":/testhbase")
Expand All @@ -206,21 +214,40 @@ public void testDisablePeerAndWaitUpgrading() throws Exception {
EXTRA_REGION_SERVERS
.put(ServerName.valueOf("localhost", 54321, EnvironmentEdgeManager.currentTime()), metrics);

ReplicationLogCleanerBarrier barrier = UTIL.getHBaseCluster().getMaster()
.getReplicationPeerManager().getReplicationLogCleanerBarrier();
assertTrue(barrier.start());

ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();

MigrateReplicationQueueFromZkToTableProcedure proc =
new MigrateReplicationQueueFromZkToTableProcedure();
procExec.submitProcedure(proc);

Thread.sleep(5000);
// make sure we are still waiting for replication log cleaner quit
assertEquals(MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_DISABLE_CLEANER.getNumber(),
proc.getCurrentStateId());
barrier.stop();

// wait until we reach the wait upgrading state
UTIL.waitFor(30000,
() -> proc.getCurrentStateId()
== MIGRATE_REPLICATION_QUEUE_FROM_ZK_TO_TABLE_WAIT_UPGRADING.getNumber()
&& proc.getState() == ProcedureState.WAITING_TIMEOUT);
// make sure the peer is disabled for migrating
assertFalse(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
// make sure the replication log cleaner is disabled
assertFalse(barrier.start());

// the procedure should finish successfully
EXTRA_REGION_SERVERS.clear();
UTIL.waitFor(30000, () -> proc.isSuccess());

// make sure the peer is enabled again
assertTrue(UTIL.getAdmin().isReplicationPeerEnabled(peerId));
// make sure the replication log cleaner is enabled again
assertTrue(barrier.start());
barrier.stop();
}
}

0 comments on commit ff31d25

Please sign in to comment.