Skip to content

Commit

Permalink
HBASE-28420 remove succ update from child and add stats in SnapshotVe…
Browse files Browse the repository at this point in the history
…rifyProcedure
  • Loading branch information
ukumawat committed May 30, 2024
1 parent a586219 commit 85b4d6a
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,20 @@ public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv
protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
protected ProcedureEvent<?> event;
protected ServerName targetServer;
protected boolean dispatched;
protected boolean succ;
// after remoteProcedureDone we require error field to decide the next state
protected Throwable remoteError;
protected MasterProcedureProtos.ServerRemoteProcedureState state =
MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;

protected abstract void complete(MasterProcedureEnv env, Throwable error);
protected abstract boolean complete(MasterProcedureEnv env, Throwable error);

@Override
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
if (
state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
complete(env, this.remoteError);
if (succ) {
if (complete(env, this.remoteError)) {
return null;
}
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
Expand All @@ -108,7 +105,6 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
+ "be retried to send to another server", this.getProcId(), targetServer);
return null;
}
dispatched = true;
event = new ProcedureEvent<>(this);
event.suspendIfNotReady(this);
throw new ProcedureSuspendedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
Expand Down Expand Up @@ -77,31 +78,28 @@ protected boolean abort(MasterProcedureEnv env) {
}

@Override
protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
protected synchronized boolean complete(MasterProcedureEnv env, Throwable error) {
boolean isProcedureCompleted = false;
try {
if (error != null) {
if (error instanceof RemoteProcedureException) {
// remote operation failed
Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error);
if (remoteEx instanceof CorruptedSnapshotException) {
// snapshot is corrupted, will touch a flag file and finish the procedure
succ = true;
isProcedureCompleted = true;
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null) {
parent.markSnapshotCorrupted();
}
} else {
// unexpected exception in remote server, will retry on other servers
succ = false;
}
} else {
// the mostly like thing is that remote call failed, will retry on other servers
succ = false;
}
} // else unexpected exception in remote server, will retry on other servers,
// procedureCompleted will stay false
} // else the mostly like thing is that remote call failed, will retry on other servers,
// procedureCompleted will stay false
} else {
// remote operation finished without error
succ = true;
isProcedureCompleted = true;
}
} catch (IOException e) {
// if we can't create the flag file, then mark the current procedure as FAILED
Expand All @@ -114,6 +112,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
env.getProcedureScheduler());
}
return isProcedureCompleted;
}

// we will wrap remote exception into a RemoteProcedureException,
Expand All @@ -128,22 +127,29 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
try {
// if we've already known the snapshot is corrupted, then stop scheduling
// the new procedures and the undispatched procedures
if (!dispatched) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null && parent.isSnapshotCorrupted()) {
return null;
}
}
// acquire a worker
if (!dispatched && targetServer == null) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
&& targetServer == null
) {
targetServer =
env.getMasterServices().getSnapshotManager().acquireSnapshotVerifyWorker(this);
}
// send remote request
Procedure<MasterProcedureEnv>[] res = super.execute(env);
// retry if necessary
if (!dispatched) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
// the mostly like thing is that a FailedRemoteDispatchException is thrown.
// we need to retry on another remote server
targetServer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,21 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste
}

@Override
protected void complete(MasterProcedureEnv env, Throwable error) {
protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
try {
env.getMasterServices().getSplitWALManager().archive(walPath);
} catch (IOException e) {
LOG.warn("Failed split of {}; ignore...", walPath, e);
}
succ = true;
return true;
} else {
if (error instanceof DoNotRetryIOException) {
LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error);
succ = true;
return true;
} else {
LOG.warn("Failed split of {}, retry...", walPath, error);
succ = false;
return false;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ public ServerOperationType getServerOperationType() {
}

@Override
protected void complete(MasterProcedureEnv env, Throwable error) {
protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer,
error);
this.succ = false;
return false;
} else {
this.succ = true;
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ public ServerOperationType getServerOperationType() {
}

@Override
protected void complete(MasterProcedureEnv env, Throwable error) {
protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Failed to claim replication queue {} on server {} ", queueId, targetServer, error);
this.succ = false;
return false;
} else {
this.succ = true;
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN
}

@Override
protected void complete(MasterProcedureEnv env, Throwable error) {
protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
this.succ = false;
return false;
} else {
LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
this.succ = true;
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN
ReplaySyncReplicationWALCallable.class, builder.build().toByteArray()));
}

protected void complete(MasterProcedureEnv env, Throwable error) {
protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error);
this.succ = false;
return false;
} else {
truncateWALs(env);
LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId);
this.succ = true;
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ public synchronized void remoteOperationFailed(MasterProcedureEnv env,
}

@Override
public void complete(MasterProcedureEnv env, Throwable error) {
this.succ = true;
return;
public boolean complete(MasterProcedureEnv env, Throwable error) {
return true;
}

@Override
Expand Down

0 comments on commit 85b4d6a

Please sign in to comment.