Skip to content

Commit

Permalink
HBASE-27448 Add an admin method to get replication enabled state (apa…
Browse files Browse the repository at this point in the history
…che#4855)

Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
2005hithlj authored Oct 30, 2022
1 parent 984d226 commit dc4fa05
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,14 @@ default SyncReplicationState getReplicationPeerSyncReplicationState(String peerI
return peers.get(0).getSyncReplicationState();
}

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return <code>true</code> if replication peer is enabled
* @throws IOException if a remote or network exception occurs
*/
boolean isReplicationPeerEnabled(String peerId) throws IOException;

/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers. If there are multiple servers to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@ public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerI
return admin.transitReplicationPeerSyncReplicationState(peerId, state);
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return get(admin.isReplicationPeerEnabled(peerId));
}

@Override
public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,14 @@ CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
*/
CompletableFuture<Void> disableTableReplication(TableName tableName);

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);

/**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are taken sequentially
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ public CompletableFuture<Void> disableTableReplication(TableName tableName) {
return wrap(rawAdmin.disableTableReplication(tableName));
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}

@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
Expand Down Expand Up @@ -3734,6 +3736,18 @@ private CompletableFuture<Void> setTableReplication(TableName tableName, boolean
return future;
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
request.setPeerId(peerId);
return this.<Boolean> newMasterCaller()
.action((controller, stub) -> this.<GetReplicationPeerStateRequest,
GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
(s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
resp -> resp.getIsEnabled()))
.call();
}

@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,9 @@ service MasterService {
rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest)
returns(TransitReplicationPeerSyncReplicationStateResponse);

rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
returns(GetReplicationPeerStateResponse);

/** Returns a list of ServerNames marked as decommissioned. */
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDecommissionedRegionServersResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,10 @@ message TransitReplicationPeerSyncReplicationStateRequest {
message TransitReplicationPeerSyncReplicationStateResponse {
required uint64 proc_id = 1;
}

message GetReplicationPeerStateRequest {
required string peer_id = 1;
}
message GetReplicationPeerStateResponse {
required bool is_enabled = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
Expand Down Expand Up @@ -2105,6 +2107,18 @@ public ListReplicationPeersResponse listReplicationPeers(RpcController controlle
return response.build();
}

@Override
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
GetReplicationPeerStateRequest request) throws ServiceException {
boolean isEnabled;
try {
isEnabled = server.getReplicationPeerManager().getPeerState(request.getPeerId());
} catch (ReplicationException ioe) {
throw new ServiceException(ioe);
}
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
}

@Override
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
RpcController controller, ListDecommissionedRegionServersRequest request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ private void setPeerState(String peerId, boolean enabled) throws ReplicationExce
desc.getSyncReplicationState()));
}

public boolean getPeerState(String peerId) throws ReplicationException {
ReplicationPeerDescription desc = peers.get(peerId);
if (desc != null) {
return desc.isEnabled();
} else {
throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
}
}

public void enablePeer(String peerId) throws ReplicationException {
setPeerState(peerId, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -441,4 +442,19 @@ public void testReplicationInReplay() throws Exception {
}
}
}

/**
* Test for HBASE-27448 Add an admin method to get replication enabled state
*/
@Test
public void testGetReplicationPeerState() throws Exception {

// Test disable replication peer
hbaseAdmin.disableReplicationPeer("2");
assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));

// Test enable replication peer
hbaseAdmin.enableReplicationPeer("2");
assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,11 @@ public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerI
return admin.transitReplicationPeerSyncReplicationStateAsync(peerId, state);
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return admin.isReplicationPeerEnabled(peerId);
}

public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {
admin.decommissionRegionServers(servers, offload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,11 @@ public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerI
"transitReplicationPeerSyncReplicationStateAsync not supported in ThriftAdmin");
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin");
}

@Override
public void decommissionRegionServers(List<ServerName> servers, boolean offload) {
throw new NotImplementedException("decommissionRegionServers not supported in ThriftAdmin");
Expand Down

0 comments on commit dc4fa05

Please sign in to comment.