Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28420 persist the procedure's field to store on remoteProcedureDone for ServerRemoteProcedure #5857

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import "rpc/RPC.proto";
import "server/Snapshot.proto";
import "server/master/Replication.proto";
import "server/master/RegionServerStatus.proto";
import "server/ErrorHandling.proto";

// ============================================================================
// WARNING - Compatibility rules
Expand Down Expand Up @@ -254,6 +255,8 @@ message SnapshotVerifyProcedureStateData {
required SnapshotDescription snapshot = 1;
required RegionInfo region = 2;
optional ServerName target_server = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

message SnapshotVerifyParameter {
Expand Down Expand Up @@ -522,6 +525,8 @@ message RefreshPeerStateData {
required ServerName target_server = 3;
/** We need multiple stages for sync replication state transition **/
optional uint32 stage = 4 [default = 0];
optional ServerRemoteProcedureState state = 5;
optional ForeignExceptionMessage error = 6;
}

message RefreshPeerParameter {
Expand Down Expand Up @@ -613,6 +618,8 @@ message SyncReplicationReplayWALRemoteStateData {
required string peer_id = 1;
repeated string wal = 2;
required ServerName target_server = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

message ReplaySyncReplicationWALParameter {
Expand Down Expand Up @@ -650,6 +657,14 @@ enum RegionRemoteProcedureBaseState {
REGION_REMOTE_PROCEDURE_SERVER_CRASH = 4;
}

enum ServerRemoteProcedureState {
SERVER_REMOTE_PROCEDURE_DISPATCH = 1;
SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL = 2;
SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED = 3;
SERVER_REMOTE_PROCEDURE_REPORT_FAILED = 4;
SERVER_REMOTE_PROCEDURE_SERVER_CRASH = 5;
}

message RegionRemoteProcedureBaseStateData {
required RegionInfo region = 1;
required ServerName target_server = 2;
Expand Down Expand Up @@ -681,6 +696,8 @@ message SwitchRpcThrottleStateData {
message SwitchRpcThrottleRemoteStateData {
required ServerName target_server = 1;
required bool rpc_throttle_enabled = 2;
optional ServerRemoteProcedureState state = 3;
optional ForeignExceptionMessage error = 4;
}

message SplitWALParameter {
Expand All @@ -698,6 +715,8 @@ message SplitWALRemoteData{
required string wal_path = 1;
required ServerName crashed_server=2;
required ServerName worker = 3;
optional ServerRemoteProcedureState state = 4;
optional ForeignExceptionMessage error = 5;
}

enum SplitWALState{
Expand All @@ -715,6 +734,8 @@ message ClaimReplicationQueueRemoteStateData {
required string queue = 2;
required ServerName target_server = 3;
optional ServerName source_server = 4;
optional ServerRemoteProcedureState state = 5;
optional ForeignExceptionMessage error = 6;
}

message ClaimReplicationQueueRemoteParameter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

@InterfaceAudience.Private
/**
* The base class for Procedures that run {@link java.util.concurrent.Callable}s on a (remote)
Expand Down Expand Up @@ -80,17 +82,25 @@ public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv
protected ServerName targetServer;
protected boolean dispatched;
protected boolean succ;
// after remoteProcedureDone we require error field to decide the next state
protected Throwable remoteError;
protected MasterProcedureProtos.ServerRemoteProcedureState state =
MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;

protected abstract void complete(MasterProcedureEnv env, Throwable error);

@Override
protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
if (dispatched) {
if (
state != MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH
) {
complete(env, this.remoteError);
if (succ) {
return null;
}
dispatched = false;
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH;
;
}
try {
env.getRemoteDispatcher().addOperationToNode(targetServer, this);
Expand All @@ -113,17 +123,20 @@ protected synchronized void completionCleanup(MasterProcedureEnv env) {
@Override
public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
IOException exception) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_DISPATCH_FAIL;
remoteOperationDone(env, exception);
}

@Override
public synchronized void remoteOperationCompleted(MasterProcedureEnv env) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED;
remoteOperationDone(env, null);
}

@Override
public synchronized void remoteOperationFailed(MasterProcedureEnv env,
RemoteProcedureException error) {
state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_FAILED;
remoteOperationDone(env, error);
}

Expand All @@ -137,7 +150,9 @@ synchronized void remoteOperationDone(MasterProcedureEnv env, Throwable error) {
getProcId());
return;
}
complete(env, error);
this.remoteError = error;
// below persistence is added so that if report goes to last active master, it throws exception
env.getMasterServices().getMasterProcedureExecutor().getStore().update(this);
event.wake(env.getProcedureScheduler());
event = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable;
import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
Expand Down Expand Up @@ -177,10 +179,15 @@ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
SnapshotVerifyProcedureStateData.Builder builder =
SnapshotVerifyProcedureStateData.newBuilder();
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region));
builder.setSnapshot(snapshot).setRegion(ProtobufUtil.toRegionInfo(region)).setState(state);
if (targetServer != null) {
builder.setTargetServer(ProtobufUtil.toServerName(targetServer));
}
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.build());
}

Expand All @@ -190,9 +197,13 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
serializer.deserialize(SnapshotVerifyProcedureStateData.class);
this.snapshot = data.getSnapshot();
this.region = ProtobufUtil.toRegionInfo(data.getRegion());
this.state = data.getState();
if (data.hasTargetServer()) {
this.targetServer = ProtobufUtil.toServerName(data.getTargetServer());
}
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
Expand Down Expand Up @@ -70,7 +72,12 @@ protected void serializeStateData(ProcedureStateSerializer serializer) throws IO
MasterProcedureProtos.SplitWALRemoteData.Builder builder =
MasterProcedureProtos.SplitWALRemoteData.newBuilder();
builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(targetServer))
.setCrashedServer(ProtobufUtil.toServerName(crashedServer));
.setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setState(state);
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.build());
}

Expand All @@ -81,6 +88,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
walPath = data.getWalPath();
targetServer = ProtobufUtil.toServerName(data.getWorker());
crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
state = data.getState();
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.replication.regionserver.SwitchRpcThrottleRemoteCallable;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleRemoteStateData;

/**
Expand Down Expand Up @@ -59,9 +61,16 @@ protected boolean abort(MasterProcedureEnv env) {

@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
SwitchRpcThrottleRemoteStateData.newBuilder()
.setTargetServer(ProtobufUtil.toServerName(targetServer))
.setRpcThrottleEnabled(rpcThrottleEnabled).build();
SwitchRpcThrottleRemoteStateData.Builder builder =
SwitchRpcThrottleRemoteStateData.newBuilder();
builder.setTargetServer(ProtobufUtil.toServerName(targetServer))
.setRpcThrottleEnabled(rpcThrottleEnabled).setState(state).build();
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.build());
}

@Override
Expand All @@ -70,6 +79,10 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
serializer.deserialize(SwitchRpcThrottleRemoteStateData.class);
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
rpcThrottleEnabled = data.getRpcThrottleEnabled();
state = data.getState();
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;

Expand Down Expand Up @@ -144,7 +146,13 @@ protected boolean waitInitialized(MasterProcedureEnv env) {
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
ClaimReplicationQueueRemoteStateData.Builder builder = ClaimReplicationQueueRemoteStateData
.newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()))
.setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer));
.setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer))
.setState(state);
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
queueId.getSourceServerName()
.ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer)));
serializer.serialize(builder.build());
Expand All @@ -157,11 +165,15 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
ServerName crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
String queue = data.getQueue();
state = data.getState();
if (data.hasSourceServer()) {
queueId = new ReplicationQueueId(crashedServer, queue,
ProtobufUtil.toServerName(data.getSourceServer()));
} else {
queueId = new ReplicationQueueId(crashedServer, queue);
}
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.regionserver.RefreshPeerCallable;
import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ErrorHandlingProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData;
Expand Down Expand Up @@ -149,9 +151,15 @@ protected boolean waitInitialized(MasterProcedureEnv env) {

@Override
protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
serializer.serialize(
RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type))
.setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).build());
RefreshPeerStateData.Builder builder = RefreshPeerStateData.newBuilder();
if (this.remoteError != null) {
ErrorHandlingProtos.ForeignExceptionMessage fem =
ForeignExceptionUtil.toProtoForeignException(remoteError);
builder.setError(fem);
}
serializer.serialize(builder.setPeerId(peerId).setType(toPeerModificationType(type))
.setTargetServer(ProtobufUtil.toServerName(targetServer)).setStage(stage).setState(state)
.build());
}

@Override
Expand All @@ -161,5 +169,9 @@ protected void deserializeStateData(ProcedureStateSerializer serializer) throws
type = toPeerOperationType(data.getType());
targetServer = ProtobufUtil.toServerName(data.getTargetServer());
stage = data.getStage();
state = data.getState();
if (data.hasError()) {
this.remoteError = ForeignExceptionUtil.toException(data.getError());
}
}
}
Loading