diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 48d20b6bef27..9161a02c1800 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -29,6 +29,7 @@ import "rpc/RPC.proto"; import "server/Snapshot.proto"; import "server/master/Replication.proto"; import "server/master/RegionServerStatus.proto"; +import "server/ErrorHandling.proto"; // ============================================================================ // WARNING - Compatibility rules @@ -254,6 +255,8 @@ message SnapshotVerifyProcedureStateData { required SnapshotDescription snapshot = 1; required RegionInfo region = 2; optional ServerName target_server = 3; + optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } message SnapshotVerifyParameter { @@ -522,6 +525,8 @@ message RefreshPeerStateData { required ServerName target_server = 3; /** We need multiple stages for sync replication state transition **/ optional uint32 stage = 4 [default = 0]; + optional ServerRemoteProcedureState state = 5; + optional ForeignExceptionMessage error = 6; } message RefreshPeerParameter { @@ -613,6 +618,8 @@ message SyncReplicationReplayWALRemoteStateData { required string peer_id = 1; repeated string wal = 2; required ServerName target_server = 3; + optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } message ReplaySyncReplicationWALParameter { @@ -650,6 +657,14 @@ enum RegionRemoteProcedureBaseState { REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4; } +enum ServerRemoteProcedureState { + SERVER_REMOTE_PROCEDURE_DISPATCH = 1; + SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2; + SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3; + SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4; + SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5; +} + message RegionRemoteProcedureBaseStateData { required RegionInfo region = 1; required ServerName target_server = 2; @@ -681,6 +696,8 @@ message SwitchRpcThrottleStateData { message SwitchRpcThrottleRemoteStateData { required ServerName target_server = 1; required bool rpc_throttle_enabled = 2; + optional ServerRemoteProcedureState state = 3; + optional ForeignExceptionMessage error = 4; } message SplitWALParameter { @@ -698,6 +715,8 @@ message SplitWALRemoteData{ required string wal_path = 1; required ServerName crashed_server=2; required ServerName worker = 3; + optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } enum SplitWALState{ @@ -715,6 +734,8 @@ message ClaimReplicationQueueRemoteStateData { required string queue = 2; required ServerName target_server = 3; optional ServerName source_server = 4; + optional ServerRemoteProcedureState state = 5; + optional ForeignExceptionMessage error = 6; } message ClaimReplicationQueueRemoteParameter { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index d24471b938e2..0c89b6396417 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + @InterfaceAudience.Private /** * The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote) @@ -63,34 +65,38 @@ * <p> * If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle * this which calls remoteOperationDone with the exception. If the targetServer crashed but this - * procedure has no response, than dispatcher will call remoteOperationFailed() which also calls - * remoteOperationDone with the exception. If the operation is successful, then - * remoteOperationCompleted will be called and actually calls the remoteOperationDone without - * exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others. - * Then developer could implement complete() based on their own purpose. But basic logic is that if - * operation succeed, set succ to true and do the clean work. If operation failed and require to - * resend it to the same server, leave the succ as false. If operation failed and require to resend - * it to another server, set succ to true and upper layer should be able to find out this operation - * not work and send a operation to another server. + * procedure has no response, or if we receive a failed response, then the dispatcher will call + * remoteOperationFailed(), which also calls remoteOperationDone with the exception. If the + * operation is successful, then remoteOperationCompleted will be called, which calls + * remoteOperationDone without an exception. In remoteOperationDone we first check whether the + * procedure has already been woken up by someone else. Developers then implement complete() for + * their own purposes, but the basic contract is: if the operation succeeded, return true and do the + * cleanup work; if the operation failed and needs to be resent to the same server, return false; if + * the operation failed and should be resent to another server, return true, so that the upper layer + * can detect that this operation did not work and dispatch a new operation to another server. 
*/ public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv> implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName> { protected static final Logger LOG = LoggerFactory.getLogger(ServerRemoteProcedure.class); protected ProcedureEvent<?> event; protected ServerName targetServer; - protected boolean dispatched; - protected boolean succ; + // after remoteProcedureDone we require error field to decide the next state + protected Throwable remoteError; + protected MasterProcedureProtos.ServerRemoteProcedureState state = + MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH; - protected abstract void complete(MasterProcedureEnv env, Throwable error); + protected abstract boolean complete(MasterProcedureEnv env, Throwable error); @Override protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { - if (succ) { + if ( + state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + ) { + if (complete(env, this.remoteError)) { return null; } - dispatched = false; + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH; } try { env.getRemoteDispatcher().addOperationToNode(targetServer, this); @@ -99,7 +105,6 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn + "be retried to send to another server", this.getProcId(), targetServer); return null; } - dispatched = true; event = new ProcedureEvent<>(this); event.suspendIfNotReady(this); throw new ProcedureSuspendedException(); @@ -113,17 +118,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) { @Override public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException exception) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL; remoteOperationDone(env, exception); } @Override public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED; remoteOperationDone(env, null); } @Override public synchronized void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED; remoteOperationDone(env, error); } @@ -137,7 +145,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { getProcId()); return; } - complete(env, error); + this.remoteError = error; + // below persistence is added so that if report goes to last active master, it throws exception + env.getMasterServices().getMasterProcedureExecutor().getStore().update(this); event.wake(env.getProcedureScheduler()); event = null; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index 651822ff5b2a..5d3b25f7b14f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -32,12 +32,15 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable; import 
org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @@ -75,7 +78,8 @@ protected boolean abort(MasterProcedureEnv env) { } @Override - protected synchronized void complete(MasterProcedureEnv env, Throwable error) { + protected synchronized boolean complete(MasterProcedureEnv env, Throwable error) { + boolean isProcedureCompleted = false; try { if (error != null) { if (error instanceof RemoteProcedureException) { @@ -83,23 +87,19 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) { Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error); if (remoteEx instanceof CorruptedSnapshotException) { // snapshot is corrupted, will touch a flag file and finish the procedure - succ = true; + isProcedureCompleted = true; SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor() .getProcedure(SnapshotProcedure.class, getParentProcId()); if (parent != null) { parent.markSnapshotCorrupted(); } - } else { - // unexpected exception in remote server, will retry on other servers - succ = false; - } - } else { - // the mostly like thing is that remote call failed, will retry on other servers - succ = false; - } + } // else unexpected exception in remote server, will retry on other servers, + // isProcedureCompleted will stay false } // else the most likely thing is that remote call failed, will retry on other servers, + // isProcedureCompleted will stay false } else { // remote operation finished without error - succ = true; + isProcedureCompleted = true; } } catch (IOException e) { // if we can't create the flag file, then mark the current procedure as FAILED @@ -112,6 +112,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) { env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer, env.getProcedureScheduler()); } + return isProcedureCompleted; } // we will wrap remote exception into a RemoteProcedureException, @@ -126,7 +127,9 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn try { // if we've already known the snapshot is corrupted, then stop scheduling // the new procedures and the undispatched procedures - if (!dispatched) { + if ( + state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + ) { SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor() .getProcedure(SnapshotProcedure.class, getParentProcId()); if (parent != null && parent.isSnapshotCorrupted()) { @@ -134,14 +137,19 @@ protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEn } } // acquire a worker - if (!dispatched && targetServer == null) { + if ( + state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + && targetServer == 
null + ) { targetServer = env.getMasterServices().getSnapshotManager().acquireSnapshotVerifyWorker(this); } // send remote request Procedure<MasterProcedureEnv>[] res = super.execute(env); // retry if necessary - if (!dispatched) { + if ( + state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + ) { // the mostly like thing is that a FailedRemoteDispatchException is thrown. // we need to retry on another remote server targetServer = null; @@ -177,10 +185,15 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SnapshotVerifyProcedureStateData.Builder builder = SnapshotVerifyProcedureStateData.newBuilder(); - builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)); + builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state); if (targetServer != null) { builder.setTargetServer(ProtobufUtil.toServerName(targetServer)); } + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -190,9 +203,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws serializer.deserialize(SnapshotVerifyProcedureStateData.class); this.snapshot = data.getSnapshot(); this.region = ProtobufUtil.toRegionInfo(data.getRegion()); + this.state = data.getState(); if (data.hasTargetServer()) { this.targetServer = ProtobufUtil.toServerName(data.getTargetServer()); } + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index 3dc49073c720..1e6bb78e250c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -25,12 +25,14 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.regionserver.SplitWALCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; /** @@ -70,7 +72,12 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO MasterProcedureProtos.SplitWALRemoteData.Builder builder = MasterProcedureProtos.SplitWALRemoteData.newBuilder(); builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer)) - .setCrashedServer(ProtobufUtil.toServerName(crashedServer)); + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -81,6 +88,10 @@ 
protected void deserializeStateData(ProcedureStateSerializer serializer) throws walPath = data.getWalPath(); targetServer = ProtobufUtil.toServerName(data.getWorker()); crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override @@ -92,21 +103,21 @@ public Optional<RemoteProcedureDispatcher.RemoteOperation> remoteCallBuild(Maste } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error == null) { try { env.getMasterServices().getSplitWALManager().archive(walPath); } catch (IOException e) { LOG.warn("Failed split of {}; ignore...", walPath, e); } - succ = true; + return true; } else { if (error instanceof DoNotRetryIOException) { LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error); - succ = true; + return true; } else { LOG.warn("Failed split of {}, retry...", walPath, error); - succ = false; + return false; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index 53a4af1b5104..668897cd9a4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -23,11 +23,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData; /** @@ -59,9 +61,16 @@ protected boolean abort(MasterProcedureEnv env) { @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - SwitchRpcThrottleRemoteStateData.newBuilder() - .setTargetServer(ProtobufUtil.toServerName(targetServer)) - .setRpcThrottleEnabled(rpcThrottleEnabled).build(); + SwitchRpcThrottleRemoteStateData.Builder builder = + SwitchRpcThrottleRemoteStateData.newBuilder(); + builder.setTargetServer(ProtobufUtil.toServerName(targetServer)) + .setRpcThrottleEnabled(rpcThrottleEnabled).setState(state); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } + serializer.serialize(builder.build()); } @Override @@ -70,6 +79,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws serializer.deserialize(SwitchRpcThrottleRemoteStateData.class); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); rpcThrottleEnabled = data.getRpcThrottleEnabled(); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override @@ -99,13 +112,13 @@ public ServerOperationType 
getServerOperationType() { } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer, error); - this.succ = false; + return false; } else { - this.succ = true; + return true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java index d3aeeba541a2..e6cf46216759 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java @@ -35,11 +35,13 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData; @@ -116,12 +118,12 @@ public ServerOperationType getServerOperationType() { } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Failed to claim replication queue {} on server {} ", queueId, targetServer, error); - this.succ = false; + return false; } else { - this.succ = true; + return true; } } @@ -144,7 +146,13 @@ protected boolean waitInitialized(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { ClaimReplicationQueueRemoteStateData.Builder builder = ClaimReplicationQueueRemoteStateData .newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName())) - .setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer)); + .setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer)) + .setState(state); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } queueId.getSourceServerName() .ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer))); serializer.serialize(builder.build()); @@ -157,11 +165,15 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws targetServer = ProtobufUtil.toServerName(data.getTargetServer()); ServerName crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); String queue = data.getQueue(); + state = data.getState(); if (data.hasSourceServer()) { queueId = new ReplicationQueueId(crashedServer, queue, ProtobufUtil.toServerName(data.getSourceServer())); } else { queueId = new ReplicationQueueId(crashedServer, queue); } + if (data.hasError()) { + 
this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 0e1b9a3b3810..ef997fba4172 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData; @@ -121,13 +123,13 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error); - this.succ = false; + return false; } else { LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer); - this.succ = true; + return true; } } @@ -149,9 +151,15 @@ protected boolean waitInitialized(MasterProcedureEnv env) { @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - serializer.serialize( - RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build()); + RefreshPeerStateData.Builder builder = RefreshPeerStateData.newBuilder(); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } + serializer.serialize(builder.setPeerId(peerId).setType(toPeerModificationType(type)) + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).setState(state) + .build()); } @Override @@ -161,5 +169,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws type = toPeerOperationType(data.getType()); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); stage = data.getStage(); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index 4d3bf236716f..40a00fdd4daf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -29,11 +29,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData; @@ -71,14 +73,14 @@ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerN ReplaySyncReplicationWALCallable.class, builder.build().toByteArray())); } - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error); - this.succ = false; + return false; } else { truncateWALs(env); LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId); - this.succ = true; + return true; } } @@ -125,8 +127,13 @@ protected boolean abort(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SyncReplicationReplayWALRemoteStateData.Builder builder = SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId) - .setTargetServer(ProtobufUtil.toServerName(targetServer)); + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state); wals.stream().forEach(builder::addWal); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -138,6 +145,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws wals = new ArrayList<>(); data.getWalList().forEach(wals::add); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java index b7c2d5099879..d3fca2f59895 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java @@ -199,9 +199,8 @@ public synchronized void remoteOperationFailed(MasterProcedureEnv env, } @Override - public void complete(MasterProcedureEnv env, Throwable error) { - this.succ = true; - return; + public boolean complete(MasterProcedureEnv env, Throwable error) { + return true; } @Override
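
Every concrete remote procedure in this patch is converted the same way, so a single worked example may help when reading the diff. The sketch below is hypothetical and not part of the patch: the class ExampleRemoteProcedure, its empty remoteCallBuild(), and the ExampleRemoteStateData protobuf message are assumptions for illustration only. The complete()/state/remoteError handling mirrors what the patch does in SplitWALRemoteProcedure and the other subclasses, and the class is assumed to live in org.apache.hadoop.hbase.master.procedure so it can use the protected members of ServerRemoteProcedure.

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/** Hypothetical example, not part of the patch: a remote procedure written against the new contract. */
public class ExampleRemoteProcedure extends ServerRemoteProcedure {

  public ExampleRemoteProcedure() {
  }

  public ExampleRemoteProcedure(ServerName targetServer) {
    this.targetServer = targetServer;
  }

  @Override
  protected boolean complete(MasterProcedureEnv env, Throwable error) {
    // New contract: return true when the procedure can finish (the remote side succeeded, or the
    // failure should be handled by the caller or by dispatching to another server); return false to
    // have execute() reset the state to SERVER_REMOTE_PROCEDURE_DISPATCH and re-dispatch.
    // The old code expressed the same decision by setting the `succ` flag.
    if (error != null) {
      LOG.warn("Remote call to {} failed, will re-dispatch", targetServer, error);
      return false;
    }
    return true;
  }

  @Override
  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
    // A real subclass would return a ServerOperation wrapping its callable and payload here.
    return Optional.empty();
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    // ExampleRemoteStateData is an assumed message with target_server, state and error fields,
    // following the pattern this patch adds to the messages in MasterProcedure.proto.
    ExampleRemoteStateData.Builder builder = ExampleRemoteStateData.newBuilder()
      .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state);
    if (remoteError != null) {
      builder.setError(ForeignExceptionUtil.toProtoForeignException(remoteError));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    ExampleRemoteStateData data = serializer.deserialize(ExampleRemoteStateData.class);
    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
    state = data.getState();
    if (data.hasError()) {
      remoteError = ForeignExceptionUtil.toException(data.getError());
    }
  }

  @Override
  protected void rollback(MasterProcedureEnv env) {
    // Remote procedures of this kind are not rolled back once dispatched.
    throw new UnsupportedOperationException();
  }

  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }
}

The design point visible in the diff is that the dispatch/report state and any remote error are now serialized with the procedure, so a master that restarts in the middle of a remote call can decide whether to re-dispatch or to finish via complete(), instead of depending on the in-memory dispatched/succ flags, which were never persisted.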