diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index fca1823126c2..f1139e4d7679 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -29,6 +29,7 @@ import "RPC.proto";
import "Snapshot.proto";
import "Replication.proto";
import "RegionServerStatus.proto";
+import "ErrorHandling.proto";
// ============================================================================
// WARNING - Compatibility rules
@@ -254,6 +255,8 @@ message SnapshotVerifyProcedureStateData {
required SnapshotDescription snapshot = 1;
required RegionInfo region = 2;
optional ServerName target_server = 3;
+ optional ServerRemoteProcedureState state = 4;
+ optional ForeignExceptionMessage error = 5;
}
message SnapshotVerifyParameter {
@@ -503,6 +506,8 @@ message RefreshPeerStateData {
required string peer_id = 1;
required PeerModificationType type = 2;
required ServerName target_server = 3;
+ optional ServerRemoteProcedureState state = 5;
+ optional ForeignExceptionMessage error = 6;
}
message RefreshPeerParameter {
@@ -586,6 +591,14 @@ enum RegionRemoteProcedureBaseState {
REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4;
}
+enum ServerRemoteProcedureState {
+ SERVER_REMOTE_PROCEDURE_DISPATCH = 1; // initial state; also restored before a re-dispatch
+ SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2; // set by ServerRemoteProcedure#remoteCallFailed
+ SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3; // set by ServerRemoteProcedure#remoteOperationCompleted
+ SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4; // set by ServerRemoteProcedure#remoteOperationFailed
+ SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5; // NOTE(review): never assigned in this patch; confirm use
+}
+
message RegionRemoteProcedureBaseStateData {
required RegionInfo region = 1;
required ServerName target_server = 2;
@@ -617,6 +630,8 @@ message SwitchRpcThrottleStateData {
message SwitchRpcThrottleRemoteStateData {
required ServerName target_server = 1;
required bool rpc_throttle_enabled = 2;
+ optional ServerRemoteProcedureState state = 3; // persisted ServerRemoteProcedure state
+ optional ForeignExceptionMessage error = 4; // remote error; written only when one was recorded
}
message SplitWALParameter {
@@ -634,6 +649,8 @@ message SplitWALRemoteData {
required string wal_path = 1;
required ServerName crashed_server = 2;
required ServerName worker = 3;
+ optional ServerRemoteProcedureState state = 4;
+ optional ForeignExceptionMessage error = 5;
}
enum SplitWALState {
@@ -650,6 +667,8 @@ message ClaimReplicationQueueRemoteStateData {
required ServerName crashed_server = 1;
required string queue = 2;
required ServerName target_server = 3;
+ optional ServerRemoteProcedureState state = 5;
+ optional ForeignExceptionMessage error = 6;
}
message ClaimReplicationQueueRemoteParameter {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
index d24471b938e2..0c89b6396417 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java
@@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
@InterfaceAudience.Private
/**
* The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote)
@@ -63,34 +65,38 @@
*
* If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle
* this which calls remoteOperationDone with the exception. If the targetServer crashed but this
- * procedure has no response, than dispatcher will call remoteOperationFailed() which also calls
- * remoteOperationDone with the exception. If the operation is successful, then
- * remoteOperationCompleted will be called and actually calls the remoteOperationDone without
- * exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others.
- * Then developer could implement complete() based on their own purpose. But basic logic is that if
- * operation succeed, set succ to true and do the clean work. If operation failed and require to
- * resend it to the same server, leave the succ as false. If operation failed and require to resend
- * it to another server, set succ to true and upper layer should be able to find out this operation
- * not work and send a operation to another server.
+ * procedure has no response, or if we receive a failed response, then the dispatcher will call
+ * remoteOperationFailed(), which also calls remoteOperationDone with the exception. If the
+ * operation is successful, then remoteOperationCompleted will be called, which calls
+ * remoteOperationDone without an exception. remoteOperationDone checks whether the procedure has
+ * already been woken up by others, records the error and wakes the procedure; execute() then calls
+ * complete() with that error. Developers implement complete() for their own purpose, but the basic
+ * contract is: return true if the operation succeeded, after doing the clean up work. Return false
+ * if it failed and must be resent to the same server. If it failed and must be resent to another
+ * server, return true; the upper layer should detect the failure and send it to another server.
*/
public abstract class ServerRemoteProcedure extends Procedure
implements RemoteProcedureDispatcher.RemoteProcedure {
protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class);
protected ProcedureEvent> event;
protected ServerName targetServer;
- protected boolean dispatched;
- protected boolean succ;
+ // set by remoteOperationDone; execute() hands it to complete() to decide the next step
+ protected Throwable remoteError;
+ protected MasterProcedureProtos.ServerRemoteProcedureState state =
+ MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
- protected abstract void complete(MasterProcedureEnv env, Throwable error);
+ protected abstract boolean complete(MasterProcedureEnv env, Throwable error);
@Override
protected synchronized Procedure[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
- if (dispatched) {
- if (succ) {
+ if (
+ state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
+ ) {
+ if (complete(env, this.remoteError)) {
return null;
}
- dispatched = false;
+ state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
}
try {
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
@@ -99,7 +105,6 @@ protected synchronized Procedure[] execute(MasterProcedureEn
+ "be retried to send to another server", this.getProcId(), targetServer);
return null;
}
- dispatched = true;
event = new ProcedureEvent<>(this);
event.suspendIfNotReady(this);
throw new ProcedureSuspendedException();
@@ -113,17 +118,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) {
@Override
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
IOException exception) {
+ state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL; // the call to the server failed
remoteOperationDone(env, exception);
}
@Override
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
+ state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED; // success, so no error is passed on
remoteOperationDone(env, null);
}
@Override
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
RemoteProcedureException error) {
+ state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED; // failure reported; error is passed on
remoteOperationDone(env, error);
}
@@ -137,7 +145,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
getProcId());
return;
}
- complete(env, error);
+ this.remoteError = error;
+ // below persistence is added so that if report goes to last active master, it throws exception
+ env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
event.wake(env.getProcedureScheduler());
event = null;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
index 651822ff5b2a..5d3b25f7b14f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
@@ -32,12 +32,15 @@
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
@@ -75,7 +78,8 @@ protected boolean abort(MasterProcedureEnv env) {
}
@Override
- protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
+ protected synchronized boolean complete(MasterProcedureEnv env, Throwable error) {
+ boolean isProcedureCompleted = false;
try {
if (error != null) {
if (error instanceof RemoteProcedureException) {
@@ -83,23 +87,19 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error);
if (remoteEx instanceof CorruptedSnapshotException) {
// snapshot is corrupted, will touch a flag file and finish the procedure
- succ = true;
+ isProcedureCompleted = true;
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null) {
parent.markSnapshotCorrupted();
}
- } else {
- // unexpected exception in remote server, will retry on other servers
- succ = false;
- }
- } else {
- // the mostly like thing is that remote call failed, will retry on other servers
- succ = false;
- }
+ } // else unexpected exception in remote server, will retry on other servers,
+ // procedureCompleted will stay false
+ } // else the mostly like thing is that remote call failed, will retry on other servers,
+ // procedureCompleted will stay false
} else {
// remote operation finished without error
- succ = true;
+ isProcedureCompleted = true;
}
} catch (IOException e) {
// if we can't create the flag file, then mark the current procedure as FAILED
@@ -112,6 +112,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
env.getProcedureScheduler());
}
+ return isProcedureCompleted;
}
// we will wrap remote exception into a RemoteProcedureException,
@@ -126,7 +127,9 @@ protected synchronized Procedure[] execute(MasterProcedureEn
try {
// if we've already known the snapshot is corrupted, then stop scheduling
// the new procedures and the undispatched procedures
- if (!dispatched) {
+ if (
+ state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
+ ) {
SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor()
.getProcedure(SnapshotProcedure.class, getParentProcId());
if (parent != null && parent.isSnapshotCorrupted()) {
@@ -134,14 +137,19 @@ protected synchronized Procedure[] execute(MasterProcedureEn
}
}
// acquire a worker
- if (!dispatched && targetServer == null) {
+ if (
+ state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
+ && targetServer == null
+ ) {
targetServer =
env.getMasterServices().getSnapshotManager().acquireSnapshotVerifyWorker(this);
}
// send remote request
Procedure[] res = super.execute(env);
// retry if necessary
- if (!dispatched) {
+ if (
+ state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
+ ) {
// the mostly like thing is that a FailedRemoteDispatchException is thrown.
// we need to retry on another remote server
targetServer = null;
@@ -177,10 +185,15 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
SnapshotVerifyProcedureStateData.Builder builder =
SnapshotVerifyProcedureStateData.newBuilder();
- builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region));
+ builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state);
if (targetServer != null) {
builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
}
+ if (this.remoteError != null) { // error is an optional field; written only when one was recorded
+ ErrorHandlingProtos.ForeignExceptionMessage fem =
+ ForeignExceptionUtil.toProtoForeignException(remoteError);
+ builder.setError(fem);
+ }
serializer.serialize(builder.build());
}
@@ -190,9 +203,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
serializer.deserialize(SnapshotVerifyProcedureStateData.class);
this.snapshot = data.getSnapshot();
this.region = ProtobufUtil.toRegionInfo(data.getRegion());
+ this.state = data.getState();
if (data.hasTargetServer()) {
this.targetServer = ProtobufUtil.toServerName(data.getTargetServer());
}
+ if (data.hasError()) {
+ this.remoteError = ForeignExceptionUtil.toException(data.getError());
+ }
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
index 3dc49073c720..1e6bb78e250c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java
@@ -25,12 +25,14 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
/**
@@ -70,7 +72,12 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer))
- .setCrashedServer(ProtobufUtil.toServerName(crashedServer));
+ .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state);
+ if (this.remoteError != null) {
+ ErrorHandlingProtos.ForeignExceptionMessage fem =
+ ForeignExceptionUtil.toProtoForeignException(remoteError);
+ builder.setError(fem);
+ }
serializer.serialize(builder.build());
}
@@ -81,6 +88,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
walPath = data.getWalPath();
targetServer = ProtobufUtil.toServerName(data.getWorker());
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
+ state = data.getState();
+ if (data.hasError()) {
+ this.remoteError = ForeignExceptionUtil.toException(data.getError());
+ }
}
@Override
@@ -92,21 +103,21 @@ public Optional remoteCallBuild(Maste
}
@Override
- protected void complete(MasterProcedureEnv env, Throwable error) {
+ protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error == null) {
try {
env.getMasterServices().getSplitWALManager().archive(walPath);
} catch (IOException e) {
LOG.warn("Failed split of {}; ignore...", walPath, e);
}
- succ = true;
+ return true; // completed; an archive failure above is only logged
} else {
if (error instanceof DoNotRetryIOException) {
LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error);
- succ = true;
+ return true; // finish this procedure instead of re-dispatching to the same server
} else {
LOG.warn("Failed split of {}, retry...", walPath, error);
- succ = false;
+ return false; // not completed; ServerRemoteProcedure#execute dispatches the operation again
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java
index 53a4af1b5104..668897cd9a4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java
@@ -23,11 +23,13 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;
/**
@@ -59,9 +61,16 @@ protected boolean abort(MasterProcedureEnv env) {
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- SwitchRpcThrottleRemoteStateData.newBuilder()
- .setTargetServer(ProtobufUtil.toServerName(targetServer))
- .setRpcThrottleEnabled(rpcThrottleEnabled).build();
+ SwitchRpcThrottleRemoteStateData.Builder builder =
+ SwitchRpcThrottleRemoteStateData.newBuilder();
+ builder.setTargetServer(ProtobufUtil.toServerName(targetServer))
+ .setRpcThrottleEnabled(rpcThrottleEnabled).setState(state);
+ if (this.remoteError != null) { // error is an optional field; written only when one was recorded
+ ErrorHandlingProtos.ForeignExceptionMessage fem =
+ ForeignExceptionUtil.toProtoForeignException(remoteError);
+ builder.setError(fem);
+ }
+ serializer.serialize(builder.build()); // the only build(): the message is built once, then written
}
@Override
@@ -70,6 +79,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
serializer.deserialize(SwitchRpcThrottleRemoteStateData.class);
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
rpcThrottleEnabled = data.getRpcThrottleEnabled();
+ state = data.getState();
+ if (data.hasError()) {
+ this.remoteError = ForeignExceptionUtil.toException(data.getError());
+ }
}
@Override
@@ -99,13 +112,13 @@ public ServerOperationType getServerOperationType() {
}
@Override
- protected void complete(MasterProcedureEnv env, Throwable error) {
+ protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer,
error);
- this.succ = false;
+ return false; // not completed; ServerRemoteProcedure#execute dispatches the operation again
} else {
- this.succ = true;
+ return true; // completed; ServerRemoteProcedure#execute finishes the procedure
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
index 9ef97d1fff62..d7719a73e657 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java
@@ -28,11 +28,13 @@
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;
@@ -84,13 +86,13 @@ public ServerOperationType getServerOperationType() {
}
@Override
- protected void complete(MasterProcedureEnv env, Throwable error) {
+ protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
- LOG.warn("Failed to claim replication queue {} of crashed server on server {} ", queue,
+ LOG.warn("Failed to claim replication queue {} of crashed server {} on server {} ", queue,
crashedServer, targetServer, error);
- this.succ = false;
+ return false; // not completed; ServerRemoteProcedure#execute dispatches the operation again
} else {
- this.succ = true;
+ return true; // completed; ServerRemoteProcedure#execute finishes the procedure
}
}
@@ -111,9 +113,15 @@ protected boolean waitInitialized(MasterProcedureEnv env) {
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- serializer.serialize(ClaimReplicationQueueRemoteStateData.newBuilder()
- .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue)
- .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+ ClaimReplicationQueueRemoteStateData.Builder builder = ClaimReplicationQueueRemoteStateData
+ .newBuilder().setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue)
+ .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
+ if (this.remoteError != null) { // error is an optional field; written only when one was recorded
+ ErrorHandlingProtos.ForeignExceptionMessage fem =
+ ForeignExceptionUtil.toProtoForeignException(remoteError);
+ builder.setError(fem);
+ }
+ serializer.serialize(builder.build());
}
@Override
@@ -123,5 +131,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
queue = data.getQueue();
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ state = data.getState();
+ if (data.hasError()) {
+ this.remoteError = ForeignExceptionUtil.toException(data.getError());
+ }
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
index ec47b2778857..699b1d805e16 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java
@@ -28,11 +28,13 @@
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
+import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
@@ -108,13 +110,13 @@ public Optional remoteCallBuild(MasterProcedureEnv env, ServerN
}
@Override
- protected void complete(MasterProcedureEnv env, Throwable error) {
+ protected boolean complete(MasterProcedureEnv env, Throwable error) {
if (error != null) {
LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
- this.succ = false;
+ return false; // not completed; ServerRemoteProcedure#execute dispatches the operation again
} else {
LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
- this.succ = true;
+ return true; // completed; ServerRemoteProcedure#execute finishes the procedure
}
}
@@ -136,9 +138,15 @@ protected boolean waitInitialized(MasterProcedureEnv env) {
@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
- serializer.serialize(
- RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
- .setTargetServer(ProtobufUtil.toServerName(targetServer)).build());
+ RefreshPeerStateData.Builder builder =
+ RefreshPeerStateData.newBuilder();
+ if (this.remoteError != null) { // error is an optional field; written only when one was recorded
+ ErrorHandlingProtos.ForeignExceptionMessage fem =
+ ForeignExceptionUtil.toProtoForeignException(remoteError);
+ builder.setError(fem);
+ }
+ serializer.serialize(builder.setPeerId(peerId).setType(toPeerModificationType(type))
+ .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state).build());
}
@Override
@@ -147,5 +155,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
peerId = data.getPeerId();
type = toPeerOperationType(data.getType());
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
+ state = data.getState();
+ if (data.hasError()) {
+ this.remoteError = ForeignExceptionUtil.toException(data.getError());
+ }
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
index c0cee15bee38..8fe88efc39cd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java
@@ -199,9 +199,8 @@ public synchronized void remoteOperationFailed(MasterProcedureEnv env,
}
@Override
- public void complete(MasterProcedureEnv env, Throwable error) {
- this.succ = true;
- return;
+ public boolean complete(MasterProcedureEnv env, Throwable error) {
+ return true; // always reports completion, whatever error was passed in
}
@Override