From 33bfcf87a78953235efdbec71868469d5d76f877 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Wed, 10 Apr 2024 04:47:32 +0530 Subject: [PATCH 1/8] HBASE-28420 update the procedure's field to store for ServerRemoteProcedure --- .../protobuf/server/master/MasterProcedure.proto | 12 ++++++++++++ .../master/procedure/ServerRemoteProcedure.java | 7 +++++++ .../master/procedure/SnapshotVerifyProcedure.java | 4 +++- .../master/procedure/SplitWALRemoteProcedure.java | 4 +++- .../procedure/SwitchRpcThrottleRemoteProcedure.java | 4 +++- .../ClaimReplicationQueueRemoteProcedure.java | 4 +++- .../master/replication/RefreshPeerProcedure.java | 4 +++- .../SyncReplicationReplayWALRemoteProcedure.java | 4 +++- 8 files changed, 37 insertions(+), 6 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 48d20b6bef27..850360bbec51 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -254,6 +254,7 @@ message SnapshotVerifyProcedureStateData { required SnapshotDescription snapshot = 1; required RegionInfo region = 2; optional ServerName target_server = 3; + optional ServerRemoteProcedureState state = 4; } message SnapshotVerifyParameter { @@ -522,6 +523,7 @@ message RefreshPeerStateData { required ServerName target_server = 3; /** We need multiple stages for sync replication state transition **/ optional uint32 stage = 4 [default = 0]; + optional ServerRemoteProcedureState state = 5; } message RefreshPeerParameter { @@ -613,6 +615,7 @@ message SyncReplicationReplayWALRemoteStateData { required string peer_id = 1; repeated string wal = 2; required ServerName target_server = 3; + optional ServerRemoteProcedureState state = 4; } message ReplaySyncReplicationWALParameter { @@ -650,6 +653,12 @@ enum RegionRemoteProcedureBaseState { REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4; } +enum ServerRemoteProcedureState { + SERVER_REMOTE_PROCEDURE_DISPATCH = 1; + SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 2; + SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 3; +} + message RegionRemoteProcedureBaseStateData { required RegionInfo region = 1; required ServerName target_server = 2; @@ -681,6 +690,7 @@ message SwitchRpcThrottleStateData { message SwitchRpcThrottleRemoteStateData { required ServerName target_server = 1; required bool rpc_throttle_enabled = 2; + optional ServerRemoteProcedureState state = 3; } message SplitWALParameter { @@ -698,6 +708,7 @@ message SplitWALRemoteData{ required string wal_path = 1; required ServerName crashed_server=2; required ServerName worker = 3; + optional ServerRemoteProcedureState state = 4; } enum SplitWALState{ @@ -715,6 +726,7 @@ message ClaimReplicationQueueRemoteStateData { required string queue = 2; required ServerName target_server = 3; optional ServerName source_server = 4; + optional ServerRemoteProcedureState state = 5; } message ClaimReplicationQueueRemoteParameter { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index d24471b938e2..f3e276f377f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -29,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @InterfaceAudience.Private /** @@ -80,6 +81,8 @@ public abstract class ServerRemoteProcedure extends Procedure builder.setSourceServer(ProtobufUtil.toServerName(sourceServer))); serializer.serialize(builder.build()); @@ -157,6 +158,7 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws targetServer = ProtobufUtil.toServerName(data.getTargetServer()); ServerName crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); String queue = data.getQueue(); + state = data.getState(); if (data.hasSourceServer()) { queueId = new ReplicationQueueId(crashedServer, queue, ProtobufUtil.toServerName(data.getSourceServer())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 0e1b9a3b3810..b67869cb318d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -151,7 +151,8 @@ protected boolean waitInitialized(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { serializer.serialize( RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build()); + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage) + .setState(state).build()); } @Override @@ -161,5 +162,6 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws type = toPeerOperationType(data.getType()); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); stage = data.getStage(); + state = data.getState(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index 4d3bf236716f..fbe19c7fdf28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -125,7 +125,8 @@ protected boolean abort(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SyncReplicationReplayWALRemoteStateData.Builder builder = SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId) - .setTargetServer(ProtobufUtil.toServerName(targetServer)); + .setTargetServer(ProtobufUtil.toServerName(targetServer)) + .setState(state); wals.stream().forEach(builder::addWal); serializer.serialize(builder.build()); } @@ -138,6 +139,7 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws wals = new ArrayList<>(); data.getWalList().forEach(wals::add); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + state = data.getState(); } @Override From 95ab28af41b13a9668af2b179d37f1da0e00018f Mon Sep 17 00:00:00 2001 From: ukumawat Date: Fri, 12 Apr 2024 01:27:12 +0530 Subject: [PATCH 2/8] HBASE-28420 review comment - right state for different flows --- .../src/main/protobuf/server/master/MasterProcedure.proto | 1 + .../hbase/master/procedure/ServerRemoteProcedure.java | 7 +++++-- .../hbase/master/procedure/SnapshotVerifyProcedure.java | 3 +-- .../hbase/master/procedure/SplitWALRemoteProcedure.java | 3 +-- .../master/procedure/SwitchRpcThrottleRemoteProcedure.java | 3 +-- .../hbase/master/replication/RefreshPeerProcedure.java | 4 ++-- .../SyncReplicationReplayWALRemoteProcedure.java | 3 +-- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 850360bbec51..3179aef54fa9 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -657,6 +657,7 @@ enum ServerRemoteProcedureState { SERVER_REMOTE_PROCEDURE_DISPATCH = 1; SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 2; SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 3; + SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 4; } message RegionRemoteProcedureBaseStateData { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index f3e276f377f6..b4e241e19227 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -29,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; @InterfaceAudience.Private @@ -116,17 +117,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) { @Override public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, IOException exception) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL; remoteOperationDone(env, exception); } @Override public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED; remoteOperationDone(env, null); } @Override public synchronized void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_SERVER_CRASH; remoteOperationDone(env, error); } @@ -140,8 +144,7 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { getProcId()); return; } - //below persistence is added so that if report goes to last active master, it throws exception - state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED; + // below persistence is added so that if report goes to last active master, it throws exception env.getMasterServices().getMasterProcedureExecutor().getStore().update(this); complete(env, error); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index 64bfac2e4cb3..a25f0db307df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -177,8 +177,7 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SnapshotVerifyProcedureStateData.Builder builder = SnapshotVerifyProcedureStateData.newBuilder(); - builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)) - .setState(state); + builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state); if (targetServer != null) { builder.setTargetServer(ProtobufUtil.toServerName(targetServer)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index 25d7ffe796d3..de7ebf04d724 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -70,8 +70,7 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO MasterProcedureProtos.SplitWALRemoteData.Builder builder = MasterProcedureProtos.SplitWALRemoteData.newBuilder(); builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer)) - .setCrashedServer(ProtobufUtil.toServerName(crashedServer)) - .setState(state); + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state); serializer.serialize(builder.build()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index dd82cabbd4dd..4e71cfe16511 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -61,8 +61,7 @@ protected boolean abort(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SwitchRpcThrottleRemoteStateData.newBuilder() .setTargetServer(ProtobufUtil.toServerName(targetServer)) - .setRpcThrottleEnabled(rpcThrottleEnabled) - .setState(state).build(); + .setRpcThrottleEnabled(rpcThrottleEnabled).setState(state).build(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index b67869cb318d..61294831b737 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -151,8 +151,8 @@ protected boolean waitInitialized(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { serializer.serialize( RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage) - .setState(state).build()); + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).setState(state) + .build()); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index fbe19c7fdf28..d70115eebbf5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -125,8 +125,7 @@ protected boolean abort(MasterProcedureEnv env) { protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { SyncReplicationReplayWALRemoteStateData.Builder builder = SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId) - .setTargetServer(ProtobufUtil.toServerName(targetServer)) - .setState(state); + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state); wals.stream().forEach(builder::addWal); serializer.serialize(builder.build()); } From d56d160e100fd861b68434cade7a01db282a90da Mon Sep 17 00:00:00 2001 From: ukumawat Date: Wed, 24 Apr 2024 23:14:07 +0530 Subject: [PATCH 3/8] HBASE-28420 changed state for remoteOperationFailed and order of persistence and complete change --- .../src/main/protobuf/server/master/MasterProcedure.proto | 7 ++++--- .../hbase/master/procedure/ServerRemoteProcedure.java | 5 ++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index 3179aef54fa9..ba51dfb4345f 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -655,9 +655,10 @@ enum RegionRemoteProcedureBaseState { enum ServerRemoteProcedureState { SERVER_REMOTE_PROCEDURE_DISPATCH = 1; - SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 2; - SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 3; - SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 4; + SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2; + SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3; + SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4; + SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5; } message RegionRemoteProcedureBaseStateData { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index b4e241e19227..d946dec253dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -130,7 +130,7 @@ public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { @Override public synchronized void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { - state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_SERVER_CRASH; + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED; remoteOperationDone(env, error); } @@ -144,10 +144,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { getProcId()); return; } + complete(env, error); // below persistence is added so that if report goes to last active master, it throws exception env.getMasterServices().getMasterProcedureExecutor().getStore().update(this); - - complete(env, error); event.wake(env.getProcedureScheduler()); event = null; } From cb8dc37af7026ed2f2617a4b7360f68fd684f8e6 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Tue, 23 Apr 2024 19:57:03 +0530 Subject: [PATCH 4/8] HBASE-28420 moved complete method to execute --- .../server/master/MasterProcedure.proto | 7 +++++++ .../procedure/ServerRemoteProcedure.java | 12 +++++++++--- .../procedure/SnapshotVerifyProcedure.java | 10 ++++++++++ .../procedure/SplitWALRemoteProcedure.java | 10 ++++++++++ .../SwitchRpcThrottleRemoteProcedure.java | 16 ++++++++++++++-- .../ClaimReplicationQueueRemoteProcedure.java | 10 ++++++++++ .../replication/RefreshPeerProcedure.java | 18 ++++++++++++++---- ...yncReplicationReplayWALRemoteProcedure.java | 10 ++++++++++ 8 files changed, 84 insertions(+), 9 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index ba51dfb4345f..9161a02c1800 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -29,6 +29,7 @@ import "rpc/RPC.proto"; import "server/Snapshot.proto"; import "server/master/Replication.proto"; import "server/master/RegionServerStatus.proto"; +import "server/ErrorHandling.proto"; // ============================================================================ // WARNING - Compatibility rules @@ -255,6 +256,7 @@ message SnapshotVerifyProcedureStateData { required RegionInfo region = 2; optional ServerName target_server = 3; optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } message SnapshotVerifyParameter { @@ -524,6 +526,7 @@ message RefreshPeerStateData { /** We need multiple stages for sync replication state transition **/ optional uint32 stage = 4 [default = 0]; optional ServerRemoteProcedureState state = 5; + optional ForeignExceptionMessage error = 6; } message RefreshPeerParameter { @@ -616,6 +619,7 @@ message SyncReplicationReplayWALRemoteStateData { repeated string wal = 2; required ServerName target_server = 3; optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } message ReplaySyncReplicationWALParameter { @@ -693,6 +697,7 @@ message SwitchRpcThrottleRemoteStateData { required ServerName target_server = 1; required bool rpc_throttle_enabled = 2; optional ServerRemoteProcedureState state = 3; + optional ForeignExceptionMessage error = 4; } message SplitWALParameter { @@ -711,6 +716,7 @@ message SplitWALRemoteData{ required ServerName crashed_server=2; required ServerName worker = 3; optional ServerRemoteProcedureState state = 4; + optional ForeignExceptionMessage error = 5; } enum SplitWALState{ @@ -729,6 +735,7 @@ message ClaimReplicationQueueRemoteStateData { required ServerName target_server = 3; optional ServerName source_server = 4; optional ServerRemoteProcedureState state = 5; + optional ForeignExceptionMessage error = 6; } message ClaimReplicationQueueRemoteParameter { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index d946dec253dc..776eae27e429 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -82,6 +82,8 @@ public abstract class ServerRemoteProcedure extends Procedure[] execute(MasterProcedureEnv env) throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - if (dispatched) { + if ( + state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + ) { + complete(env, this.remoteError); if (succ) { return null; } - dispatched = false; + state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH; + ; } try { env.getRemoteDispatcher().addOperationToNode(targetServer, this); @@ -144,7 +150,7 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) { getProcId()); return; } - complete(env, error); + this.remoteError = error; // below persistence is added so that if report goes to last active master, it throws exception env.getMasterServices().getMasterProcedureExecutor().getStore().update(this); event.wake(env.getProcedureScheduler()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index a25f0db307df..d6ab01d20452 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -32,12 +32,14 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable; import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @@ -181,6 +183,11 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO if (targetServer != null) { builder.setTargetServer(ProtobufUtil.toServerName(targetServer)); } + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -194,6 +201,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws if (data.hasTargetServer()) { this.targetServer = ProtobufUtil.toServerName(data.getTargetServer()); } + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index de7ebf04d724..d074bcfa077b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -25,12 +25,14 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.regionserver.SplitWALCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; /** @@ -71,6 +73,11 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO MasterProcedureProtos.SplitWALRemoteData.newBuilder(); builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer)) .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -82,6 +89,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws targetServer = ProtobufUtil.toServerName(data.getWorker()); crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index 4e71cfe16511..37270cf52139 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -23,11 +23,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData; /** @@ -59,9 +61,16 @@ protected boolean abort(MasterProcedureEnv env) { @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - SwitchRpcThrottleRemoteStateData.newBuilder() - .setTargetServer(ProtobufUtil.toServerName(targetServer)) + SwitchRpcThrottleRemoteStateData.Builder builder = + SwitchRpcThrottleRemoteStateData.newBuilder(); + builder.setTargetServer(ProtobufUtil.toServerName(targetServer)) .setRpcThrottleEnabled(rpcThrottleEnabled).setState(state).build(); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } + serializer.serialize(builder.build()); } @Override @@ -71,6 +80,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws targetServer = ProtobufUtil.toServerName(data.getTargetServer()); rpcThrottleEnabled = data.getRpcThrottleEnabled(); state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java index 47279ffbdeec..83618de58e6e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java @@ -35,11 +35,13 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueId; import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData; @@ -146,6 +148,11 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO .newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName())) .setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer)) .setState(state); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } queueId.getSourceServerName() .ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer))); serializer.serialize(builder.build()); @@ -165,5 +172,8 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws } else { queueId = new ReplicationQueueId(crashedServer, queue); } + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 61294831b737..4196f537fe14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -28,11 +28,13 @@ import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData; @@ -149,10 +151,15 @@ protected boolean waitInitialized(MasterProcedureEnv env) { @Override protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { - serializer.serialize( - RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) - .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).setState(state) - .build()); + RefreshPeerStateData.Builder builder = RefreshPeerStateData.newBuilder(); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } + serializer.serialize(builder.setPeerId(peerId).setType(toPeerModificationType(type)) + .setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).setState(state) + .build()); } @Override @@ -163,5 +170,8 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws targetServer = ProtobufUtil.toServerName(data.getTargetServer()); stage = data.getStage(); state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index d70115eebbf5..356c9b1e963a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -29,11 +29,13 @@ import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; import org.apache.hadoop.hbase.replication.regionserver.ReplaySyncReplicationWALCallable; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ReplaySyncReplicationWALParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SyncReplicationReplayWALRemoteStateData; @@ -127,6 +129,11 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO SyncReplicationReplayWALRemoteStateData.newBuilder().setPeerId(peerId) .setTargetServer(ProtobufUtil.toServerName(targetServer)).setState(state); wals.stream().forEach(builder::addWal); + if (this.remoteError != null) { + ErrorHandlingProtos.ForeignExceptionMessage fem = + ForeignExceptionUtil.toProtoForeignException(remoteError); + builder.setError(fem); + } serializer.serialize(builder.build()); } @@ -139,6 +146,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws data.getWalList().forEach(wals::add); targetServer = ProtobufUtil.toServerName(data.getTargetServer()); state = data.getState(); + if (data.hasError()) { + this.remoteError = ForeignExceptionUtil.toException(data.getError()); + } } @Override From 0e0df69bd07f8281362fb4e84a9dec0c29970545 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Wed, 1 May 2024 00:21:44 +0530 Subject: [PATCH 5/8] HBASE-28420 updated the documentation comments --- .../hadoop/hbase/master/procedure/ServerRemoteProcedure.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index 776eae27e429..7fd2a7db0387 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -65,7 +65,7 @@ *

* If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle * this which calls remoteOperationDone with the exception. If the targetServer crashed but this - * procedure has no response, than dispatcher will call remoteOperationFailed() which also calls + * procedure has no response or if we receive failed response, then dispatcher will call remoteOperationFailed() which also calls * remoteOperationDone with the exception. If the operation is successful, then * remoteOperationCompleted will be called and actually calls the remoteOperationDone without * exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others. From ad84ebc0c1151e0c028388a588067dd8a5fbf14f Mon Sep 17 00:00:00 2001 From: ukumawat Date: Thu, 2 May 2024 12:26:52 +0530 Subject: [PATCH 6/8] HBASE-28420 spotless apply --- .../procedure/ServerRemoteProcedure.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index 7fd2a7db0387..82b258d679d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -65,15 +65,15 @@ *

* If sending the operation to remote RS failed, dispatcher will call remoteCallFailed() to handle * this which calls remoteOperationDone with the exception. If the targetServer crashed but this - * procedure has no response or if we receive failed response, then dispatcher will call remoteOperationFailed() which also calls - * remoteOperationDone with the exception. If the operation is successful, then - * remoteOperationCompleted will be called and actually calls the remoteOperationDone without - * exception. In remoteOperationDone, we'll check if the procedure is already get wake up by others. - * Then developer could implement complete() based on their own purpose. But basic logic is that if - * operation succeed, set succ to true and do the clean work. If operation failed and require to - * resend it to the same server, leave the succ as false. If operation failed and require to resend - * it to another server, set succ to true and upper layer should be able to find out this operation - * not work and send a operation to another server. + * procedure has no response or if we receive failed response, then dispatcher will call + * remoteOperationFailed() which also calls remoteOperationDone with the exception. If the operation + * is successful, then remoteOperationCompleted will be called and actually calls the + * remoteOperationDone without exception. In remoteOperationDone, we'll check if the procedure is + * already get wake up by others. Then developer could implement complete() based on their own + * purpose. But basic logic is that if operation succeed, set succ to true and do the clean work. If + * operation failed and require to resend it to the same server, leave the succ as false. If + * operation failed and require to resend it to another server, set succ to true and upper layer + * should be able to find out this operation not work and send a operation to another server. */ public abstract class ServerRemoteProcedure extends Procedure implements RemoteProcedureDispatcher.RemoteProcedure { From f10dbd6b71f939c4f431e73690954bb56f7f1723 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Wed, 29 May 2024 14:06:58 +0530 Subject: [PATCH 7/8] HBASE-28420 review comments --- .../hadoop/hbase/master/procedure/ServerRemoteProcedure.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index 82b258d679d5..68a69c7bf37d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -100,7 +100,6 @@ protected synchronized Procedure[] execute(MasterProcedureEn return null; } state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH; - ; } try { env.getRemoteDispatcher().addOperationToNode(targetServer, this); From c78f5da22e749b46167d6af53d88ecaee77e9741 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Thu, 30 May 2024 19:18:01 +0530 Subject: [PATCH 8/8] HBASE-28420 remove succ update from child and add stats in SnapshotVerifyProcedure --- .../procedure/ServerRemoteProcedure.java | 8 ++--- .../procedure/SnapshotVerifyProcedure.java | 34 +++++++++++-------- .../procedure/SplitWALRemoteProcedure.java | 8 ++--- .../SwitchRpcThrottleRemoteProcedure.java | 6 ++-- .../ClaimReplicationQueueRemoteProcedure.java | 6 ++-- .../replication/RefreshPeerProcedure.java | 6 ++-- ...ncReplicationReplayWALRemoteProcedure.java | 6 ++-- .../procedure/TestServerRemoteProcedure.java | 5 ++- 8 files changed, 40 insertions(+), 39 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index 68a69c7bf37d..0c89b6396417 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -80,14 +80,12 @@ public abstract class ServerRemoteProcedure extends Procedure event; protected ServerName targetServer; - protected boolean dispatched; - protected boolean succ; // after remoteProcedureDone we require error field to decide the next state protected Throwable remoteError; protected MasterProcedureProtos.ServerRemoteProcedureState state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH; - protected abstract void complete(MasterProcedureEnv env, Throwable error); + protected abstract boolean complete(MasterProcedureEnv env, Throwable error); @Override protected synchronized Procedure[] execute(MasterProcedureEnv env) @@ -95,8 +93,7 @@ protected synchronized Procedure[] execute(MasterProcedureEn if ( state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH ) { - complete(env, this.remoteError); - if (succ) { + if (complete(env, this.remoteError)) { return null; } state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH; @@ -108,7 +105,6 @@ protected synchronized Procedure[] execute(MasterProcedureEn + "be retried to send to another server", this.getProcId(), targetServer); return null; } - dispatched = true; event = new ProcedureEvent<>(this); event.suspendIfNotReady(this); throw new ProcedureSuspendedException(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index d6ab01d20452..5d3b25f7b14f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @@ -77,7 +78,8 @@ protected boolean abort(MasterProcedureEnv env) { } @Override - protected synchronized void complete(MasterProcedureEnv env, Throwable error) { + protected synchronized boolean complete(MasterProcedureEnv env, Throwable error) { + boolean isProcedureCompleted = false; try { if (error != null) { if (error instanceof RemoteProcedureException) { @@ -85,23 +87,19 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) { Throwable remoteEx = unwrapRemoteProcedureException((RemoteProcedureException) error); if (remoteEx instanceof CorruptedSnapshotException) { // snapshot is corrupted, will touch a flag file and finish the procedure - succ = true; + isProcedureCompleted = true; SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor() .getProcedure(SnapshotProcedure.class, getParentProcId()); if (parent != null) { parent.markSnapshotCorrupted(); } - } else { - // unexpected exception in remote server, will retry on other servers - succ = false; - } - } else { - // the mostly like thing is that remote call failed, will retry on other servers - succ = false; - } + } // else unexpected exception in remote server, will retry on other servers, + // procedureCompleted will stay false + } // else the mostly like thing is that remote call failed, will retry on other servers, + // procedureCompleted will stay false } else { // remote operation finished without error - succ = true; + isProcedureCompleted = true; } } catch (IOException e) { // if we can't create the flag file, then mark the current procedure as FAILED @@ -114,6 +112,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) { env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer, env.getProcedureScheduler()); } + return isProcedureCompleted; } // we will wrap remote exception into a RemoteProcedureException, @@ -128,7 +127,9 @@ protected synchronized Procedure[] execute(MasterProcedureEn try { // if we've already known the snapshot is corrupted, then stop scheduling // the new procedures and the undispatched procedures - if (!dispatched) { + if ( + state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + ) { SnapshotProcedure parent = env.getMasterServices().getMasterProcedureExecutor() .getProcedure(SnapshotProcedure.class, getParentProcId()); if (parent != null && parent.isSnapshotCorrupted()) { @@ -136,14 +137,19 @@ protected synchronized Procedure[] execute(MasterProcedureEn } } // acquire a worker - if (!dispatched && targetServer == null) { + if ( + state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + && targetServer == null + ) { targetServer = env.getMasterServices().getSnapshotManager().acquireSnapshotVerifyWorker(this); } // send remote request Procedure[] res = super.execute(env); // retry if necessary - if (!dispatched) { + if ( + state == MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH + ) { // the mostly like thing is that a FailedRemoteDispatchException is thrown. // we need to retry on another remote server targetServer = null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java index d074bcfa077b..1e6bb78e250c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -103,21 +103,21 @@ public Optional remoteCallBuild(Maste } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error == null) { try { env.getMasterServices().getSplitWALManager().archive(walPath); } catch (IOException e) { LOG.warn("Failed split of {}; ignore...", walPath, e); } - succ = true; + return true; } else { if (error instanceof DoNotRetryIOException) { LOG.warn("Sent {} to wrong server {}, try another", walPath, targetServer, error); - succ = true; + return true; } else { LOG.warn("Failed split of {}, retry...", walPath, error); - succ = false; + return false; } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java index 37270cf52139..668897cd9a4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleRemoteProcedure.java @@ -112,13 +112,13 @@ public ServerOperationType getServerOperationType() { } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Failed to switch rpc throttle to {} on server {}", rpcThrottleEnabled, targetServer, error); - this.succ = false; + return false; } else { - this.succ = true; + return true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java index 83618de58e6e..e6cf46216759 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ClaimReplicationQueueRemoteProcedure.java @@ -118,12 +118,12 @@ public ServerOperationType getServerOperationType() { } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Failed to claim replication queue {} on server {} ", queueId, targetServer, error); - this.succ = false; + return false; } else { - this.succ = true; + return true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java index 4196f537fe14..ef997fba4172 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -123,13 +123,13 @@ public Optional remoteCallBuild(MasterProcedureEnv env, ServerN } @Override - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error); - this.succ = false; + return false; } else { LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer); - this.succ = true; + return true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java index 356c9b1e963a..40a00fdd4daf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/SyncReplicationReplayWALRemoteProcedure.java @@ -73,14 +73,14 @@ public Optional remoteCallBuild(MasterProcedureEnv env, ServerN ReplaySyncReplicationWALCallable.class, builder.build().toByteArray())); } - protected void complete(MasterProcedureEnv env, Throwable error) { + protected boolean complete(MasterProcedureEnv env, Throwable error) { if (error != null) { LOG.warn("Replay wals {} on {} failed for peer id={}", wals, targetServer, peerId, error); - this.succ = false; + return false; } else { truncateWALs(env); LOG.info("Replay wals {} on {} succeed for peer id={}", wals, targetServer, peerId); - this.succ = true; + return true; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java index b7c2d5099879..d3fca2f59895 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java @@ -199,9 +199,8 @@ public synchronized void remoteOperationFailed(MasterProcedureEnv env, } @Override - public void complete(MasterProcedureEnv env, Throwable error) { - this.succ = true; - return; + public boolean complete(MasterProcedureEnv env, Throwable error) { + return true; } @Override