Skip to content

Commit

Permalink
HBASE-27783 Implement a shell command to disable all peer modification (
Browse files Browse the repository at this point in the history
#5182)

Signed-off-by: Liangjun He <[email protected]>
  • Loading branch information
Apache9 authored Apr 19, 2023
1 parent 94a8f31 commit 398c5ef
Show file tree
Hide file tree
Showing 27 changed files with 694 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2186,6 +2186,34 @@ default SyncReplicationState getReplicationPeerSyncReplicationState(String peerI
*/
boolean isReplicationPeerEnabled(String peerId) throws IOException;

/**
* Enable or disable replication peer modification.
* <p/>
* This is especially useful when you want to change the replication peer storage.
* @param on {@code true} means enable, otherwise disable
* @return the previous enable/disable state
*/
default boolean replicationPeerModificationSwitch(boolean on) throws IOException {
return replicationPeerModificationSwitch(on, false);
}

/**
* Enable or disable replication peer modification.
* <p/>
* This is especially useful when you want to change the replication peer storage.
* @param on {@code true} means enable, otherwise disable
* @param drainProcedures if {@code true}, will wait until all the running replication peer
* modification procedures finish
* @return the previous enable/disable state
*/
boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures) throws IOException;

/**
* Check whether replication peer modification is enabled.
* @return {@code true} if modification is enabled, otherwise {@code false}
*/
boolean isReplicationPeerModificationEnabled() throws IOException;

/**
* Mark region server(s) as decommissioned to prevent additional regions from getting assigned to
* them. Optionally unload the regions on the servers. If there are multiple servers to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,17 @@ public boolean isReplicationPeerEnabled(String peerId) throws IOException {
return get(admin.isReplicationPeerEnabled(peerId));
}

@Override
public boolean replicationPeerModificationSwitch(boolean on, boolean drainProcedures)
throws IOException {
return get(admin.replicationPeerModificationSwitch(on, drainProcedures));
}

@Override
public boolean isReplicationPeerModificationEnabled() throws IOException {
return get(admin.isReplicationPeerModificationEnabled());
}

@Override
public void decommissionRegionServers(List<ServerName> servers, boolean offload)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,10 +815,39 @@ CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
* Check if a replication peer is enabled.
* @param peerId id of replication peer to check
* @return true if replication peer is enabled. The return value will be wrapped by a
* {@link CompletableFuture}.
* {@link CompletableFuture}
*/
CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId);

/**
* Enable or disable replication peer modification.
* <p/>
* This is especially useful when you want to change the replication peer storage.
* @param on {@code true} means enable, otherwise disable
* @return the previous enable/disable state wrapped by a {@link CompletableFuture}
*/
default CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on) {
return replicationPeerModificationSwitch(on, false);
}

/**
* Enable or disable replication peer modification.
* <p/>
* This is especially useful when you want to change the replication peer storage.
* @param on {@code true} means enable, otherwise disable
* @param drainProcedures if {@code true}, will wait until all the running replication peer
* modification procedures finish
* @return the previous enable/disable state wrapped by a {@link CompletableFuture}
*/
CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on, boolean drainProcedures);

/**
* Check whether replication peer modification is enabled.
* @return {@code true} if modification is enabled, otherwise {@code false}, wrapped by a
* {@link CompletableFuture}
*/
CompletableFuture<Boolean> isReplicationPeerModificationEnabled();

/**
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are taken sequentially
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,17 @@ public CompletableFuture<Boolean> isReplicationPeerEnabled(String peerId) {
return wrap(rawAdmin.isReplicationPeerEnabled(peerId));
}

@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
boolean drainProcedures) {
return wrap(rawAdmin.replicationPeerModificationSwitch(on, drainProcedures));
}

@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
return wrap(rawAdmin.isReplicationPeerModificationEnabled());
}

@Override
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaRegionSizesRequest;
Expand Down Expand Up @@ -329,12 +330,18 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.EnableReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerModificationProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.GetReplicationPeerStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.IsReplicationPeerModificationEnabledResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationPeerModificationSwitchResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.TransitReplicationPeerSyncReplicationStateResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
Expand Down Expand Up @@ -3774,6 +3781,74 @@ GetReplicationPeerStateResponse, Boolean> call(controller, stub, request.build()
.call();
}

private void waitUntilAllReplicationPeerModificationProceduresDone(
CompletableFuture<Boolean> future, boolean prevOn, int retries) {
CompletableFuture<List<ProcedureProtos.Procedure>> callFuture =
this.<List<ProcedureProtos.Procedure>> newMasterCaller()
.action((controller, stub) -> this.<GetReplicationPeerModificationProceduresRequest,
GetReplicationPeerModificationProceduresResponse, List<ProcedureProtos.Procedure>> call(
controller, stub, GetReplicationPeerModificationProceduresRequest.getDefaultInstance(),
(s, c, req, done) -> s.getReplicationPeerModificationProcedures(c, req, done),
resp -> resp.getProcedureList()))
.call();
addListener(callFuture, (r, e) -> {
if (e != null) {
future.completeExceptionally(e);
} else if (r.isEmpty()) {
// we are done
future.complete(prevOn);
} else {
// retry later to see if the procedures are done
retryTimer.newTimeout(
t -> waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, retries + 1),
ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS);
}
});
}

@Override
public CompletableFuture<Boolean> replicationPeerModificationSwitch(boolean on,
boolean drainProcedures) {
ReplicationPeerModificationSwitchRequest request =
ReplicationPeerModificationSwitchRequest.newBuilder().setOn(on).build();
CompletableFuture<Boolean> callFuture = this.<Boolean> newMasterCaller()
.action((controller, stub) -> this.<ReplicationPeerModificationSwitchRequest,
ReplicationPeerModificationSwitchResponse, Boolean> call(controller, stub, request,
(s, c, req, done) -> s.replicationPeerModificationSwitch(c, req, done),
resp -> resp.getPreviousValue()))
.call();
// if we do not need to wait all previous peer modification procedure done, or we are enabling
// peer modification, just return here.
if (!drainProcedures || on) {
return callFuture;
}
// otherwise we need to wait until all previous peer modification procedure done
CompletableFuture<Boolean> future = new CompletableFuture<>();
addListener(callFuture, (prevOn, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
// even if the previous state is disabled, we still need to wait here, as there could be
// another client thread which called this method just before us and have already changed the
// state to off, but there are still peer modification procedures not finished, so we should
// also wait here.
waitUntilAllReplicationPeerModificationProceduresDone(future, prevOn, 0);
});
return future;
}

@Override
public CompletableFuture<Boolean> isReplicationPeerModificationEnabled() {
return this.<Boolean> newMasterCaller()
.action((controller, stub) -> this.<IsReplicationPeerModificationEnabledRequest,
IsReplicationPeerModificationEnabledResponse, Boolean> call(controller, stub,
IsReplicationPeerModificationEnabledRequest.getDefaultInstance(),
(s, c, req, done) -> s.isReplicationPeerModificationEnabled(c, req, done),
(resp) -> resp.getEnabled()))
.call();
}

@Override
public CompletableFuture<CacheEvictionStats> clearBlockCache(TableName tableName) {
CompletableFuture<CacheEvictionStats> future = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,15 @@ service MasterService {
rpc IsReplicationPeerEnabled(GetReplicationPeerStateRequest)
returns(GetReplicationPeerStateResponse);

rpc ReplicationPeerModificationSwitch(ReplicationPeerModificationSwitchRequest)
returns(ReplicationPeerModificationSwitchResponse);

rpc GetReplicationPeerModificationProcedures(GetReplicationPeerModificationProceduresRequest)
returns(GetReplicationPeerModificationProceduresResponse);

rpc IsReplicationPeerModificationEnabled(IsReplicationPeerModificationEnabledRequest)
returns(IsReplicationPeerModificationEnabledResponse);

/** Returns a list of ServerNames marked as decommissioned. */
rpc ListDecommissionedRegionServers(ListDecommissionedRegionServersRequest)
returns(ListDecommissionedRegionServersResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "server/Procedure.proto";

message TableCF {
optional TableName table_name = 1;
Expand Down Expand Up @@ -165,6 +166,33 @@ message TransitReplicationPeerSyncReplicationStateResponse {
message GetReplicationPeerStateRequest {
required string peer_id = 1;
}

message GetReplicationPeerStateResponse {
required bool is_enabled = 1;
}

message ReplicationPeerModificationSwitchRequest {
required bool on = 1;
}

message ReplicationPeerModificationSwitchResponse {
required bool previous_value = 1;
}

message ReplicationPeerModificationState {
required bool on = 1;
}

message GetReplicationPeerModificationProceduresRequest {
}

message GetReplicationPeerModificationProceduresResponse {
repeated Procedure procedure = 1;
}

message IsReplicationPeerModificationEnabledRequest {
}

message IsReplicationPeerModificationEnabledResponse {
required bool enabled = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,14 @@ public boolean get() {
* Set the flag on/off.
* @param on true if the flag should be on, false otherwise
* @throws IOException if the operation fails
* @return returns the previous state
*/
public synchronized void set(boolean on) throws IOException {
public synchronized boolean set(boolean on) throws IOException {
byte[] state = toByteArray(on);
setState(state);
boolean prevOn = this.on;
this.on = on;
return prevOn;
}

protected abstract byte[] toByteArray(boolean on);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerModificationStateStore;
import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
Expand Down Expand Up @@ -468,6 +469,11 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste

private TaskGroup startupTaskGroup;

/**
* Store whether we allow replication peer modification operations.
*/
private ReplicationPeerModificationStateStore replicationPeerModificationStateStore;

/**
* Initializes the HMaster. The steps are as follows:
* <p>
Expand Down Expand Up @@ -785,6 +791,8 @@ private void initializeZKBasedSystemTrackers()

this.replicationPeerManager =
ReplicationPeerManager.create(fileSystemManager.getFileSystem(), zooKeeper, conf, clusterId);
this.replicationPeerModificationStateStore =
new ReplicationPeerModificationStateStore(masterRegion);

this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start();
Expand Down Expand Up @@ -3787,6 +3795,9 @@ public FavoredNodesManager getFavoredNodesManager() {
}

private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException {
if (!isReplicationPeerModificationEnabled()) {
throw new IOException("Replication peer modification disabled");
}
long procId = procedureExecutor.submitProcedure(procedure);
procedure.getLatch().await();
return procId;
Expand Down Expand Up @@ -3866,6 +3877,16 @@ public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplic
return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
}

@Override
public boolean replicationPeerModificationSwitch(boolean on) throws IOException {
return replicationPeerModificationStateStore.set(on);
}

@Override
public boolean isReplicationPeerModificationEnabled() {
return replicationPeerModificationStateStore.get();
}

/**
* Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
* regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
Expand Down Expand Up @@ -4290,5 +4311,4 @@ private void initializeCoprocessorHost(Configuration conf) {
// initialize master side coprocessors before we start handling requests
this.cpHost = new MasterCoprocessorHost(this, conf);
}

}
Loading

0 comments on commit 398c5ef

Please sign in to comment.