diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index f5da0aa0bde7..ff008c15c276 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2163,6 +2163,14 @@ default SyncReplicationState getReplicationPeerSyncReplicationState(String peerI return peers.get(0).getSyncReplicationState(); } + /** + * Check if a replication peer is enabled. + * @param peerId id of replication peer to check + * @return true if replication peer is enabled + * @throws IOException if a remote or network exception occurs + */ + boolean isReplicationPeerEnabled(String peerId) throws IOException; + /** * Mark region server(s) as decommissioned to prevent additional regions from getting assigned to * them. Optionally unload the regions on the servers. If there are multiple servers to be diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index 9e2b990d91c1..a199adc17c29 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -876,6 +876,11 @@ public Future transitReplicationPeerSyncReplicationStateAsync(String peerI return admin.transitReplicationPeerSyncReplicationState(peerId, state); } + @Override + public boolean isReplicationPeerEnabled(String peerId) throws IOException { + return get(admin.isReplicationPeerEnabled(peerId)); + } + @Override public void decommissionRegionServers(List servers, boolean offload) throws IOException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 6070c553f5e1..680aa4cc87dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -795,6 +795,14 @@ CompletableFuture removeReplicationPeerTableCFs(String peerId, */ CompletableFuture disableTableReplication(TableName tableName); + /** + * Check if a replication peer is enabled. + * @param peerId id of replication peer to check + * @return true if replication peer is enabled. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture isReplicationPeerEnabled(String peerId); + /** * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be * taken. If the table is disabled, an offline snapshot is taken. Snapshots are taken sequentially diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index a8f93dd506d4..fba883c1bbe7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -481,6 +481,11 @@ public CompletableFuture disableTableReplication(TableName tableName) { return wrap(rawAdmin.disableTableReplication(tableName)); } + @Override + public CompletableFuture isReplicationPeerEnabled(String peerId) { + return wrap(rawAdmin.isReplicationPeerEnabled(peerId)); + } + @Override public CompletableFuture snapshot(SnapshotDescription snapshot) { return wrap(rawAdmin.snapshot(snapshot)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 4d614907326e..21cc1d17b0c1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -325,6 +325,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; @@ -3734,6 +3736,18 @@ private CompletableFuture setTableReplication(TableName tableName, boolean return future; } + @Override + public CompletableFuture isReplicationPeerEnabled(String peerId) { + GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder(); + request.setPeerId(peerId); + return this. newMasterCaller() + .action((controller, stub) -> this. call(controller, stub, request.build(), + (s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done), + resp -> resp.getIsEnabled())) + .call(); + } + @Override public CompletableFuture clearBlockCache(TableName tableName) { CompletableFuture future = new CompletableFuture<>(); diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index 257abe8f11ca..f5d4a80f148e 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -1106,6 +1106,9 @@ service MasterService { rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest) returns(TransitReplicationPeerSyncReplicationStateResponse); + rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest) + returns(GetReplicationPeerStateResponse); + /** Returns a list of ServerNames marked as decommissioned. */ rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest) returns(ListDecommissionedRegionServersResponse); diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto index 6619c9694a46..24e459b39781 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Replication.proto @@ -161,3 +161,10 @@ message TransitReplicationPeerSyncReplicationStateRequest { message TransitReplicationPeerSyncReplicationStateResponse { required uint64 proc_id = 1; } + +message GetReplicationPeerStateRequest { + required string peer_id = 1; +} +message GetReplicationPeerStateResponse { + required bool is_enabled = 1; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 4a490b1e127c..a37c9e35a450 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -417,6 +417,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest; @@ -2105,6 +2107,18 @@ public ListReplicationPeersResponse listReplicationPeers(RpcController controlle return response.build(); } + @Override + public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller, + GetReplicationPeerStateRequest request) throws ServiceException { + boolean isEnabled; + try { + isEnabled = server.getReplicationPeerManager().getPeerState(request.getPeerId()); + } catch (ReplicationException ioe) { + throw new ServiceException(ioe); + } + return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build(); + } + @Override public ListDecommissionedRegionServersResponse listDecommissionedRegionServers( RpcController controller, ListDecommissionedRegionServersRequest request) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java index 0d4e11197cd1..06cf559d4923 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java @@ -271,6 +271,15 @@ private void setPeerState(String peerId, boolean enabled) throws ReplicationExce desc.getSyncReplicationState())); } + public boolean getPeerState(String peerId) throws ReplicationException { + ReplicationPeerDescription desc = peers.get(peerId); + if (desc != null) { + return desc.isEnabled(); + } else { + throw new ReplicationException("Replication Peer of " + peerId + " does not exist."); + } + } + public void enablePeer(String peerId) throws ReplicationException { setPeerState(peerId, true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 92f8e17ed594..3d9fa06d2e75 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -441,4 +442,19 @@ public void testReplicationInReplay() throws Exception { } } } + + /** + * Test for HBASE-27448 Add an admin method to get replication enabled state + */ + @Test + public void testGetReplicationPeerState() throws Exception { + + // Test disable replication peer + hbaseAdmin.disableReplicationPeer("2"); + assertFalse(hbaseAdmin.isReplicationPeerEnabled("2")); + + // Test enable replication peer + hbaseAdmin.enableReplicationPeer("2"); + assertTrue(hbaseAdmin.isReplicationPeerEnabled("2")); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java index 3c0658455f3a..37bef49b4912 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java @@ -676,6 +676,11 @@ public Future transitReplicationPeerSyncReplicationStateAsync(String peerI return admin.transitReplicationPeerSyncReplicationStateAsync(peerId, state); } + @Override + public boolean isReplicationPeerEnabled(String peerId) throws IOException { + return admin.isReplicationPeerEnabled(peerId); + } + public void decommissionRegionServers(List servers, boolean offload) throws IOException { admin.decommissionRegionServers(servers, offload); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 13a1b9920ecf..0842497f9527 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -1031,6 +1031,11 @@ public Future transitReplicationPeerSyncReplicationStateAsync(String peerI "transitReplicationPeerSyncReplicationStateAsync not supported in ThriftAdmin"); } + @Override + public boolean isReplicationPeerEnabled(String peerId) throws IOException { + throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin"); + } + @Override public void decommissionRegionServers(List servers, boolean offload) { throw new NotImplementedException("decommissionRegionServers not supported in ThriftAdmin");