Skip to content

Commit

Permalink
HBASE-28420 Update the procedure's field to store for ServerRemotePro…
Browse files Browse the repository at this point in the history
…cedure (#5816)

Co-authored-by: ukumawat <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
Umeshkumar9414 and ukumawat committed Jun 3, 2024
1 parent 7869f7f commit e6d880b
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 58 deletions.
19 changes: 19 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import "RPC.proto";
import "Snapshot.proto";
import "Replication.proto";
import "RegionServerStatus.proto";
import "ErrorHandling.proto";

// ============================================================================
// WARNING - Compatibility rules
Expand Down Expand Up @@ -254,6 +255,8 @@ message SnapshotVerifyProcedureStateData {
required SnapshotDescription snapshot = 1;
required RegionInfo region = 2;
optional ServerName target_server = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

message SnapshotVerifyParameter {
Expand Down Expand Up @@ -503,6 +506,8 @@ message RefreshPeerStateData {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
optional ServerRemoteProcedureState state = 5;
optional ForeignExceptionMessage error = 6;
}

message RefreshPeerParameter {
Expand Down Expand Up @@ -586,6 +591,14 @@ enum RegionRemoteProcedureBaseState {
REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4;
}

enum ServerRemoteProcedureState {
SERVER_REMOTE_PROCEDURE_DISPATCH = 1;
SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2;
SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3;
SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4;
SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5;
}

message RegionRemoteProcedureBaseStateData {
required RegionInfo region = 1;
required ServerName target_server = 2;
Expand Down Expand Up @@ -617,6 +630,8 @@ message SwitchRpcThrottleStateData {
message SwitchRpcThrottleRemoteStateData {
required ServerName target_server = 1;
required bool rpc_throttle_enabled = 2;
optional ServerRemoteProcedureState state = 3;
optional ForeignExceptionMessage error = 4;
}

message SplitWALParameter {
Expand All @@ -634,6 +649,8 @@ message SplitWALRemoteData {
required string wal_path = 1;
required ServerName crashed_server = 2;
required ServerName worker = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

enum SplitWALState {
Expand All @@ -650,6 +667,8 @@ message ClaimReplicationQueueRemoteStateData {
required ServerName crashed_server = 1;
required string queue = 2;
required ServerName target_server = 3;
optional ServerRemoteProcedureState state = 5;
optional ForeignExceptionMessage error = 6;
}

message ClaimReplicationQueueRemoteParameter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

@InterfaceAudience.Private
/**
* The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote)
Expand Down Expand Up @@ -63,34 +65,38 @@
* <p>
* If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle
* this which calls remoteOperationDone with the exception. If the targetServer crashed but this
* procedure has no response, than dispatcher will call remoteOperationFailed() which also calls
* remoteOperationDone with the exception. If the operation is successful, then
* remoteOperationCompleted will be called and actually calls the remoteOperationDone without
* exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others.
* Then developer could implement complete() based on their own purpose. But basic logic is that if
* operation succeed, set succ to true and do the clean work. If operation failed and require to
* resend it to the same server, leave the succ as false. If operation failed and require to resend
* it to another server, set succ to true and upper layer should be able to find out this operation
* not work and send a operation to another server.
* procedure has no response or if we receive failed response, then dispatcher will call
* remoteOperationFailed() which also calls remoteOperationDone with the exception. If the operation
* is successful, then remoteOperationCompleted will be called and actually calls the
* remoteOperationDone without exception. In remoteOperationDone, we'll check if the procedure is
* already get wake up by others. Then developer could implement complete() based on their own
* purpose. But basic logic is that if operation succeed, set succ to true and do the clean work. If
* operation failed and require to resend it to the same server, leave the succ as false. If
* operation failed and require to resend it to another server, set succ to true and upper layer
* should be able to find out this operation not work and send a operation to another server.
*/
public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv>
implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> {
protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
protected ProcedureEvent<?> event;
protected ServerName targetServer;
protected boolean dispatched;
protected boolean succ;
// after remoteProcedureDone we require error field to decide the next state
protected Throwable remoteError;
protected MasterProcedureProtos.ServerRemoteProcedureState state =
MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;

protected abstract void complete(MasterProcedureEnv env, Throwable error);
protected abstract boolean complete(MasterProcedureEnv env, Throwable error);

@Override
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
if (dispatched) {
if (succ) {
if (
state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
if (complete(env, this.remoteError)) {
return null;
}
dispatched = false;
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
}
try {
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
Expand All @@ -99,7 +105,6 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
+ "be retried to send to another server", this.getProcId(), targetServer);
return null;
}
dispatched = true;
event = new ProcedureEvent<>(this);
event.suspendIfNotReady(this);
throw new ProcedureSuspendedException();
Expand All @@ -113,17 +118,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) {
@Override
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
IOException exception) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL;
remoteOperationDone(env, exception);
}

@Override
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED;
remoteOperationDone(env, null);
}

@Override
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
RemoteProcedureException error) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED;
remoteOperationDone(env, error);
}

Expand All @@ -137,7 +145,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
getProcId());
return;
}
complete(env, error);
this.remoteError = error;
// below persistence is added so that if report goes to last active master, it throws exception
env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
event.wake(env.getProcedureScheduler());
event = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
Expand Down Expand Up @@ -75,31 +78,28 @@ protected boolean abort(MasterProcedureEnv env) {
}

@Override
protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
protected synchronized boolean complete(MasterProcedureEnv env, Throwable error) {
boolean isProcedureCompleted = false;
try {
if (error != null) {
if (error instanceof RemoteProcedureException) {
// remote operation failed
Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error);
if (remoteEx instanceof CorruptedSnapshotException) {
// snapshot is corrupted, will touch a flag file and finish the procedure
succ = true;
isProcedureCompleted = true;
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null) {
parent.markSnapshotCorrupted();
}
} else {
// unexpected exception in remote server, will retry on other servers
succ = false;
}
} else {
// the mostly like thing is that remote call failed, will retry on other servers
succ = false;
}
} // else unexpected exception in remote server, will retry on other servers,
// procedureCompleted will stay false
} // else the mostly like thing is that remote call failed, will retry on other servers,
// procedureCompleted will stay false
} else {
// remote operation finished without error
succ = true;
isProcedureCompleted = true;
}
} catch (IOException e) {
// if we can't create the flag file, then mark the current procedure as FAILED
Expand All @@ -112,6 +112,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
env.getProcedureScheduler());
}
return isProcedureCompleted;
}

// we will wrap remote exception into a RemoteProcedureException,
Expand All @@ -126,22 +127,29 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
try {
// if we've already known the snapshot is corrupted, then stop scheduling
// the new procedures and the undispatched procedures
if (!dispatched) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null && parent.isSnapshotCorrupted()) {
return null;
}
}
// acquire a worker
if (!dispatched && targetServer == null) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
&& targetServer == null
) {
targetServer =
env.getMasterServices().getSnapshotManager().acquireSnapshotVerifyWorker(this);
}
// send remote request
Procedure<MasterProcedureEnv>[] res = super.execute(env);
// retry if necessary
if (!dispatched) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
// the mostly like thing is that a FailedRemoteDispatchException is thrown.
// we need to retry on another remote server
targetServer = null;
Expand Down Expand Up @@ -177,10 +185,15 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
SnapshotVerifyProcedureStateData.Builder builder =
SnapshotVerifyProcedureStateData.newBuilder();
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region));
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state);
if (targetServer != null) {
builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
}
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.build());
}

Expand All @@ -190,9 +203,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
serializer.deserialize(SnapshotVerifyProcedureStateData.class);
this.snapshot = data.getSnapshot();
this.region = ProtobufUtil.toRegionInfo(data.getRegion());
this.state = data.getState();
if (data.hasTargetServer()) {
this.targetServer = ProtobufUtil.toServerName(data.getTargetServer());
}
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
Expand Down Expand Up @@ -70,7 +72,12 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer))
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state);
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.build());
}

Expand All @@ -81,6 +88,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
walPath = data.getWalPath();
targetServer = ProtobufUtil.toServerName(data.getWorker());
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
state = data.getState();
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}

@Override
Expand All @@ -92,21 +103,21 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste
}

@Override
protected void complete(MasterProcedureEnv env, Throwable error) {
protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
try {
env.getMasterServices().getSplitWALManager().archive(walPath);
} catch (IOException e) {
LOG.warn("Failed split of {}; ignore...", walPath, e);
}
succ = true;
return true;
} else {
if (error instanceof DoNotRetryIOException) {
LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error);
succ = true;
return true;
} else {
LOG.warn("Failed split of {}, retry...", walPath, error);
succ = false;
return false;
}
}
}
Expand Down
Loading

0 comments on commit e6d880b

Please sign in to comment.