Skip to content

Commit

Permalink
HBASE-27783 Implement a shell command to disable all peer modification (
Browse files Browse the repository at this point in the history
#5182)(#5191)

Signed-off-by: Liangjun He <[email protected]>
  • Loading branch information
Apache9 authored Apr 21, 2023
1 parent 2ad3a80 commit 92a2868
Show file tree
Hide file tree
Showing 27 changed files with 823 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2975,6 +2975,34 @@ default void removeReplicationPeerTableCFs(String id, Map<TableName, List<String
*/
List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;

/**
 * Turn replication peer modification on or off without waiting for running peer modification
 * procedures.
 * <p>
 * Shorthand for {@link #replicationPeerModificationSwitch(boolean, boolean)} with
 * {@code drainProcedures} set to {@code false}. This is especially useful when you want to change
 * the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @return the previous enable/disable state
 * @throws IOException if the underlying two-argument call fails
 */
default boolean replicationPeerModificationSwitch(boolean on) throws IOException {
  // Name the flag so the meaning of the second argument is obvious at the call site.
  final boolean drainProcedures = false;
  return replicationPeerModificationSwitch(on, drainProcedures);
}

/**
 * Enable or disable replication peer modification.
 * <p>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true}, will wait until all the running replication peer
 *          modification procedures finish
 * @return the previous enable/disable state
 * @throws IOException if the operation fails
 */
boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) throws IOException;

/**
 * Check whether replication peer modification is enabled.
 * @return {@code true} if modification is enabled, otherwise {@code false}
 * @throws IOException if the operation fails
 * @see #replicationPeerModificationSwitch(boolean, boolean)
 */
boolean isReplicationPeerModificationEnabled() throws IOException;

/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers. If there are multiple servers to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,39 @@ CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
* {@link CompletableFuture}
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);

/**
 * Turn replication peer modification on or off without waiting for running peer modification
 * procedures.
 * <p>
 * Shorthand for {@link #replicationPeerModificationSwitch(boolean, boolean)} with
 * {@code drainProcedures} set to {@code false}. This is especially useful when you want to change
 * the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @return the previous enable/disable state wrapped by a {@link CompletableFuture}
 */
default CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on) {
  // Name the flag so the meaning of the second argument is obvious at the call site.
  final boolean drainProcedures = false;
  return replicationPeerModificationSwitch(on, drainProcedures);
}

/**
 * Enable or disable replication peer modification.
 * <p>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true}, will wait until all the running replication peer
 *          modification procedures finish
 * @return the previous enable/disable state wrapped by a {@link CompletableFuture}
 * @see #isReplicationPeerModificationEnabled()
 */
CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, boolean drainProcedures);

/**
 * Check whether replication peer modification is enabled.
 * @return {@code true} if modification is enabled, otherwise {@code false}, wrapped by a
 *         {@link CompletableFuture}
 * @see #replicationPeerModificationSwitch(boolean, boolean)
 */
CompletableFuture<Boolean> isReplicationPeerModificationEnabled();

/**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,17 @@ public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}

/**
 * Forwards the switch request to the raw admin and passes the resulting future through
 * {@code wrap} before handing it to the caller.
 */
@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
  boolean drainProcedures) {
  CompletableFuture<Boolean> rawResult =
    rawAdmin.replicationPeerModificationSwitch(on, drainProcedures);
  return wrap(rawResult);
}

/**
 * Forwards the query to the raw admin and passes the resulting future through {@code wrap}
 * before handing it to the caller.
 */
@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
  CompletableFuture<Boolean> rawResult = rawAdmin.isReplicationPeerModificationEnabled();
  return wrap(rawResult);
}

@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,18 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;

Expand Down Expand Up @@ -2035,6 +2041,27 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
FlushMasterStoreRequest request) throws ServiceException {
return stub.flushMasterStore(controller, request);
}

// Plain pass-through to the wrapped master stub.
@Override
public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch(
  RpcController controller, ReplicationPeerModificationSwitchRequest request)
  throws ServiceException {
  final ReplicationPeerModificationSwitchResponse response =
    stub.replicationPeerModificationSwitch(controller, request);
  return response;
}

// Plain pass-through to the wrapped master stub.
@Override
public GetReplicationPeerModificationProceduresResponse getReplicationPeerModificationProcedures(
  RpcController controller, GetReplicationPeerModificationProceduresRequest request)
  throws ServiceException {
  final GetReplicationPeerModificationProceduresResponse response =
    stub.getReplicationPeerModificationProcedures(controller, request);
  return response;
}

// Plain pass-through to the wrapped master stub.
@Override
public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled(
  RpcController controller, IsReplicationPeerModificationEnabledRequest request)
  throws ServiceException {
  final IsReplicationPeerModificationEnabledResponse response =
    stub.isReplicationPeerModificationEnabled(controller, request);
  return response;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
Expand All @@ -235,9 +236,12 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

Expand Down Expand Up @@ -4470,4 +4474,57 @@ protected Boolean rpcCall(int callTimeout) throws Exception {
}
});
}

/**
 * Enable or disable replication peer modification on the master and, when disabling with
 * {@code drainProcedures} set, block until the master reports no remaining replication peer
 * modification procedures.
 * <p>
 * The wait is done on the client side by polling the master; only the {@code on} flag is sent in
 * the switch request. When {@code on} is {@code true} the method returns right after the switch
 * call, regardless of {@code drainProcedures}.
 * @param on              {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true} and {@code on} is {@code false}, poll until the list of
 *                        replication peer modification procedures returned by the master is empty
 * @return the previous value reported by the master for the switch
 * @throws IOException if a master call fails, or an {@link InterruptedIOException} if the calling
 *                     thread is interrupted while sleeping between polls (the thread's interrupt
 *                     flag is restored before throwing)
 */
@Override
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
  throws IOException {
  ReplicationPeerModificationSwitchRequest request =
    ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
  boolean prevOn =
    executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
      @Override
      protected Boolean rpcCall() throws Exception {
        return master.replicationPeerModificationSwitch(getRpcController(), request)
          .getPreviousValue();
      }
    });
  // if we do not need to wait all previous peer modification procedure done, or we are enabling
  // peer modification, just return here.
  if (!drainProcedures || on) {
    return prevOn;
  }
  // otherwise we need to wait until all previous peer modification procedure done. Note that we
  // wait regardless of prevOn: the poll below does not depend on the previous state.
  for (int retry = 0;; retry++) {
    List<ProcedureProtos.Procedure> procs =
      executeCallable(new MasterCallable<List<ProcedureProtos.Procedure>>(getConnection(),
        getRpcControllerFactory()) {
        @Override
        protected List<ProcedureProtos.Procedure> rpcCall() throws Exception {
          return master
            .getReplicationPeerModificationProcedures(getRpcController(),
              GetReplicationPeerModificationProceduresRequest.getDefaultInstance())
            .getProcedureList();
        }
      });
    if (procs.isEmpty()) {
      return prevOn;
    }
    try {
      // back off between polls; the pause grows with the retry count
      Thread.sleep(ConnectionUtils.getPauseTime(pause, retry));
    } catch (InterruptedException e) {
      // Thread.sleep clears the interrupt flag when it throws; restore it so that code further
      // up the stack which only sees an IOException can still observe the interruption.
      Thread.currentThread().interrupt();
      throw (IOException) new InterruptedIOException(
        "Interrupted while waiting for replication peer modification procedures to finish")
          .initCause(e);
    }
  }
}

/**
 * Ask the master whether replication peer modification is currently enabled.
 * @return the {@code enabled} field of the master's response
 * @throws IOException if the master call fails
 */
@Override
public boolean isReplicationPeerModificationEnabled() throws IOException {
  MasterCallable<Boolean> enabledQuery =
    new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
      @Override
      protected Boolean rpcCall() throws Exception {
        return master
          .isReplicationPeerModificationEnabled(getRpcController(),
            IsReplicationPeerModificationEnabledRequest.getDefaultInstance())
          .getEnabled();
      }
    };
  return executeCallable(enabledQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
Expand All @@ -299,12 +300,18 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
Expand Down Expand Up @@ -3708,6 +3715,74 @@ private CompletableFuture<Void> setTableReplication(TableName tableName, boolean
return future;
}

/**
 * Poll the master for replication peer modification procedures and complete {@code future} once
 * none are reported.
 * <p>
 * Each invocation issues one query. If the returned list is non-empty, the method re-schedules
 * itself on the retry timer with a pause derived from {@code retries}.
 * @param future  completed with {@code prevOn} when the list is empty, or exceptionally if the
 *                query fails
 * @param prevOn  value to complete {@code future} with
 * @param retries number of polls already performed, used to compute the next pause
 */
private void waitUntilAllReplicationPeerModificationProceduresDone(
  CompletableFuture<Boolean> future, boolean prevOn, int retries) {
  CompletableFuture<List<ProcedureProtos.Procedure>> pendingProcs =
    this.<List<ProcedureProtos.Procedure>> newMasterCaller()
      .action((rpcController, masterStub) -> this.<GetReplicationPeerModificationProceduresRequest,
        GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
          rpcController, masterStub,
          GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
          (svc, ctrl, rpcReq, callback) -> svc.getReplicationPeerModificationProcedures(ctrl,
            rpcReq, callback),
          response -> response.getProcedureList()))
      .call();
  addListener(pendingProcs, (procs, error) -> {
    if (error != null) {
      future.completeExceptionally(error);
      return;
    }
    if (procs.isEmpty()) {
      // nothing left to wait for
      future.complete(prevOn);
      return;
    }
    // procedures are still present, look again after a pause
    long delayNs = ConnectionUtils.getPauseTime(pauseNs, retries);
    retryTimer.newTimeout(
      timeout -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn,
        retries + 1),
      delayNs, TimeUnit.NANOSECONDS);
  });
}

/**
 * Send the replication peer modification switch request to the master and, when disabling with
 * {@code drainProcedures} set, chain a wait for the remaining peer modification procedures.
 * @param on              {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true} and {@code on} is {@code false}, the returned future
 *                        only completes after the procedure wait has finished
 * @return a future holding the previous value reported by the master
 */
@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
  boolean drainProcedures) {
  ReplicationPeerModificationSwitchRequest switchRequest =
    ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
  CompletableFuture<Boolean> switchFuture = this.<Boolean> newMasterCaller()
    .action((rpcController, masterStub) -> this.<ReplicationPeerModificationSwitchRequest,
      ReplicationPeerModificationSwitchResponse, Boolean> call(rpcController, masterStub,
        switchRequest,
        (svc, ctrl, rpcReq, callback) -> svc.replicationPeerModificationSwitch(ctrl, rpcReq,
          callback),
        response -> response.getPreviousValue()))
    .call();
  // Draining only applies when the caller asked for it and we are turning modification off.
  boolean mustDrain = drainProcedures && !on;
  if (!mustDrain) {
    return switchFuture;
  }
  CompletableFuture<Boolean> drained = new CompletableFuture<>();
  addListener(switchFuture, (previousState, failure) -> {
    if (failure == null) {
      // Wait even when the previous state was already "off": another client may have flipped
      // the switch just before us while peer modification procedures are still in progress.
      waitUntilAllReplicationPeerModificationProceduresDone(drained, previousState, 0);
    } else {
      drained.completeExceptionally(failure);
    }
  });
  return drained;
}

/**
 * Ask the master whether replication peer modification is currently enabled.
 * @return a future holding the {@code enabled} field of the master's response
 */
@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
  CompletableFuture<Boolean> enabledFuture = this.<Boolean> newMasterCaller()
    .action((rpcController, masterStub) -> this.<IsReplicationPeerModificationEnabledRequest,
      IsReplicationPeerModificationEnabledResponse, Boolean> call(rpcController, masterStub,
        IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
        (svc, ctrl, rpcReq, callback) -> svc.isReplicationPeerModificationEnabled(ctrl, rpcReq,
          callback),
        response -> response.getEnabled()))
    .call();
  return enabledFuture;
}

@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
Expand Down
Loading

0 comments on commit 92a2868

Please sign in to comment.