From a386c41a74ccf5ea1de16befac786560219a098a Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 20 Apr 2023 00:54:17 +0800 Subject: [PATCH 1/2] HBASE-27783 Implement a shell command to disable all peer modification Signed-off-by: Liangjun He --- .../org/apache/hadoop/hbase/client/Admin.java | 28 ++++ .../hadoop/hbase/client/AsyncAdmin.java | 31 +++- .../hadoop/hbase/client/AsyncHBaseAdmin.java | 11 ++ .../client/ConnectionImplementation.java | 27 ++++ .../hadoop/hbase/client/HBaseAdmin.java | 57 +++++++ .../hbase/client/RawAsyncHBaseAdmin.java | 75 ++++++++++ .../client/ShortCircuitMasterConnection.java | 27 ++++ .../src/main/protobuf/Master.proto | 9 ++ .../src/main/protobuf/Replication.proto | 27 ++++ .../hbase/master/BooleanStateStore.java | 5 +- .../apache/hadoop/hbase/master/HMaster.java | 22 ++- .../hbase/master/MasterRpcServices.java | 57 +++++++ .../hadoop/hbase/master/MasterServices.java | 4 + .../hadoop/hbase/master/MasterStateStore.java | 4 + .../replication/AbstractPeerProcedure.java | 6 + .../replication/ModifyPeerProcedure.java | 1 + ...ReplicationPeerModificationStateStore.java | 64 ++++++++ .../hadoop/hbase/client/TestAdmin4.java | 30 +++- .../client/TestAsyncReplicationAdminApi.java | 24 ++- .../hbase/master/MockNoopMasterServices.java | 10 ++ .../TestDisablePeerModification.java | 139 ++++++++++++++++++ .../src/main/ruby/hbase/replication_admin.rb | 16 ++ hbase-shell/src/main/ruby/shell.rb | 2 + .../commands/peer_modification_enabled.rb | 40 +++++ .../commands/peer_modification_switch.rb | 46 ++++++ .../test/ruby/hbase/replication_admin_test.rb | 24 ++- .../hbase/thrift2/client/ThriftAdmin.java | 13 ++ 27 files changed, 787 insertions(+), 12 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerModificationStateStore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/peer_modification_enabled.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/peer_modification_switch.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index e16c21eb2c33..e00f0c1ca176 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2975,6 +2975,34 @@ default void removeReplicationPeerTableCFs(String id, Map listReplicationPeers(Pattern pattern) throws IOException; + /** + * Enable or disable replication peer modification. + *

+ * This is especially useful when you want to change the replication peer storage. + * @param on {@code true} means enable, otherwise disable + * @return the previous enable/disable state + */ + default boolean replicationPeerModificationSwitch(boolean on) throws IOException { + return replicationPeerModificationSwitch(on, false); + } + + /** + * Enable or disable replication peer modification. + *

+ * This is especially useful when you want to change the replication peer storage. + * @param on {@code true} means enable, otherwise disable + * @param drainProcedures if {@code true}, will wait until all the running replication peer + * modification procedures finish + * @return the previous enable/disable state + */ + boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) throws IOException; + + /** + * Check whether replication peer modification is enabled. + * @return {@code true} if modification is enabled, otherwise {@code false} + */ + boolean isReplicationPeerModificationEnabled() throws IOException; + /** * Mark region server(s) as decommissioned to prevent additional regions from getting assigned to * them. Optionally unload the regions on the servers. If there are multiple servers to be diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 361e967b8bcb..913350b1d172 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -793,10 +793,39 @@ CompletableFuture removeReplicationPeerTableCFs(String peerId, * Check if a replication peer is enabled. * @param peerId id of replication peer to check * @return true if replication peer is enabled. The return value will be wrapped by a - * {@link CompletableFuture}. + * {@link CompletableFuture} */ CompletableFuture isReplicationPeerEnabled(String peerId); + /** + * Enable or disable replication peer modification. + *

+ * This is especially useful when you want to change the replication peer storage. + * @param on {@code true} means enable, otherwise disable + * @return the previous enable/disable state wrapped by a {@link CompletableFuture} + */ + default CompletableFuture replicationPeerModificationSwitch(boolean on) { + return replicationPeerModificationSwitch(on, false); + } + + /** + * Enable or disable replication peer modification. + *

+ * This is especially useful when you want to change the replication peer storage. + * @param on {@code true} means enable, otherwise disable + * @param drainProcedures if {@code true}, will wait until all the running replication peer + * modification procedures finish + * @return the previous enable/disable state wrapped by a {@link CompletableFuture} + */ + CompletableFuture replicationPeerModificationSwitch(boolean on, boolean drainProcedures); + + /** + * Check whether replication peer modification is enabled. + * @return {@code true} if modification is enabled, otherwise {@code false}, wrapped by a + * {@link CompletableFuture} + */ + CompletableFuture isReplicationPeerModificationEnabled(); + /** * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be * taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index bf5e2905c3b0..ce604d90b2ec 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -490,6 +490,17 @@ public CompletableFuture isReplicationPeerEnabled(String peerId) { return wrap(rawAdmin.isReplicationPeerEnabled(peerId)); } + @Override + public CompletableFuture replicationPeerModificationSwitch(boolean on, + boolean drainProcedures) { + return wrap(rawAdmin.replicationPeerModificationSwitch(on, drainProcedures)); + } + + @Override + public CompletableFuture isReplicationPeerModificationEnabled() { + return wrap(rawAdmin.isReplicationPeerModificationEnabled()); + } + @Override public CompletableFuture snapshot(SnapshotDescription snapshot) { return wrap(rawAdmin.snapshot(snapshot)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 418916115b92..d3f8e36010a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -161,12 +161,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; @@ -2035,6 +2041,27 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller, FlushMasterStoreRequest request) throws ServiceException { return stub.flushMasterStore(controller, request); } + + @Override + public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch( + RpcController controller, ReplicationPeerModificationSwitchRequest request) + throws ServiceException { + return stub.replicationPeerModificationSwitch(controller, request); + } + + @Override + public GetReplicationPeerModificationProceduresResponse + getReplicationPeerModificationProcedures(RpcController controller, + GetReplicationPeerModificationProceduresRequest request) throws ServiceException { + return stub.getReplicationPeerModificationProcedures(controller, request); + } + + @Override + public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled( + RpcController controller, IsReplicationPeerModificationEnabledRequest request) + throws ServiceException { + return stub.isReplicationPeerModificationEnabled(controller, request); + } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 36c8304ecb94..27952e39e23d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -225,6 +225,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes; @@ -235,9 +236,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -4470,4 +4474,57 @@ protected Boolean rpcCall(int callTimeout) throws Exception { } }); } + + @Override + public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) + throws IOException { + ReplicationPeerModificationSwitchRequest request = + ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); + boolean prevOn = + executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected Boolean rpcCall() throws Exception { + return master.replicationPeerModificationSwitch(getRpcController(), request) + .getPreviousValue(); + } + }); + // if we do not need to wait all previous peer modification procedure done, or we are enabling + // peer modification, just return here. + if (!drainProcedures || on) { + return prevOn; + } + // otherwise we need to wait until all previous peer modification procedure done + for (int retry = 0;; retry++) { + List procs = + executeCallable(new MasterCallable>(getConnection(), + getRpcControllerFactory()) { + @Override + protected List rpcCall() throws Exception { + return master + .getReplicationPeerModificationProcedures(getRpcController(), + GetReplicationPeerModificationProceduresRequest.getDefaultInstance()) + .getProcedureList(); + } + }); + if (procs.isEmpty()) { + return prevOn; + } + try { + Thread.sleep(ConnectionUtils.getPauseTime(pause, retry)); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + } + + @Override + public boolean isReplicationPeerModificationEnabled() throws IOException { + return executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected Boolean rpcCall() throws Exception { + return master.isReplicationPeerModificationEnabled(getRpcController(), + IsReplicationPeerModificationEnabledRequest.getDefaultInstance()).getEnabled(); + } + }); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index e2187f8b4e8a..d413905c04af 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -284,6 +284,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest; @@ -299,12 +300,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos; @@ -3708,6 +3715,74 @@ private CompletableFuture setTableReplication(TableName tableName, boolean return future; } + private void waitUntilAllReplicationPeerModificationProceduresDone( + CompletableFuture future, boolean prevOn, int retries) { + CompletableFuture> callFuture = + this.> newMasterCaller() + .action((controller, stub) -> this.> call( + controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(), + (s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done), + resp -> resp.getProcedureList())) + .call(); + addListener(callFuture, (r, e) -> { + if (e != null) { + future.completeExceptionally(e); + } else if (r.isEmpty()) { + // we are done + future.complete(prevOn); + } else { + // retry later to see if the procedures are done + retryTimer.newTimeout( + t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1), + ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); + } + }); + } + + @Override + public CompletableFuture replicationPeerModificationSwitch(boolean on, + boolean drainProcedures) { + ReplicationPeerModificationSwitchRequest request = + ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build(); + CompletableFuture callFuture = this. newMasterCaller() + .action((controller, stub) -> this. call(controller, stub, request, + (s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done), + resp -> resp.getPreviousValue())) + .call(); + // if we do not need to wait all previous peer modification procedure done, or we are enabling + // peer modification, just return here. + if (!drainProcedures || on) { + return callFuture; + } + // otherwise we need to wait until all previous peer modification procedure done + CompletableFuture future = new CompletableFuture<>(); + addListener(callFuture, (prevOn, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + // even if the previous state is disabled, we still need to wait here, as there could be + // another client thread which called this method just before us and have already changed the + // state to off, but there are still peer modification procedures not finished, so we should + // also wait here. + waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0); + }); + return future; + } + + @Override + public CompletableFuture isReplicationPeerModificationEnabled() { + return this. newMasterCaller() + .action((controller, stub) -> this. call(controller, stub, + IsReplicationPeerModificationEnabledRequest.getDefaultInstance(), + (s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done), + (resp) -> resp.getEnabled())) + .call(); + } + @Override public CompletableFuture clearBlockCache(TableName tableName) { CompletableFuture future = new CompletableFuture<>(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 62b9cf16e9d4..a4b361b20915 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -195,12 +195,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; @@ -776,4 +782,25 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller, FlushMasterStoreRequest request) throws ServiceException { return stub.flushMasterStore(controller, request); } + + @Override + public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch( + RpcController controller, ReplicationPeerModificationSwitchRequest request) + throws ServiceException { + return stub.replicationPeerModificationSwitch(controller, request); + } + + @Override + public GetReplicationPeerModificationProceduresResponse getReplicationPeerModificationProcedures( + RpcController controller, GetReplicationPeerModificationProceduresRequest request) + throws ServiceException { + return stub.getReplicationPeerModificationProcedures(controller, request); + } + + @Override + public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled( + RpcController controller, IsReplicationPeerModificationEnabledRequest request) + throws ServiceException { + return stub.isReplicationPeerModificationEnabled(controller, request); + } } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 5e5a44621734..be59566d73af 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -1129,6 +1129,15 @@ service MasterService { rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest) returns(GetReplicationPeerStateResponse); + rpc ReplicationPeerModificationSwitch(ReplicationPeerModificationSwitchRequest) + returns(ReplicationPeerModificationSwitchResponse); + + rpc GetReplicationPeerModificationProcedures(GetReplicationPeerModificationProceduresRequest) + returns(GetReplicationPeerModificationProceduresResponse); + + rpc IsReplicationPeerModificationEnabled(IsReplicationPeerModificationEnabledRequest) + returns(IsReplicationPeerModificationEnabledResponse); + /** Returns a list of ServerNames marked as decommissioned. */ rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest) returns(ListDecommissionedRegionServersResponse); diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto index 8b4aa4813211..545e89eaaec7 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto @@ -26,6 +26,7 @@ option java_generate_equals_and_hash = true; option optimize_for = SPEED; import "HBase.proto"; +import "Procedure.proto"; message TableCF { optional TableName table_name = 1; @@ -145,3 +146,29 @@ message GetReplicationPeerStateRequest { message GetReplicationPeerStateResponse { required bool is_enabled = 1; } + +message ReplicationPeerModificationSwitchRequest { + required bool on = 1; +} + +message ReplicationPeerModificationSwitchResponse { + required bool previous_value = 1; +} + +message ReplicationPeerModificationState { + required bool on = 1; +} + +message GetReplicationPeerModificationProceduresRequest { +} + +message GetReplicationPeerModificationProceduresResponse { + repeated Procedure procedure = 1; +} + +message IsReplicationPeerModificationEnabledRequest { +} + +message IsReplicationPeerModificationEnabledResponse { + required bool enabled = 1; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BooleanStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BooleanStateStore.java index df64844dbb1b..49a180633443 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BooleanStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BooleanStateStore.java @@ -52,11 +52,14 @@ public boolean get() { * Set the flag on/off. * @param on true if the flag should be on, false otherwise * @throws IOException if the operation fails + * @return returns the previous state */ - public synchronized void set(boolean on) throws IOException { + public synchronized boolean set(boolean on) throws IOException { byte[] state = toByteArray(on); setState(state); + boolean prevOn = this.on; this.on = on; + return prevOn; } protected abstract byte[] toByteArray(boolean on); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 7bb874f4654e..4abd7df915c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure; import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure; import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager; +import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore; import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure; import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService; import org.apache.hadoop.hbase.master.snapshot.SnapshotCleanupStateStore; @@ -450,6 +451,11 @@ public class HMaster extends HRegionServer implements MasterServices { private TaskGroup startupTaskGroup; + /** + * Store whether we allow replication peer modification operations. + */ + private ReplicationPeerModificationStateStore replicationPeerModificationStateStore; + /** * Initializes the HMaster. The steps are as follows: *

@@ -764,6 +770,8 @@ private void initializeZKBasedSystemTrackers() this.replicationPeerManager = ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId); + this.replicationPeerModificationStateStore = + new ReplicationPeerModificationStateStore(masterRegion); this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker.start(); @@ -3785,6 +3793,9 @@ public FavoredNodesManager getFavoredNodesManager() { } private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException { + if (!isReplicationPeerModificationEnabled()) { + throw new IOException("Replication peer modification disabled"); + } long procId = procedureExecutor.submitProcedure(procedure); procedure.getLatch().await(); return procId; @@ -3854,6 +3865,16 @@ public List listReplicationPeers(String regex) return peers; } + @Override + public boolean replicationPeerModificationSwitch(boolean on) throws IOException { + return replicationPeerModificationStateStore.set(on); + } + + @Override + public boolean isReplicationPeerModificationEnabled() { + return replicationPeerModificationStateStore.get(); + } + /** * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0 @@ -4232,5 +4253,4 @@ private void initializeCoprocessorHost(Configuration conf) { // initialize master side coprocessors before we start handling requests this.cpHost = new MasterCoprocessorHost(this, conf); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index f7d968d86276..8f74c2df7739 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable; import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; +import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails; import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails; @@ -363,12 +364,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse; @@ -2173,6 +2180,56 @@ public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController co return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build(); } + @Override + public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch( + RpcController controller, ReplicationPeerModificationSwitchRequest request) + throws ServiceException { + try { + master.checkInitialized(); + boolean prevValue = master.replicationPeerModificationSwitch(request.getOn()); + return ReplicationPeerModificationSwitchResponse.newBuilder().setPreviousValue(prevValue) + .build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public GetReplicationPeerModificationProceduresResponse getReplicationPeerModificationProcedures( + RpcController controller, GetReplicationPeerModificationProceduresRequest request) + throws ServiceException { + try { + master.checkInitialized(); + GetReplicationPeerModificationProceduresResponse.Builder builder = + GetReplicationPeerModificationProceduresResponse.newBuilder(); + for (Procedure proc : master.getProcedures()) { + if (proc.isFinished()) { + continue; + } + if (!(proc instanceof AbstractPeerProcedure)) { + continue; + } + builder.addProcedure(ProcedureUtil.convertToProtoProcedure(proc)); + } + return builder.build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } + + @Override + public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled( + RpcController controller, IsReplicationPeerModificationEnabledRequest request) + throws ServiceException { + try { + master.checkInitialized(); + return IsReplicationPeerModificationEnabledResponse.newBuilder() + .setEnabled(master.isReplicationPeerModificationEnabled()).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } + @Override public ListDecommissionedRegionServersResponse listDecommissionedRegionServers( RpcController controller, ListDecommissionedRegionServersRequest request) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 43040472006f..279a3e1b48a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -373,6 +373,10 @@ long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig List listReplicationPeers(String regex) throws ReplicationException, IOException; + boolean replicationPeerModificationSwitch(boolean on) throws IOException; + + boolean isReplicationPeerModificationEnabled(); + /** Returns {@link LockManager} to lock namespaces/tables/regions. */ LockManager getLockManager(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java index b7aaa3e1b7b9..b18850083748 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterStateStore.java @@ -86,6 +86,10 @@ private byte[] migrate(ZKWatcher watcher, String zkPath) throws KeeperException, } private void tryMigrate(ZKWatcher watcher, String zkPath) throws IOException, KeeperException { + if (zkPath == null) { + // this means we do not store this state in zk, skip migrating + return; + } Result result = get(); if (result.isEmpty()) { // migrate diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java index 06cd7c9c248e..64896cae497c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java @@ -90,4 +90,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws super.deserializeStateData(serializer); peerId = serializer.deserialize(PeerProcedureStateData.class).getPeerId(); } + + protected final void checkPeerModificationEnabled(MasterProcedureEnv env) throws IOException { + if (!env.getMasterServices().isReplicationPeerModificationEnabled()) { + throw new IOException("Replication peer modification disabled"); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java index 8e5ec1425219..c5bbddfe7478 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -276,6 +276,7 @@ protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState st switch (state) { case PRE_PEER_MODIFICATION: try { + checkPeerModificationEnabled(env); prePeerModification(env); } catch (IOException e) { LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerModificationStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerModificationStateStore.java new file mode 100644 index 000000000000..f68d5e41d5ba --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerModificationStateStore.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.master.BooleanStateStore; +import org.apache.hadoop.hbase.master.region.MasterRegion; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + +/** + * Store the peer modification state. + */ +@InterfaceAudience.Private +public class ReplicationPeerModificationStateStore extends BooleanStateStore { + + public static final String STATE_NAME = "replication_peer_modification_on"; + + public ReplicationPeerModificationStateStore(MasterRegion masterRegion) + throws DeserializationException, IOException, KeeperException { + super(masterRegion, STATE_NAME, null, null); + } + + @Override + protected byte[] toByteArray(boolean on) { + ReplicationProtos.ReplicationPeerModificationState.Builder builder = + ReplicationProtos.ReplicationPeerModificationState.newBuilder(); + builder.setOn(on); + return ProtobufUtil.prependPBMagic(builder.build().toByteArray()); + } + + @Override + protected boolean parseFrom(byte[] bytes) throws DeserializationException { + ProtobufUtil.expectPBMagicPrefix(bytes); + ReplicationProtos.ReplicationPeerModificationState.Builder builder = + ReplicationProtos.ReplicationPeerModificationState.newBuilder(); + try { + int magicLen = ProtobufUtil.lengthOfPBMagic(); + ProtobufUtil.mergeFrom(builder, bytes, magicLen, bytes.length - magicLen); + } catch (IOException e) { + throw new DeserializationException(e); + } + return builder.build().getOn(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java index 76321f26abff..e52d8ee92c3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin4.java @@ -17,12 +17,20 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -61,4 +69,24 @@ public void testDecommissionAndStopRegionServers() throws Exception { assertEquals(-1, ZKUtil.checkExists(zkw, ZNodePaths.joinZNode(zkw.getZNodePaths().drainingZNode, serverName.getServerName()))); } + + @Test + public void testReplicationPeerModificationSwitch() throws Exception { + assertTrue(ADMIN.isReplicationPeerModificationEnabled()); + try { + // disable modification, should returns true as it is enabled by default and the above + // assertion has confirmed it + assertTrue(ADMIN.replicationPeerModificationSwitch(false)); + IOException error = + assertThrows(IOException.class, () -> ADMIN.addReplicationPeer("peer", ReplicationPeerConfig + .newBuilder().setClusterKey(TEST_UTIL.getClusterKey() + "-test").build())); + assertThat(error.getCause().getMessage(), + containsString("Replication peer modification disabled")); + // enable again, and the previous value should be false + assertFalse(ADMIN.replicationPeerModificationSwitch(true)); + } finally { + // always reset to avoid mess up other tests + ADMIN.replicationPeerModificationSwitch(true); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 59d9e1d8dec8..36d50dd47a9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -18,13 +18,15 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; -import static org.hamcrest.CoreMatchers.instanceOf; -import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -105,6 +107,7 @@ public void clearPeerAndQueues() throws IOException, ReplicationException { queueStorage.removeQueue(serverName, queue); } } + admin.replicationPeerModificationSwitch(true).join(); } @Test @@ -519,7 +522,7 @@ public void testSetReplicationEndpoint() throws InterruptedException, ExecutionE } } - /* + /** * Tests that admin api throws ReplicationPeerNotFoundException if peer doesn't exist. */ @Test @@ -532,4 +535,19 @@ public void testReplicationPeerNotFoundException() throws InterruptedException { assertThat(e.getCause(), instanceOf(ReplicationPeerNotFoundException.class)); } } + + @Test + public void testReplicationPeerModificationSwitch() throws Exception { + assertTrue(admin.isReplicationPeerModificationEnabled().get()); + // disable modification, should returns true as it is enabled by default and the above + // assertion has confirmed it + assertTrue(admin.replicationPeerModificationSwitch(false).get()); + ExecutionException error = assertThrows(ExecutionException.class, () -> admin + .addReplicationPeer(ID_ONE, ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build()) + .get()); + assertThat(error.getCause().getMessage(), + containsString("Replication peer modification disabled")); + // enable again, and the previous value should be false + assertFalse(admin.replicationPeerModificationSwitch(true).get()); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 1150c4eb16a9..7d9f0cf47072 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -495,4 +495,14 @@ public long modifyColumnStoreFileTracker(TableName tableName, byte[] family, Str long nonceGroup, long nonce) throws IOException { return -1; } + + @Override + public boolean replicationPeerModificationSwitch(boolean on) throws IOException { + return false; + } + + @Override + public boolean isReplicationPeerModificationEnabled() { + return false; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java new file mode 100644 index 000000000000..a10db2e16262 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.AsyncAdmin; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.DummyReplicationEndpoint; +import org.apache.hadoop.hbase.replication.FSReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; +import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestDisablePeerModification { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDisablePeerModification.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static CountDownLatch ARRIVE = new CountDownLatch(1); + + private static CountDownLatch RESUME = new CountDownLatch(1); + + public static final class MockPeerStorage extends FSReplicationPeerStorage { + + public MockPeerStorage(FileSystem fs, Configuration conf) throws IOException { + super(fs, conf); + } + + @Override + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + ARRIVE.countDown(); + try { + RESUME.await(); + } catch (InterruptedException e) { + throw new ReplicationException(e); + } + super.addPeer(peerId, peerConfig, enabled); + } + } + + private static AsyncConnection CONN; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, + MockPeerStorage.class, ReplicationPeerStorage.class); + UTIL.startMiniCluster(1); + CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDown() throws IOException { + Closeables.close(CONN, true); + UTIL.shutdownMiniCluster(); + } + + @Test + public void testDrainProcs() throws Exception { + AsyncAdmin admin = CONN.getAdmin(); + ReplicationPeerConfig rpc = + ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") + .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build(); + CompletableFuture addFuture = admin.addReplicationPeer("test_peer", rpc); + ARRIVE.await(); + + // we have a pending add peer procedure which has already passed the first state, let's issue a + // peer modification switch request to disable peer modification and set drainProcs to true + CompletableFuture switchFuture = admin.replicationPeerModificationSwitch(false, true); + + // sleep a while, the switchFuture should not finish yet + // the sleep is necessary as we can not join on the switchFuture, so there is no stable way to + // make sure we have already changed the flag at master side, sleep a while is the most suitable + // way here + Thread.sleep(5000); + assertFalse(switchFuture.isDone()); + + // also verify that we can not schedule a new peer modification procedure + AddPeerProcedure proc = new AddPeerProcedure("failure", rpc, true); + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().submitProcedure(proc); + UTIL.waitFor(15000, () -> proc.isFinished()); + // make sure the procedure is failed because of peer modification disabled + assertTrue(proc.isFailed()); + assertThat(proc.getException().getCause().getMessage(), + containsString("Replication peer modification disabled")); + + // sleep a while and check again, make sure the switchFuture is still not done + Thread.sleep(5000); + assertFalse(switchFuture.isDone()); + + // resume the add peer procedure and wait it done + RESUME.countDown(); + addFuture.get(); + + // this time the switchFuture should be able to finish + assertTrue(switchFuture.get()); + } +} diff --git a/hbase-shell/src/main/ruby/hbase/replication_admin.rb b/hbase-shell/src/main/ruby/hbase/replication_admin.rb index 70ae4eae72a6..dfe263bf7b6c 100644 --- a/hbase-shell/src/main/ruby/hbase/replication_admin.rb +++ b/hbase-shell/src/main/ruby/hbase/replication_admin.rb @@ -461,5 +461,21 @@ def update_peer_config(id, args = {}) @admin.updateReplicationPeerConfig(id, builder.build) end + + #---------------------------------------------------------------------------------------------- + # Enable/disable replication peer modification + # Returns previous switch setting. + def peer_modification_switch(enable_or_disable, drain_procs) + @admin.replicationPeerModificationSwitch( + java.lang.Boolean.valueOf(enable_or_disable), java.lang.Boolean.valueOf(drain_procs) + ) + end + + #---------------------------------------------------------------------------------------------- + # Query whether replication peer modification is enabled. + # Returns whether replication peer modification is enabled (true is enabled). + def peer_modification_enabled? + @admin.isReplicationPeerModificationEnabled + end end end diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index adb253725404..e0a29b320828 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -525,6 +525,8 @@ def self.exception_handler(hide_traceback) get_peer_config list_peer_configs update_peer_config + peer_modification_enabled + peer_modification_switch ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/peer_modification_enabled.rb b/hbase-shell/src/main/ruby/shell/commands/peer_modification_enabled.rb new file mode 100644 index 000000000000..dd12b5ba9ac7 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/peer_modification_enabled.rb @@ -0,0 +1,40 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with this +# work for additional information regarding copyright ownership. The ASF +# licenses this file to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# frozen_string_literal: true + +module Shell + module Commands + # Prints whether peer modification operations are enabled + class PeerModificationEnabled < Command + def help + <<~EOF +Query whether peer modification operations are enabled +Examples: + + hbase> peer_modification_enabled +EOF + end + + def command + state = replication_admin.peer_modification_enabled? + formatter.row([state.to_s]) + state + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/peer_modification_switch.rb b/hbase-shell/src/main/ruby/shell/commands/peer_modification_switch.rb new file mode 100644 index 000000000000..45ae167c9655 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/peer_modification_switch.rb @@ -0,0 +1,46 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# frozen_string_literal: true + +module Shell + module Commands + # Enable or disable peer modification operations + class PeerModificationSwitch < Command + def help + <<~EOF +Enable/Disable peer modification. Returns previous state. +Examples: + + hbase> peer_modification_switch true + hbase> peer_modification_switch false, true + +The second boolean parameter means whether you want to wait until all remaining peer modification +finished, before the command returns. +EOF + end + + def command(enable_or_disable, drain_procs = false) + prev_state = !!replication_admin.peer_modification_switch(enable_or_disable, drain_procs) + formatter.row(["Previous peer modification state : #{prev_state}"]) + prev_state + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb index ca2187dccdfe..94873de8d6be 100644 --- a/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb +++ b/hbase-shell/src/test/ruby/hbase/replication_admin_test.rb @@ -584,7 +584,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) assert_equal(0, command(:list_peers).length) end - define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do + define_test 'set_peer_bandwidth: works with peer bandwidth upper limit' do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) @@ -599,7 +599,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) command(:remove_peer, @peer_id) end - define_test "get_peer_config: works with simple clusterKey peer" do + define_test 'get_peer_config: works with simple clusterKey peer' do cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) @@ -643,7 +643,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) command(:remove_peer, peer_id_second) end - define_test "update_peer_config: can update peer config and data" do + define_test 'update_peer_config: can update peer config and data' do config_params = { "config1" => "value1", "config2" => "value2" } data_params = {"data1" => "value1", "data2" => "value2"} args = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params, DATA => data_params} @@ -664,7 +664,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) assert_equal("value2", Bytes.to_string(peer_config.get_peer_data.get(Bytes.toBytes("data2")))) end - define_test "append_peer_exclude_namespaces: works with namespaces array" do + define_test 'append_peer_exclude_namespaces: works with namespaces array' do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) @@ -700,7 +700,7 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) command(:remove_peer, @peer_id) end - define_test "remove_peer_exclude_namespaces: works with namespaces array" do + define_test 'remove_peer_exclude_namespaces: works with namespaces array' do cluster_key = "zk4,zk5,zk6:11000:/hbase-test" args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint} command(:add_peer, @peer_id, args) @@ -738,6 +738,20 @@ def assert_tablecfs_equal(table_cfs, table_cfs_map) command(:remove_peer, @peer_id) end + define_test 'peer_modification_switch' do + command(:peer_modification_switch, true) + output = capture_stdout { command(:peer_modification_enabled) } + assert(output.include?('true')) + + output = capture_stdout { command(:peer_modification_switch, false, true) } + assert(output.include?('true')) + output = capture_stdout { command(:peer_modification_enabled) } + assert(output.include?('false')) + + output = capture_stdout { command(:peer_modification_switch, true) } + assert(output.include?('false')) + end + # assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279 # Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below. # define_test "add_peer: adding a second peer with same id should error" do diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 60cee0945dd8..4ba7ad3101fd 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -1481,4 +1481,17 @@ public Future modifyTableStoreFileTrackerAsync(TableName tableName, String throw new NotImplementedException( "modifyTableStoreFileTrackerAsync not supported in ThriftAdmin"); } + + @Override + public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) + throws IOException { + throw new NotImplementedException( + "replicationPeerModificationSwitch not supported in ThriftAdmin"); + } + + @Override + public boolean isReplicationPeerModificationEnabled() throws IOException { + throw new NotImplementedException( + "isReplicationPeerModificationEnabled not supported in ThriftAdmin"); + } } From 16f1fc92df49d517f004c4226c8cee15f8415b7d Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 20 Apr 2023 15:11:22 +0800 Subject: [PATCH 2/2] also test the method in Admin --- .../TestDisablePeerModification.java | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java index a10db2e16262..dbae305e07ff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDisablePeerModification.java @@ -23,8 +23,11 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ForkJoinPool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -41,13 +44,19 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; +@RunWith(Parameterized.class) @Category({ MasterTests.class, LargeTests.class }) public class TestDisablePeerModification { @@ -57,9 +66,9 @@ public class TestDisablePeerModification { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private static CountDownLatch ARRIVE = new CountDownLatch(1); + private static volatile CountDownLatch ARRIVE; - private static CountDownLatch RESUME = new CountDownLatch(1); + private static volatile CountDownLatch RESUME; public static final class MockPeerStorage extends FSReplicationPeerStorage { @@ -82,6 +91,14 @@ public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean ena private static AsyncConnection CONN; + @Parameter + public boolean async; + + @Parameters(name = "{index}: async={0}") + public static List params() { + return Arrays.asList(new Object[] { true }, new Object[] { false }); + } + @BeforeClass public static void setUp() throws Exception { UTIL.getConfiguration().setClass(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, @@ -96,18 +113,37 @@ public static void tearDown() throws IOException { UTIL.shutdownMiniCluster(); } + @Before + public void setUpBeforeTest() throws IOException { + UTIL.getAdmin().replicationPeerModificationSwitch(true, true); + } + @Test public void testDrainProcs() throws Exception { + ARRIVE = new CountDownLatch(1); + RESUME = new CountDownLatch(1); AsyncAdmin admin = CONN.getAdmin(); ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test") .setReplicationEndpointImpl(DummyReplicationEndpoint.class.getName()).build(); - CompletableFuture addFuture = admin.addReplicationPeer("test_peer", rpc); + CompletableFuture addFuture = admin.addReplicationPeer("test_peer_" + async, rpc); ARRIVE.await(); // we have a pending add peer procedure which has already passed the first state, let's issue a // peer modification switch request to disable peer modification and set drainProcs to true - CompletableFuture switchFuture = admin.replicationPeerModificationSwitch(false, true); + CompletableFuture switchFuture; + if (async) { + switchFuture = admin.replicationPeerModificationSwitch(false, true); + } else { + switchFuture = new CompletableFuture<>(); + ForkJoinPool.commonPool().submit(() -> { + try { + switchFuture.complete(UTIL.getAdmin().replicationPeerModificationSwitch(false, true)); + } catch (IOException e) { + switchFuture.completeExceptionally(e); + } + }); + } // sleep a while, the switchFuture should not finish yet // the sleep is necessary as we can not join on the switchFuture, so there is no stable way to