Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27448 Add an admin method to get replication enabled state #4855

Merged
merged 4 commits into from
Oct 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2163,6 +2163,14 @@ default SyncReplicationState getReplicationPeerSyncReplicationState(String peerI
return peers.get(0).getSyncReplicationState();
}

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return <code>true</code> if replication peer is enabled
* @throws IOException if a remote or network exception occurs
*/
boolean isReplicationPeerEnabled(String peerId) throws IOException;

/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers. If there are multiple servers to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,11 @@ public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerI
return admin.transitReplicationPeerSyncReplicationState(peerId, state);
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return get(admin.isReplicationPeerEnabled(peerId));
}

@Override
public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,14 @@ CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
*/
CompletableFuture<Void> disableTableReplication(TableName tableName);

/**
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);

/**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are taken sequentially
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ public CompletableFuture<Void> disableTableReplication(TableName tableName) {
return wrap(rawAdmin.disableTableReplication(tableName));
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}

@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
Expand Down Expand Up @@ -3734,6 +3736,18 @@ private CompletableFuture<Void> setTableReplication(TableName tableName, boolean
return future;
}

@Override
public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
GetReplicationPeerStateRequest.Builder request = GetReplicationPeerStateRequest.newBuilder();
request.setPeerId(peerId);
return this.<Boolean> newMasterCaller()
.action((controller, stub) -> this.<GetReplicationPeerStateRequest,
GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build(),
(s, c, req, done) -> s.isReplicationPeerEnabled(c, req, done),
resp -> resp.getIsEnabled()))
.call();
}

@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,9 @@ service MasterService {
rpc TransitReplicationPeerSyncReplicationState(TransitReplicationPeerSyncReplicationStateRequest)
returns(TransitReplicationPeerSyncReplicationStateResponse);

rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
returns(GetReplicationPeerStateResponse);

/** Returns a list of ServerNames marked as decommissioned. */
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDecommissionedRegionServersResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,10 @@ message TransitReplicationPeerSyncReplicationStateRequest {
message TransitReplicationPeerSyncReplicationStateResponse {
required uint64 proc_id = 1;
}

message GetReplicationPeerStateRequest {
required string peer_id = 1;
}
message GetReplicationPeerStateResponse {
required bool is_enabled = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
Expand Down Expand Up @@ -2105,6 +2107,18 @@ public ListReplicationPeersResponse listReplicationPeers(RpcController controlle
return response.build();
}

@Override
public GetReplicationPeerStateResponse isReplicationPeerEnabled(RpcController controller,
GetReplicationPeerStateRequest request) throws ServiceException {
boolean isEnabled;
try {
isEnabled = server.getReplicationPeerManager().getPeerState(request.getPeerId());
} catch (ReplicationException ioe) {
throw new ServiceException(ioe);
}
return GetReplicationPeerStateResponse.newBuilder().setIsEnabled(isEnabled).build();
}

@Override
public ListDecommissionedRegionServersResponse listDecommissionedRegionServers(
RpcController controller, ListDecommissionedRegionServersRequest request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ private void setPeerState(String peerId, boolean enabled) throws ReplicationExce
desc.getSyncReplicationState()));
}

public boolean getPeerState(String peerId) throws ReplicationException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add a method to get the ReplicationPeerDescription? Anyway, not a big problem since this class is IA.Private.

ReplicationPeerDescription desc = peers.get(peerId);
if (desc != null) {
return desc.isEnabled();
} else {
throw new ReplicationException("Replication Peer of " + peerId + " does not exist.");
}
}

public void enablePeer(String peerId) throws ReplicationException {
setPeerState(peerId, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -441,4 +442,19 @@ public void testReplicationInReplay() throws Exception {
}
}
}

/**
* Test for HBASE-27448 Add an admin method to get replication enabled state
*/
@Test
public void testGetReplicationPeerState() throws Exception {

// Test disable replication peer
hbaseAdmin.disableReplicationPeer("2");
assertFalse(hbaseAdmin.isReplicationPeerEnabled("2"));

// Test enable replication peer
hbaseAdmin.enableReplicationPeer("2");
assertTrue(hbaseAdmin.isReplicationPeerEnabled("2"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,11 @@ public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerI
return admin.transitReplicationPeerSyncReplicationStateAsync(peerId, state);
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return admin.isReplicationPeerEnabled(peerId);
}

public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {
admin.decommissionRegionServers(servers, offload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,11 @@ public Future<Void> transitReplicationPeerSyncReplicationStateAsync(String peerI
"transitReplicationPeerSyncReplicationStateAsync not supported in ThriftAdmin");
}

@Override
public boolean isReplicationPeerEnabled(String peerId) throws IOException {
throw new NotImplementedException("isReplicationPeerEnabled not supported in ThriftAdmin");
}

@Override
public void decommissionRegionServers(List<ServerName> servers, boolean offload) {
throw new NotImplementedException("decommissionRegionServers not supported in ThriftAdmin");
Expand Down