HBASE-27213 Add support for claim queue operation #4708

Merged (1 commit, Aug 20, 2022)
@@ -515,6 +515,7 @@ message UpdatePeerConfigStateData {

message RemovePeerStateData {
optional ReplicationPeer peer_config = 1;
+ repeated int64 ongoing_assign_replication_queues_proc_ids = 2;
}

message EnablePeerStateData {
@@ -711,9 +712,8 @@ message ModifyColumnFamilyStoreFileTrackerStateData {
}

enum AssignReplicationQueuesState {
- ASSIGN_REPLICATION_QUEUES_PRE_CHECK = 1;
- ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 2;
- ASSIGN_REPLICATION_QUEUES_CLAIM = 3;
+ ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES = 1;
+ ASSIGN_REPLICATION_QUEUES_CLAIM = 2;
}

message AssignReplicationQueuesStateData {
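For illustration, here is a minimal sketch (not part of the patch) of how the new repeated field round-trips through the generated protobuf classes. The shaded package name and the small wrapper class are assumptions based on HBase's usual layout; the builder and getter names follow from the field added above.

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RemovePeerStateData;

public class RemovePeerStateDataRoundTrip {
  public static void main(String[] args) throws Exception {
    List<Long> procIds = Arrays.asList(12L, 15L); // hypothetical procedure ids
    // RemovePeerProcedure records the ids of in-flight AssignReplicationQueuesProcedures here
    RemovePeerStateData data = RemovePeerStateData.newBuilder()
      .addAllOngoingAssignReplicationQueuesProcIds(procIds).build();
    // after a master restart the procedure deserializes the list and re-checks those procedures
    RemovePeerStateData parsed = RemovePeerStateData.parseFrom(data.toByteArray());
    System.out.println(parsed.getOngoingAssignReplicationQueuesProcIdsList()); // [12, 15]
  }
}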
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
@@ -102,8 +103,12 @@ private void addMissingQueues(MasterProcedureEnv env) throws ReplicationException {
}

private Flow claimQueues(MasterProcedureEnv env) throws ReplicationException {
+ Set<String> existingPeerIds = env.getReplicationPeerManager().listPeers(null).stream()
+   .map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
ReplicationQueueStorage storage = env.getReplicationPeerManager().getQueueStorage();
- List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer);
+ // filter out replication queues for deleted peers
+ List<ReplicationQueueId> queueIds = storage.listAllQueueIds(crashedServer).stream()
+   .filter(q -> existingPeerIds.contains(q.getPeerId())).collect(Collectors.toList());
if (queueIds.isEmpty()) {
LOG.debug("Finish claiming replication queues for {}", crashedServer);
// we are done
@@ -130,10 +135,6 @@ protected Flow executeFromState(MasterProcedureEnv env, AssignReplicationQueuesState state)
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
try {
switch (state) {
- case ASSIGN_REPLICATION_QUEUES_PRE_CHECK:
-   // TODO: reserved for implementing the fencing logic with Add/Remove/UpdatePeerProcedure
-   setNextState(AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES);
-   return Flow.HAS_MORE_STATE;
case ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES:
addMissingQueues(env);
retryCounter = null;
@@ -183,7 +184,7 @@ protected int getStateId(AssignReplicationQueuesState state) {

@Override
protected AssignReplicationQueuesState getInitialState() {
- return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_PRE_CHECK;
+ return AssignReplicationQueuesState.ASSIGN_REPLICATION_QUEUES_ADD_MISSING_QUEUES;
}

@Override
@@ -74,7 +74,7 @@ protected abstract void prePeerModification(MasterProcedureEnv env)
* update the peer storage.
*/
protected abstract void postPeerModification(MasterProcedureEnv env)
-   throws IOException, ReplicationException;
+   throws IOException, ReplicationException, ProcedureSuspendedException;

protected void releaseLatch(MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(latch, this);
@@ -18,10 +18,17 @@
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
@@ -40,6 +47,8 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {

private ReplicationPeerConfig peerConfig;

private List<Long> ongoingAssignReplicationQueuesProcIds = Collections.emptyList();

public RemovePeerProcedure() {
}

@@ -64,15 +73,43 @@ protected void prePeerModification(MasterProcedureEnv env) throws IOException {
@Override
protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().removePeer(peerId);
// record ongoing AssignReplicationQueuesProcedures after we update the peer storage
ongoingAssignReplicationQueuesProcIds = env.getMasterServices().getMasterProcedureExecutor()
.getProcedures().stream().filter(p -> p instanceof AssignReplicationQueuesProcedure)
.filter(p -> !p.isFinished()).map(Procedure::getProcId).collect(Collectors.toList());
}

private void removeRemoteWALs(MasterProcedureEnv env) throws IOException {
env.getMasterServices().getSyncReplicationReplayWALManager().removePeerRemoteWALs(peerId);
}

private void checkAssignReplicationQueuesFinished(MasterProcedureEnv env)
throws ProcedureSuspendedException {
if (ongoingAssignReplicationQueuesProcIds.isEmpty()) {
LOG.info("No ongoing assign replication queues procedures when removing peer {}, move on",
peerId);
return;
}
ProcedureExecutor<MasterProcedureEnv> procExec =
env.getMasterServices().getMasterProcedureExecutor();
long[] unfinishedProcIds =
ongoingAssignReplicationQueuesProcIds.stream().map(procExec::getProcedure)
.filter(p -> p != null && !p.isFinished()).mapToLong(Procedure::getProcId).toArray();
if (unfinishedProcIds.length == 0) {
LOG.info(
"All assign replication queues procedures are finished when removing peer {}, move on",
peerId);
} else {
throw suspend(env.getMasterConfiguration(), backoff -> LOG.info(
"There are still {} pending assign replication queues procedures {} when removing peer {}, sleep {} secs",
unfinishedProcIds.length, Arrays.toString(unfinishedProcIds), peerId, backoff / 1000));
}
}
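The suspend helper used above is defined on the peer procedure base class and is not shown in this diff. As background, here is a simplified sketch of the retry-with-backoff suspension such a helper typically performs in HBase procedures; the retryCounter field and the exact body are assumptions, only the standard Procedure APIs (setTimeout, skipPersistence, ProcedureSuspendedException) are taken as given.

// Simplified sketch, not the actual implementation: park the procedure with a growing
// backoff; the timeout machinery wakes it up so the check runs again.
protected ProcedureSuspendedException suspend(Configuration conf, LongConsumer backoffConsumer)
    throws ProcedureSuspendedException {
  if (retryCounter == null) {
    retryCounter = ProcedureUtil.createRetryCounter(conf);
  }
  long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
  backoffConsumer.accept(backoff); // lets the caller log how long it will sleep, as above
  setTimeout(Math.toIntExact(backoff));
  setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
  skipPersistence(); // nothing new to persist while waiting
  throw new ProcedureSuspendedException();
}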

@Override
protected void postPeerModification(MasterProcedureEnv env)
-   throws IOException, ReplicationException {
+   throws IOException, ReplicationException, ProcedureSuspendedException {
+   checkAssignReplicationQueuesFinished(env);

if (peerConfig.isSyncReplication()) {
removeRemoteWALs(env);
}
@@ -94,6 +131,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
if (peerConfig != null) {
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
}
builder.addAllOngoingAssignReplicationQueuesProcIds(ongoingAssignReplicationQueuesProcIds);
serializer.serialize(builder.build());
}

@@ -104,5 +142,6 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
if (data.hasPeerConfig()) {
this.peerConfig = ReplicationPeerConfigUtil.convert(data.getPeerConfig());
}
ongoingAssignReplicationQueuesProcIds = data.getOngoingAssignReplicationQueuesProcIdsList();
}
}
@@ -236,7 +236,7 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
*/
void init() throws IOException {
for (String id : this.replicationPeers.getAllPeerIds()) {
- addSource(id);
+ addSource(id, true);
}
}

Expand All @@ -256,7 +256,7 @@ public void addPeer(String peerId) throws IOException {
throw new IOException(e);
}
if (added) {
- addSource(peerId);
+ addSource(peerId, false);
}
}

Expand Down Expand Up @@ -322,11 +322,16 @@ private ReplicationSourceInterface createSource(ReplicationQueueData queueData,
/**
* Add a normal source for the given peer on this region server. Meanwhile, add new replication
* queue to storage. For the newly added peer, we only need to enqueue the latest log of each wal
- * group and do replication
+ * group and do replication.
+ * <p/>
+ * We add an {@code init} parameter to indicate whether this call is part of the initialization
+ * process. If so, we skip writing the replication queues to storage, since doing that during
+ * startup may deadlock region server startup against the hbase:replication table coming online.
* @param peerId the id of the replication peer
+ * @param init whether this call is part of the initialization process
* @return the source that was created
*/
- void addSource(String peerId) throws IOException {
+ void addSource(String peerId, boolean init) throws IOException {
ReplicationPeer peer = replicationPeers.getPeer(peerId);
if (
ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME
@@ -351,11 +356,16 @@ void addSource(String peerId) throws IOException {
NavigableSet<String> wals = new TreeSet<>();
wals.add(walPath.getName());
walsByGroup.put(walPrefixAndPath.getKey(), wals);
- // Abort RS and throw exception to make add peer failed
- // TODO: can record the length of the current wal file so we could replicate less data
- abortAndThrowIOExceptionWhenFail(
-   () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
-     new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
+ if (!init) {
+   // Abort RS and throw exception to make add peer fail
+   // Ideally we would use the current file size as the offset so we could skip replicating
+   // data written before the peer was added, but the file may not end at a valid entry
+   // boundary, and the current WAL reader implementation cannot start reading from the
+   // middle of a WAL entry. Can improve later.
+   abortAndThrowIOExceptionWhenFail(
+     () -> this.queueStorage.setOffset(queueId, walPrefixAndPath.getKey(),
+       new ReplicationGroupOffset(walPath.getName(), 0), Collections.emptyMap()));
+ }
src.enqueueLog(walPath);
LOG.trace("Enqueued {} to source {} during source creation.", walPath, src.getQueueId());
}
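To make the new flag concrete, the two call sites shown earlier in this diff differ only in whether the starting offset is written to the hbase:replication table; a condensed sketch of those call paths (bodies trimmed, surrounding class assumed):

// region server startup: the queue data is already in hbase:replication, so we must not
// write to it here, otherwise startup can deadlock on a table that is still coming online
void init() throws IOException {
  for (String id : this.replicationPeers.getAllPeerIds()) {
    addSource(id, true);
  }
}

// add_peer at runtime: persist an offset of 0 in the current wal for each wal group
public void addPeer(String peerId) throws IOException {
  // ... peer bookkeeping elided ...
  addSource(peerId, false);
}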
@@ -792,9 +802,15 @@ public void postLogRoll(Path newLog) throws IOException {
* @return {@code true} means we should replicate the given {@code wal}, otherwise {@code false}.
*/
private boolean shouldReplicate(ReplicationGroupOffset offset, String wal) {
- if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
+ // skip replicating meta wals
+ if (AbstractFSWALProvider.isMetaFile(wal)) {
+   return false;
+ }
+ // if no offset or the offset is just a place marker, replicate
+ if (offset == null || offset == ReplicationGroupOffset.BEGIN) {
return true;
}
+ // otherwise, compare the timestamp
long walTs = AbstractFSWALProvider.getTimestamp(wal);
long startWalTs = AbstractFSWALProvider.getTimestamp(offset.getWal());
if (walTs < startWalTs) {
@@ -889,7 +905,6 @@ Comparator.<Path, Long> comparing(p -> AbstractFSWALProvider.getTimestamp(p.getN
LOG.debug("Skip enqueuing log {} because it is before the start offset {}", file.getName(),
groupOffset);
}
- walFilesPQ.add(file);
}
// the method is a bit long, so assign it to null here to avoid later we reuse it again by
// mistake, we should use the sorted walFilesPQ instead
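As a worked example of the timestamp comparison behind shouldReplicate and the WAL sorting above: AbstractFSWALProvider.getTimestamp parses the trailing numeric component of a WAL name, so WALs rolled before the one the stored offset points at are skipped. The WAL names below are made up and the surrounding class is an assumption; the comparison mirrors the diff.

import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;

public class ShouldReplicateExample {
  public static void main(String[] args) {
    String startWal = "rs1%2C16020%2C1700000000000.1700000300000"; // wal the stored offset points at
    String candidate = "rs1%2C16020%2C1700000000000.1700000100000"; // rolled before the offset's wal
    long startTs = AbstractFSWALProvider.getTimestamp(startWal);
    long walTs = AbstractFSWALProvider.getTimestamp(candidate);
    // wals strictly older than the offset's wal are skipped; the offset's own wal and newer
    // ones are replicated, subject to the in-file offset
    System.out.println(walTs < startTs ? "skip" : "replicate");
  }
}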
@@ -156,7 +156,7 @@ public void testClaim() throws Exception {
hbaseAdmin.enableReplicationPeer(PEER_ID3);

EMPTY = false;
- // wait until the SCP finished, ClaimReplicationQueuesProcedure is a sub procedure of SCP
+ // wait until the SCP finished, AssignReplicationQueuesProcedure is a sub procedure of SCP
UTIL1.waitFor(30000, () -> master.getProcedures().stream()
.filter(p -> p instanceof ServerCrashProcedure).allMatch(Procedure::isSuccess));
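If a test also wanted to wait for the queue-claiming sub procedures themselves, the same waitFor pattern would apply; this is a hypothetical addition, not part of the patch:

UTIL1.waitFor(30000, () -> master.getProcedures().stream()
  .filter(p -> p instanceof AssignReplicationQueuesProcedure).allMatch(Procedure::isSuccess));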
