Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28420 Update the procedure's field to store for ServerRemoteProcedure #5816

Merged
merged 8 commits into from
May 31, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import "rpc/RPC.proto";
import "server/Snapshot.proto";
import "server/master/Replication.proto";
import "server/master/RegionServerStatus.proto";
import "server/ErrorHandling.proto";

// ============================================================================
// WARNING - Compatibility rules
Expand Down Expand Up @@ -254,6 +255,8 @@ message SnapshotVerifyProcedureStateData {
required SnapshotDescription snapshot = 1;
required RegionInfo region = 2;
optional ServerName target_server = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

message SnapshotVerifyParameter {
Expand Down Expand Up @@ -522,6 +525,8 @@ message RefreshPeerStateData {
required ServerName target_server = 3;
/** We need multiple stages for sync replication state transition **/
optional uint32 stage = 4 [default = 0];
optional ServerRemoteProcedureState state = 5;
optional ForeignExceptionMessage error = 6;
}

message RefreshPeerParameter {
Expand Down Expand Up @@ -613,6 +618,8 @@ message SyncReplicationReplayWALRemoteStateData {
required string peer_id = 1;
repeated string wal = 2;
required ServerName target_server = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

message ReplaySyncReplicationWALParameter {
Expand Down Expand Up @@ -650,6 +657,14 @@ enum RegionRemoteProcedureBaseState {
REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4;
}

enum ServerRemoteProcedureState {
SERVER_REMOTE_PROCEDURE_DISPATCH = 1;
SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2;
SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3;
SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4;
SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5;
}

message RegionRemoteProcedureBaseStateData {
required RegionInfo region = 1;
required ServerName target_server = 2;
Expand Down Expand Up @@ -681,6 +696,8 @@ message SwitchRpcThrottleStateData {
message SwitchRpcThrottleRemoteStateData {
required ServerName target_server = 1;
required bool rpc_throttle_enabled = 2;
optional ServerRemoteProcedureState state = 3;
optional ForeignExceptionMessage error = 4;
}

message SplitWALParameter {
Expand All @@ -698,6 +715,8 @@ message SplitWALRemoteData{
required string wal_path = 1;
required ServerName crashed_server=2;
required ServerName worker = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

enum SplitWALState{
Expand All @@ -715,6 +734,8 @@ message ClaimReplicationQueueRemoteStateData {
required string queue = 2;
required ServerName target_server = 3;
optional ServerName source_server = 4;
optional ServerRemoteProcedureState state = 5;
optional ForeignExceptionMessage error = 6;
}

message ClaimReplicationQueueRemoteParameter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

@InterfaceAudience.Private
/**
* The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote)
Expand Down Expand Up @@ -63,34 +65,38 @@
* <p>
* If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle
* this which calls remoteOperationDone with the exception. If the targetServer crashed but this
* procedure has no response, than dispatcher will call remoteOperationFailed() which also calls
* remoteOperationDone with the exception. If the operation is successful, then
* remoteOperationCompleted will be called and actually calls the remoteOperationDone without
* exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others.
* Then developer could implement complete() based on their own purpose. But basic logic is that if
* operation succeed, set succ to true and do the clean work. If operation failed and require to
* resend it to the same server, leave the succ as false. If operation failed and require to resend
* it to another server, set succ to true and upper layer should be able to find out this operation
* not work and send a operation to another server.
* procedure has no response or if we receive failed response, then dispatcher will call
* remoteOperationFailed() which also calls remoteOperationDone with the exception. If the operation
* is successful, then remoteOperationCompleted will be called and actually calls the
* remoteOperationDone without exception. In remoteOperationDone, we'll check if the procedure is
* already get wake up by others. Then developer could implement complete() based on their own
* purpose. But basic logic is that if operation succeed, set succ to true and do the clean work. If
* operation failed and require to resend it to the same server, leave the succ as false. If
* operation failed and require to resend it to another server, set succ to true and upper layer
* should be able to find out this operation not work and send a operation to another server.
*/
public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv>
implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> {
protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
protected ProcedureEvent<?> event;
protected ServerName targetServer;
protected boolean dispatched;
protected boolean succ;
// after remoteProcedureDone we require error field to decide the next state
protected Throwable remoteError;
protected MasterProcedureProtos.ServerRemoteProcedureState state =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am using state for making decisions then I think we need to handle HMaster restart scenarios while HBase upgrade.

MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;

protected abstract void complete(MasterProcedureEnv env, Throwable error);
protected abstract boolean complete(MasterProcedureEnv env, Throwable error);

@Override
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
if (dispatched) {
if (succ) {
if (
state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
if (complete(env, this.remoteError)) {
return null;
}
dispatched = false;
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
}
try {
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
Expand All @@ -99,7 +105,6 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
+ "be retried to send to another server", this.getProcId(), targetServer);
return null;
}
dispatched = true;
event = new ProcedureEvent<>(this);
event.suspendIfNotReady(this);
throw new ProcedureSuspendedException();
Expand All @@ -113,17 +118,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) {
@Override
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
IOException exception) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL;
remoteOperationDone(env, exception);
}

@Override
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED;
remoteOperationDone(env, null);
}

@Override
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
RemoteProcedureException error) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED;
remoteOperationDone(env, error);
}

Expand All @@ -137,7 +145,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
getProcId());
return;
}
complete(env, error);
this.remoteError = error;
// below persistence is added so that if report goes to last active master, it throws exception
env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
event.wake(env.getProcedureScheduler());
event = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
Expand Down Expand Up @@ -75,31 +78,28 @@ protected boolean abort(MasterProcedureEnv env) {
}

@Override
protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
protected synchronized boolean complete(MasterProcedureEnv env, Throwable error) {
boolean isProcedureCompleted = false;
try {
if (error != null) {
if (error instanceof RemoteProcedureException) {
// remote operation failed
Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error);
if (remoteEx instanceof CorruptedSnapshotException) {
// snapshot is corrupted, will touch a flag file and finish the procedure
succ = true;
isProcedureCompleted = true;
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null) {
parent.markSnapshotCorrupted();
}
} else {
// unexpected exception in remote server, will retry on other servers
succ = false;
}
} else {
// the mostly like thing is that remote call failed, will retry on other servers
succ = false;
}
} // else unexpected exception in remote server, will retry on other servers,
// procedureCompleted will stay false
} // else the mostly like thing is that remote call failed, will retry on other servers,
// procedureCompleted will stay false
} else {
// remote operation finished without error
succ = true;
isProcedureCompleted = true;
}
} catch (IOException e) {
// if we can't create the flag file, then mark the current procedure as FAILED
Expand All @@ -112,6 +112,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
env.getProcedureScheduler());
}
return isProcedureCompleted;
}

// we will wrap remote exception into a RemoteProcedureException,
Expand All @@ -126,22 +127,29 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn
try {
// if we've already known the snapshot is corrupted, then stop scheduling
// the new procedures and the undispatched procedures
if (!dispatched) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null && parent.isSnapshotCorrupted()) {
return null;
}
}
// acquire a worker
if (!dispatched && targetServer == null) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
&& targetServer == null
) {
targetServer =
env.getMasterServices().getSnapshotManager().acquireSnapshotVerifyWorker(this);
}
// send remote request
Procedure<MasterProcedureEnv>[] res = super.execute(env);
// retry if necessary
if (!dispatched) {
if (
state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
// the mostly like thing is that a FailedRemoteDispatchException is thrown.
// we need to retry on another remote server
targetServer = null;
Expand Down Expand Up @@ -177,10 +185,15 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
SnapshotVerifyProcedureStateData.Builder builder =
SnapshotVerifyProcedureStateData.newBuilder();
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region));
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state);
if (targetServer != null) {
builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
}
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.build());
}

Expand All @@ -190,9 +203,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
serializer.deserialize(SnapshotVerifyProcedureStateData.class);
this.snapshot = data.getSnapshot();
this.region = ProtobufUtil.toRegionInfo(data.getRegion());
this.state = data.getState();
if (data.hasTargetServer()) {
this.targetServer = ProtobufUtil.toServerName(data.getTargetServer());
}
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
Expand Down Expand Up @@ -70,7 +72,12 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer))
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state);
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.build());
}

Expand All @@ -81,6 +88,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
walPath = data.getWalPath();
targetServer = ProtobufUtil.toServerName(data.getWorker());
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
state = data.getState();
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}

@Override
Expand All @@ -92,21 +103,21 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste
}

@Override
protected void complete(MasterProcedureEnv env, Throwable error) {
protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
try {
env.getMasterServices().getSplitWALManager().archive(walPath);
} catch (IOException e) {
LOG.warn("Failed split of {}; ignore...", walPath, e);
}
succ = true;
return true;
} else {
if (error instanceof DoNotRetryIOException) {
LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error);
succ = true;
return true;
} else {
LOG.warn("Failed split of {}, retry...", walPath, error);
succ = false;
return false;
}
}
}
Expand Down
Loading