diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 2e0da0deb842..76a1d676487a 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -515,6 +515,7 @@ message UpdatePeerConfigStateData { message RemovePeerStateData { optional ReplicationPeer peer_config = 1; + repeated int64 ongoing_assign_replication_queues_proc_ids = 2; } message EnablePeerStateData { @@ -714,9 +715,8 @@ message ModifyColumnFamilyStoreFileTrackerStateData { } enum AssignReplicationQueuesState { - ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1; - ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2; - ASSIGN_REPLICATION_QUEUES_CLAIM = 3; + ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1; + ASSIGN_REPLICATION_QUEUES_CLAIM = 2; } message AssignReplicationQueuesStateData { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java index e7fb5e517159..d33259dd4368 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AssignReplicationQueuesProcedure.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; @@ -102,8 +103,12 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationExceptio } private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException { + Set existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream() + .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet()); ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage(); - List queueIds = storage.listAllQueueIds(crashedServer); + // filter out replication queue for deleted peers + List queueIds = storage.listAllQueueIds(crashedServer).stream() + .filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList()); if (queueIds.isEmpty()) { LOG.debug("Finish claiming replication queues for {}", crashedServer); // we are done @@ -130,10 +135,6 @@ protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesS throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { try { switch (state) { - case ASSIGN_REPLICATION_QUEUES_PRE_CHECK: - // TODO: reserved for implementing the fencing logic with Add/Remove/UpdatePeerProcedure - setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES); - return Flow.HAS_MORE_STATE; case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES: addMissingQueues(env); retryCounter = null; @@ -183,7 +184,7 @@ protected int getStateId(AssignReplicationQueuesState state) { @Override protected AssignReplicationQueuesState getInitialState() { - return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_PRE_CHECK; + return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 3af902e1d8a4..50214e205192 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -74,7 +74,7 @@ protected abstract void prePeerModification(MasterProcedureEnv env) * update the peer storage. */ protected abstract void postPeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException; + throws IOException, ReplicationException, ProcedureSuspendedException; protected void releaseLatch(MasterProcedureEnv env) { ProcedurePrepareLatch.releaseLatch(latch, this); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java index 2042e8468497..2fadc3fd6642 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RemovePeerProcedure.java @@ -18,10 +18,17 @@ package org.apache.hadoop.hbase.master.replication; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.yetus.audience.InterfaceAudience; @@ -40,6 +47,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure { private ReplicationPeerConfig peerConfig; + private List ongoingAssignReplicationQueuesProcIds = Collections.emptyList(); + public RemovePeerProcedure() { } @@ -64,15 +73,43 @@ protected void prePeerModification(MasterProcedureEnv env) throws IOException { @Override protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException { env.getReplicationPeerManager().removePeer(peerId); + // record ongoing AssignReplicationQueuesProcedures after we update the peer storage + ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor() + .getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure) + .filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList()); } private void removeRemoteWALs(MasterProcedureEnv env) throws IOException { env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId); } + private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env) + throws ProcedureSuspendedException { + if (ongoingAssignReplicationQueuesProcIds.isEmpty()) { + LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on", + peerId); + } + ProcedureExecutor procExec = + env.getMasterServices().getMasterProcedureExecutor(); + long[] unfinishedProcIds = + ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure) + .filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray(); + if (unfinishedProcIds.length == 0) { + LOG.info( + "All assign replication queues procedures are finished when removing peer {}, move on", + peerId); + } else { + throw suspend(env.getMasterConfiguration(), backoff -> LOG.info( + "There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs", + unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000)); + } + } + @Override protected void postPeerModification(MasterProcedureEnv env) - throws IOException, ReplicationException { + throws IOException, ReplicationException, ProcedureSuspendedException { + checkAssignReplicationQueuesFinished(env); + if (peerConfig.isSyncReplication()) { removeRemoteWALs(env); } @@ -94,6 +131,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO if (peerConfig != null) { builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); } + builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds); serializer.serialize(builder.build()); } @@ -104,5 +142,6 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws if (data.hasPeerConfig()) { this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig()); } + ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index c16ba8b133c6..5d77600a187b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -237,7 +237,7 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage, */ void init() throws IOException { for (String id : this.replicationPeers.getAllPeerIds()) { - addSource(id); + addSource(id, true); } } @@ -257,7 +257,7 @@ public void addPeer(String peerId) throws IOException { throw new IOException(e); } if (added) { - addSource(peerId); + addSource(peerId, false); } } @@ -323,11 +323,16 @@ private ReplicationSourceInterface createSource(ReplicationQueueData queueData, /** * Add a normal source for the given peer on this region server. Meanwhile, add new replication * queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal - * group and do replication + * group and do replication. + *

+ * We add a {@code init} parameter to indicate whether this is part of the initialization process. + * If so, we should skip adding the replication queues as this may introduce dead lock on region + * server start up and hbase:replication table online. * @param peerId the id of the replication peer + * @param init whether this call is part of the initialization process * @return the source that was created */ - void addSource(String peerId) throws IOException { + void addSource(String peerId, boolean init) throws IOException { ReplicationPeer peer = replicationPeers.getPeer(peerId); if ( ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME @@ -352,11 +357,16 @@ void addSource(String peerId) throws IOException { NavigableSet wals = new TreeSet<>(); wals.add(walPath.getName()); walsByGroup.put(walPrefixAndPath.getKey(), wals); - // Abort RS and throw exception to make add peer failed - // TODO: can record the length of the current wal file so we could replicate less data - abortAndThrowIOExceptionWhenFail( - () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(), - new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap())); + if (!init) { + // Abort RS and throw exception to make add peer failed + // Ideally we'd better use the current file size as offset so we can skip replicating + // the data before adding replication peer, but the problem is that the file may not end + // at a valid entry's ending, and the current WAL Reader implementation can not deal + // with reading from the middle of a WAL entry. Can improve later. + abortAndThrowIOExceptionWhenFail( + () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(), + new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap())); + } src.enqueueLog(walPath); LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId()); } @@ -795,9 +805,15 @@ public void postLogRoll(Path newLog) throws IOException { * @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}. */ private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) { - if (offset == null || offset == ReplicationGroupOffset.BEGIN) { + // skip replicating meta wals + if (AbstractFSWALProvider.isMetaFile(wal)) { return false; } + // if no offset or the offset is just a place marker, replicate + if (offset == null || offset == ReplicationGroupOffset.BEGIN) { + return true; + } + // otherwise, compare the timestamp long walTs = AbstractFSWALProvider.getTimestamp(wal); long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal()); if (walTs < startWalTs) { @@ -892,7 +908,6 @@ Comparator. comparing(p -> AbstractFSWALProvider.getTimestamp(p.getN LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(), groupOffset); } - walFilesPQ.add(file); } // the method is a bit long, so assign it to null here to avoid later we reuse it again by // mistake, we should use the sorted walFilesPQ instead diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java index a12081a76363..de226b13e8fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestClaimReplicationQueue.java @@ -156,7 +156,7 @@ public void testClaim() throws Exception { hbaseAdmin.enableReplicationPeer(PEER_ID3); EMPTY = false; - // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP + // wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP UTIL1.waitFor(30000, () -> master.getProcedures().stream() .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemovePeerProcedureWaitForSCP.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemovePeerProcedureWaitForSCP.java new file mode 100644 index 000000000000..e93fa3b01e87 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestRemovePeerProcedureWaitForSCP.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionServerList; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.master.replication.AssignReplicationQueuesProcedure; +import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + +/** + * Make sure we will wait until all the SCPs finished in RemovePeerProcedure. + *

+ * See HBASE-27109 for more details. + */ +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestRemovePeerProcedureWaitForSCP extends TestReplicationBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRemovePeerProcedureWaitForSCP.class); + + private static final TableName tableName3 = TableName.valueOf("test3"); + + private static final String PEER_ID3 = "3"; + + private static Table table3; + + private static volatile boolean EMPTY = false; + + public static final class ServerManagerForTest extends ServerManager { + + public ServerManagerForTest(MasterServices master, RegionServerList storage) { + super(master, storage); + } + + @Override + public List getOnlineServersList() { + // return no region server to make the procedure hang + if (EMPTY) { + for (StackTraceElement e : Thread.currentThread().getStackTrace()) { + if (e.getClassName().equals(AssignReplicationQueuesProcedure.class.getName())) { + return Collections.emptyList(); + } + } + } + return super.getOnlineServersList(); + } + } + + public static final class HMasterForTest extends HMaster { + + public HMasterForTest(Configuration conf) throws IOException { + super(conf); + } + + @Override + protected ServerManager createServerManager(MasterServices master, RegionServerList storage) + throws IOException { + setupClusterConnection(); + return new ServerManagerForTest(master, storage); + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + CONF1.setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class); + TestReplicationBase.setUpBeforeClass(); + createTable(tableName3); + table3 = connection1.getTable(tableName3); + } + + @Override + public void setUpBase() throws Exception { + super.setUpBase(); + // set up two replication peers and only 1 rs to test claim replication queue with multiple + // round + addPeer(PEER_ID3, tableName3); + } + + @Override + public void tearDownBase() throws Exception { + super.tearDownBase(); + removePeer(PEER_ID3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + Closeables.close(table3, true); + TestReplicationBase.tearDownAfterClass(); + } + + @Test + public void testWait() throws Exception { + // disable the peers + hbaseAdmin.disableReplicationPeer(PEER_ID2); + hbaseAdmin.disableReplicationPeer(PEER_ID3); + + // put some data + UTIL1.loadTable(htable1, famName); + UTIL1.loadTable(table3, famName); + + EMPTY = true; + UTIL1.getMiniHBaseCluster().stopRegionServer(0).join(); + UTIL1.getMiniHBaseCluster().startRegionServer(); + + // since there is no active region server to get the replication queue, the procedure should be + // in WAITING_TIMEOUT state for most time to retry + HMaster master = UTIL1.getMiniHBaseCluster().getMaster(); + UTIL1.waitFor(30000, + () -> master.getProcedures().stream() + .filter(p -> p instanceof AssignReplicationQueuesProcedure) + .anyMatch(p -> p.getState() == ProcedureState.WAITING_TIMEOUT)); + + // call remove replication peer, and make sure it will be stuck in the POST_PEER_MODIFICATION + // state. + hbaseAdmin.removeReplicationPeerAsync(PEER_ID3); + UTIL1.waitFor(30000, + () -> master.getProcedures().stream().filter(p -> p instanceof RemovePeerProcedure) + .anyMatch(p -> ((RemovePeerProcedure) p).getCurrentStateId() + == PeerModificationState.POST_PEER_MODIFICATION_VALUE)); + Thread.sleep(5000); + assertEquals(PeerModificationState.POST_PEER_MODIFICATION_VALUE, + ((RemovePeerProcedure) master.getProcedures().stream() + .filter(p -> p instanceof RemovePeerProcedure).findFirst().get()).getCurrentStateId()); + EMPTY = false; + // wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP + UTIL1.waitFor(30000, () -> master.getProcedures().stream() + .filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess)); + // the RemovePeerProcedure should have also finished + UTIL1.waitFor(30000, () -> master.getProcedures().stream() + .filter(p -> p instanceof RemovePeerProcedure).allMatch(Procedure::isSuccess)); + // make sure there is no remaining replication queues for PEER_ID3 + assertThat(master.getReplicationPeerManager().getQueueStorage().listAllQueueIds(PEER_ID3), + empty()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java index 6906db4cd466..1295ea14abcd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplicationFailover.java @@ -32,12 +32,9 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -// revisit later when we reviewing the implementation for serial replication -@Ignore @Category({ ReplicationTests.class, MediumTests.class }) public class TestSerialReplicationFailover extends SerialReplicationTestBase {