Skip to content

Commit

Permalink
HBASE-27213 Add support for claim queue operation (#4708)
Browse files Browse the repository at this point in the history
Signed-off-by: Xin Sun <[email protected]>
  • Loading branch information
Apache9 authored Aug 20, 2022
1 parent 2151ead commit 9857458
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ message UpdatePeerConfigStateData {

message RemovePeerStateData {
optional ReplicationPeer peer_config = 1;
repeated int64 ongoing_assign_replication_queues_proc_ids = 2;
}

message EnablePeerStateData {
Expand Down Expand Up @@ -711,9 +712,8 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
}

enum AssignReplicationQueuesState {
ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1;
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2;
ASSIGN_REPLICATION_QUEUES_CLAIM = 3;
ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
}

message AssignReplicationQueuesStateData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
Expand Down Expand Up @@ -102,8 +103,12 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationExceptio
}

private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer);
// filter out replication queue for deleted peers
List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream()
.filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList());
if (queueIds.isEmpty()) {
LOG.debug("Finish claiming replication queues for {}", crashedServer);
// we are done
Expand All @@ -130,10 +135,6 @@ protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesS
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
try {
switch (state) {
case ASSIGN_REPLICATION_QUEUES_PRE_CHECK:
// TODO: reserved for implementing the fencing logic with Add/Remove/UpdatePeerProcedure
setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES);
return Flow.HAS_MORE_STATE;
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
addMissingQueues(env);
retryCounter = null;
Expand Down Expand Up @@ -183,7 +184,7 @@ protected int getStateId(AssignReplicationQueuesState state) {

@Override
protected AssignReplicationQueuesState getInitialState() {
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_PRE_CHECK;
return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected abstract void prePeerModification(MasterProcedureEnv env)
* update the peer storage.
*/
protected abstract void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException;
throws IOException, ReplicationException, ProcedureSuspendedException;

protected void releaseLatch(MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(latch, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -40,6 +47,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {

private ReplicationPeerConfig peerConfig;

private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList();

public RemovePeerProcedure() {
}

Expand All @@ -64,15 +73,43 @@ protected void prePeerModification(MasterProcedureEnv env) throws IOException {
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().removePeer(peerId);
// record ongoing AssignReplicationQueuesProcedures after we update the peer storage
ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor()
.getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure)
.filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList());
}

private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
}

private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env)
throws ProcedureSuspendedException {
if (ongoingAssignReplicationQueuesProcIds.isEmpty()) {
LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on",
peerId);
}
ProcedureExecutor<MasterProcedureEnv> procExec =
env.getMasterServices().getMasterProcedureExecutor();
long[] unfinishedProcIds =
ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure)
.filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray();
if (unfinishedProcIds.length == 0) {
LOG.info(
"All assign replication queues procedures are finished when removing peer {}, move on",
peerId);
} else {
throw suspend(env.getMasterConfiguration(), backoff -> LOG.info(
"There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs",
unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000));
}
}

@Override
protected void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException {
throws IOException, ReplicationException, ProcedureSuspendedException {
checkAssignReplicationQueuesFinished(env);

if (peerConfig.isSyncReplication()) {
removeRemoteWALs(env);
}
Expand All @@ -94,6 +131,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
if (peerConfig != null) {
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
}
builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds);
serializer.serialize(builder.build());
}

Expand All @@ -104,5 +142,6 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
if (data.hasPeerConfig()) {
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
}
ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
*/
void init() throws IOException {
for (String id : this.replicationPeers.getAllPeerIds()) {
addSource(id);
addSource(id, true);
}
}

Expand All @@ -256,7 +256,7 @@ public void addPeer(String peerId) throws IOException {
throw new IOException(e);
}
if (added) {
addSource(peerId);
addSource(peerId, false);
}
}

Expand Down Expand Up @@ -322,11 +322,16 @@ private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
/**
* Add a normal source for the given peer on this region server. Meanwhile, add new replication
* queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
* group and do replication
* group and do replication.
* <p/>
* We add a {@code init} parameter to indicate whether this is part of the initialization process.
* If so, we should skip adding the replication queues as this may introduce dead lock on region
* server start up and hbase:replication table online.
* @param peerId the id of the replication peer
* @param init whether this call is part of the initialization process
* @return the source that was created
*/
void addSource(String peerId) throws IOException {
void addSource(String peerId, boolean init) throws IOException {
ReplicationPeer peer = replicationPeers.getPeer(peerId);
if (
ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
Expand All @@ -351,11 +356,16 @@ void addSource(String peerId) throws IOException {
NavigableSet<String> wals = new TreeSet<>();
wals.add(walPath.getName());
walsByGroup.put(walPrefixAndPath.getKey(), wals);
// Abort RS and throw exception to make add peer failed
// TODO: can record the length of the current wal file so we could replicate less data
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
if (!init) {
// Abort RS and throw exception to make add peer failed
// Ideally we'd better use the current file size as offset so we can skip replicating
// the data before adding replication peer, but the problem is that the file may not end
// at a valid entry's ending, and the current WAL Reader implementation can not deal
// with reading from the middle of a WAL entry. Can improve later.
abortAndThrowIOExceptionWhenFail(
() -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
}
src.enqueueLog(walPath);
LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
}
Expand Down Expand Up @@ -792,9 +802,15 @@ public void postLogRoll(Path newLog) throws IOException {
* @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
*/
private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
// skip replicating meta wals
if (AbstractFSWALProvider.isMetaFile(wal)) {
return false;
}
// if no offset or the offset is just a place marker, replicate
if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
return true;
}
// otherwise, compare the timestamp
long walTs = AbstractFSWALProvider.getTimestamp(wal);
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
if (walTs < startWalTs) {
Expand Down Expand Up @@ -889,7 +905,6 @@ Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getN
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
groupOffset);
}
walFilesPQ.add(file);
}
// the method is a bit long, so assign it to null here to avoid later we reuse it again by
// mistake, we should use the sorted walFilesPQ instead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void testClaim() throws Exception {
hbaseAdmin.enableReplicationPeer(PEER_ID3);

EMPTY = false;
// wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
// wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));

Expand Down
Loading

0 comments on commit 9857458

Please sign in to comment.