Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27783 Implement a shell command to disable all peer modification #5191

Merged
merged 2 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2975,6 +2975,34 @@ default void removeReplicationPeerTableCFs(String id, Map<TableName, List<String
*/
List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;

/**
 * Turn replication peer modification on or off without waiting for running peer modification
 * procedures.
 * <p/>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} to enable modification, {@code false} to disable it
 * @return the enable/disable state that was in effect before this call
 */
default boolean replicationPeerModificationSwitch(boolean on) throws IOException {
  // Delegate to the two-argument overload and opt out of draining running procedures.
  final boolean drainProcedures = false;
  return replicationPeerModificationSwitch(on, drainProcedures);
}

/**
 * Enable or disable replication peer modification.
 * <p/>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true}, will wait until all the running replication peer
 * modification procedures finish
 * @return the previous enable/disable state
 * @throws IOException if the switch request cannot be completed
 */
boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) throws IOException;

/**
 * Check whether replication peer modification is enabled.
 * @return {@code true} if modification is enabled, otherwise {@code false}
 * @throws IOException if the state cannot be retrieved
 */
boolean isReplicationPeerModificationEnabled() throws IOException;

/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers. If there are multiple servers to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,39 @@ CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
* {@link CompletableFuture}
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);

/**
 * Turn replication peer modification on or off without waiting for running peer modification
 * procedures.
 * <p/>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} to enable modification, {@code false} to disable it
 * @return the enable/disable state that was in effect before this call, wrapped by a
 * {@link CompletableFuture}
 */
default CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on) {
  // Delegate to the two-argument overload and opt out of draining running procedures.
  final boolean drainProcedures = false;
  return replicationPeerModificationSwitch(on, drainProcedures);
}

/**
 * Enable or disable replication peer modification.
 * <p/>
 * This is especially useful when you want to change the replication peer storage.
 * @param on {@code true} means enable, otherwise disable
 * @param drainProcedures if {@code true}, will wait until all the running replication peer
 * modification procedures finish
 * @return a {@link CompletableFuture} carrying the previous enable/disable state
 */
CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, boolean drainProcedures);

/**
 * Check whether replication peer modification is enabled.
 * @return a {@link CompletableFuture} carrying {@code true} if modification is enabled,
 * otherwise {@code false}
 */
CompletableFuture<Boolean> isReplicationPeerModificationEnabled();

/**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,17 @@ public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}

@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
  boolean drainProcedures) {
  // Forward to the raw admin, then pass the resulting future through wrap(...).
  CompletableFuture<Boolean> rawFuture =
    rawAdmin.replicationPeerModificationSwitch(on, drainProcedures);
  return wrap(rawFuture);
}

@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
  // Forward to the raw admin, then pass the resulting future through wrap(...).
  CompletableFuture<Boolean> rawFuture = rawAdmin.isReplicationPeerModificationEnabled();
  return wrap(rawFuture);
}

@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,18 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;

Expand Down Expand Up @@ -2035,6 +2041,27 @@ public FlushMasterStoreResponse flushMasterStore(RpcController controller,
FlushMasterStoreRequest request) throws ServiceException {
return stub.flushMasterStore(controller, request);
}

@Override
public ReplicationPeerModificationSwitchResponse replicationPeerModificationSwitch(
  RpcController controller, ReplicationPeerModificationSwitchRequest request)
  throws ServiceException {
  // Pure pass-through: hand the controller and request to the underlying stub unchanged.
  final ReplicationPeerModificationSwitchResponse response =
    stub.replicationPeerModificationSwitch(controller, request);
  return response;
}

@Override
public GetReplicationPeerModificationProceduresResponse
  getReplicationPeerModificationProcedures(RpcController controller,
    GetReplicationPeerModificationProceduresRequest request) throws ServiceException {
  // Pure pass-through: hand the controller and request to the underlying stub unchanged.
  final GetReplicationPeerModificationProceduresResponse response =
    stub.getReplicationPeerModificationProcedures(controller, request);
  return response;
}

@Override
public IsReplicationPeerModificationEnabledResponse isReplicationPeerModificationEnabled(
  RpcController controller, IsReplicationPeerModificationEnabledRequest request)
  throws ServiceException {
  // Pure pass-through: hand the controller and request to the underlying stub unchanged.
  final IsReplicationPeerModificationEnabledResponse response =
    stub.isReplicationPeerModificationEnabled(controller, request);
  return response;
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesResponse.RegionSizes;
Expand All @@ -235,9 +236,12 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;

Expand Down Expand Up @@ -4470,4 +4474,57 @@ protected Boolean rpcCall(int callTimeout) throws Exception {
}
});
}

/**
 * Sends the switch request to the master and, when disabling with {@code drainProcedures} set,
 * keeps polling the master until it reports no replication peer modification procedures.
 * @param on {@code true} to enable modification, {@code false} to disable it
 * @param drainProcedures whether to wait for the reported procedures to finish; only consulted
 *                        when {@code on} is {@code false}
 * @return the previous value reported by the master for the switch
 * @throws IOException if a master call fails, or (as {@link InterruptedIOException}) if the
 *                     thread is interrupted while sleeping between polls
 */
@Override
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
  throws IOException {
  final ReplicationPeerModificationSwitchRequest switchRequest =
    ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
  final boolean previousState =
    executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
      @Override
      protected Boolean rpcCall() throws Exception {
        return master.replicationPeerModificationSwitch(getRpcController(), switchRequest)
          .getPreviousValue();
      }
    });
  // Draining is only relevant when the caller asked for it AND modification is being disabled;
  // in every other case the previous state is returned right away.
  final boolean mustDrain = drainProcedures && !on;
  if (!mustDrain) {
    return previousState;
  }
  // Poll until the master reports no peer modification procedures. A fresh callable is built
  // for every poll, and the sleep between polls comes from ConnectionUtils.getPauseTime.
  int attempt = 0;
  while (true) {
    List<ProcedureProtos.Procedure> pending =
      executeCallable(new MasterCallable<List<ProcedureProtos.Procedure>>(getConnection(),
        getRpcControllerFactory()) {
        @Override
        protected List<ProcedureProtos.Procedure> rpcCall() throws Exception {
          return master
            .getReplicationPeerModificationProcedures(getRpcController(),
              GetReplicationPeerModificationProceduresRequest.getDefaultInstance())
            .getProcedureList();
        }
      });
    if (pending.isEmpty()) {
      return previousState;
    }
    try {
      Thread.sleep(ConnectionUtils.getPauseTime(pause, attempt));
    } catch (InterruptedException interrupted) {
      throw (IOException) new InterruptedIOException().initCause(interrupted);
    }
    attempt++;
  }
}

/**
 * Asks the master whether replication peer modification is currently enabled.
 * @return the {@code enabled} flag from the master's response
 * @throws IOException if the master call fails
 */
@Override
public boolean isReplicationPeerModificationEnabled() throws IOException {
  MasterCallable<Boolean> enabledQuery =
    new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
      @Override
      protected Boolean rpcCall() throws Exception {
        IsReplicationPeerModificationEnabledRequest query =
          IsReplicationPeerModificationEnabledRequest.getDefaultInstance();
        return master.isReplicationPeerModificationEnabled(getRpcController(), query)
          .getEnabled();
      }
    };
  return executeCallable(enabledQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
Expand All @@ -299,12 +300,18 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
Expand Down Expand Up @@ -3708,6 +3715,74 @@ private CompletableFuture<Void> setTableReplication(TableName tableName, boolean
return future;
}

/**
 * Asks the master for the replication peer modification procedures and completes
 * {@code future} once none are reported; otherwise schedules itself again on the retry timer.
 * @param future  the future to complete with {@code prevOn}, or exceptionally if the call fails
 * @param prevOn  the value to complete {@code future} with once no procedures are reported
 * @param retries how many polls have already been made; used to compute the next delay
 */
private void waitUntilAllReplicationPeerModificationProceduresDone(
  CompletableFuture<Boolean> future, boolean prevOn, int retries) {
  CompletableFuture<List<ProcedureProtos.Procedure>> pendingProcsFuture =
    this.<List<ProcedureProtos.Procedure>> newMasterCaller()
      .action((rpcController, masterStub) -> this.<
        GetReplicationPeerModificationProceduresRequest,
        GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
          rpcController, masterStub,
          GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
          (svc, ctrl, request, callback) -> svc.getReplicationPeerModificationProcedures(ctrl,
            request, callback),
          GetReplicationPeerModificationProceduresResponse::getProcedureList))
      .call();
  addListener(pendingProcsFuture, (procs, error) -> {
    if (error != null) {
      future.completeExceptionally(error);
      return;
    }
    if (procs.isEmpty()) {
      // nothing left to wait for
      future.complete(prevOn);
      return;
    }
    // procedures are still reported; poll again after the computed pause
    long delayNs = ConnectionUtils.getPauseTime(pauseNs, retries);
    retryTimer.newTimeout(
      timeout -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn,
        retries + 1),
      delayNs, TimeUnit.NANOSECONDS);
  });
}

/**
 * Sends the switch request to the master and, when disabling with {@code drainProcedures} set,
 * completes the returned future only after no peer modification procedures are reported.
 * @param on {@code true} to enable modification, {@code false} to disable it
 * @param drainProcedures whether to wait for the reported procedures to finish; only consulted
 *                        when {@code on} is {@code false}
 * @return a future carrying the previous value reported by the master for the switch
 */
@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
  boolean drainProcedures) {
  ReplicationPeerModificationSwitchRequest switchRequest =
    ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
  CompletableFuture<Boolean> switchFuture = this.<Boolean> newMasterCaller()
    .action((rpcController, masterStub) -> this.<ReplicationPeerModificationSwitchRequest,
      ReplicationPeerModificationSwitchResponse, Boolean> call(rpcController, masterStub,
        switchRequest,
        (svc, ctrl, request, callback) -> svc.replicationPeerModificationSwitch(ctrl, request,
          callback),
        ReplicationPeerModificationSwitchResponse::getPreviousValue))
    .call();
  // Draining is only relevant when the caller asked for it AND modification is being disabled;
  // in every other case the switch call's own future is the result.
  boolean mustDrain = drainProcedures && !on;
  if (!mustDrain) {
    return switchFuture;
  }
  CompletableFuture<Boolean> drainedFuture = new CompletableFuture<>();
  addListener(switchFuture, (previousState, error) -> {
    if (error == null) {
      // Wait even when the previous state was already "off": another client may have switched
      // it off just before us while peer modification procedures are still running.
      waitUntilAllReplicationPeerModificationProceduresDone(drainedFuture, previousState, 0);
    } else {
      drainedFuture.completeExceptionally(error);
    }
  });
  return drainedFuture;
}

/**
 * Asks the master whether replication peer modification is currently enabled.
 * @return a future carrying the {@code enabled} flag from the master's response
 */
@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
  IsReplicationPeerModificationEnabledRequest query =
    IsReplicationPeerModificationEnabledRequest.getDefaultInstance();
  CompletableFuture<Boolean> enabledFuture = this.<Boolean> newMasterCaller()
    .action((rpcController, masterStub) -> this.<IsReplicationPeerModificationEnabledRequest,
      IsReplicationPeerModificationEnabledResponse, Boolean> call(rpcController, masterStub,
        query,
        (svc, ctrl, request, callback) -> svc.isReplicationPeerModificationEnabled(ctrl,
          request, callback),
        IsReplicationPeerModificationEnabledResponse::getEnabled))
    .call();
  return enabledFuture;
}

@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
Expand Down
Loading