Skip to content

Commit

Permalink
using a optional field to introduce the snapshot procedure to the client
Browse files Browse the repository at this point in the history
  • Loading branch information
huiruan committed Dec 25, 2021
1 parent e99e3cb commit b0f826b
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1542,66 +1542,6 @@ default void snapshot(String snapshotName, TableName tableName,
void snapshot(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException, IllegalArgumentException;

/**
* Take a snapshot and wait for the server to complete that snapshot (blocking). It's same as
* {@link org.apache.hadoop.hbase.client.Admin#snapshot(String, TableName)} for users. The
* difference between the two methods is that
* {@link org.apache.hadoop.hbase.client.Admin#snapshotTable(String, TableName)} is based on
* proc-v2.
* @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other
* snapshots stored on the cluster
* @param tableName name of the table to snapshot
* @throws IOException we fail to reach the master
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
default void snapshotTable(String snapshotName, TableName tableName)
throws IOException, SnapshotCreationException, IllegalArgumentException {
snapshotTable(snapshotName, tableName, SnapshotType.FLUSH);
}

/**
* Create typed snapshot of the table.
* @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other
* snapshots stored on the cluster
* @param tableName name of the table to snapshot
* @param type type of snapshot to take
* @throws IOException we fail to reach the master
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
default void snapshotTable(String snapshotName, TableName tableName, SnapshotType type)
throws IOException, SnapshotCreationException, IllegalArgumentException {
snapshotTable(new SnapshotDescription(snapshotName, tableName, type));
}

/**
* Create typed snapshot of the table.
* @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other
* snapshots stored on the cluster
* @param tableName name of the table to snapshot
* @param type type of snapshot to take
* @param snapshotProps snapshot additional properties e.g. TTL
* @throws IOException we fail to reach the master
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
default void snapshotTable(String snapshotName, TableName tableName,
SnapshotType type, Map<String, Object> snapshotProps)
throws IOException, SnapshotCreationException, IllegalArgumentException {
snapshot(new SnapshotDescription(snapshotName, tableName, type, snapshotProps));
}

/**
* Take a snapshot and wait for the server to complete that snapshot (blocking).
* @param snapshot snapshot to take
* @throws IOException we fail to reach the master
* @throws SnapshotCreationException if snapshot creation failed
* @throws IllegalArgumentException if the snapshot request is formatted incorrectly
*/
void snapshotTable(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException, IllegalArgumentException;

/**
* Take a snapshot without waiting for the server to complete that snapshot (asynchronous).
* Snapshots are considered unique based on <b>the name of the snapshot</b>. Snapshots are taken
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,12 +625,6 @@ public void snapshot(SnapshotDescription snapshot)
get(admin.snapshot(snapshot));
}

@Override
public void snapshotTable(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException, IllegalArgumentException {
get(admin.snapshotTable(snapshot));
}

@Override
public Future<Void> snapshotAsync(SnapshotDescription snapshot)
throws IOException, SnapshotCreationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,19 +825,6 @@ default CompletableFuture<Void> snapshot(String snapshotName, TableName tableNam
*/
CompletableFuture<Void> snapshot(SnapshotDescription snapshot);

/**
* Take a snapshot and wait for the server to complete that snapshot asynchronously.
*/
default CompletableFuture<Void> snapshotTable(String snapshotName, TableName tableName) {
return snapshot(new SnapshotDescription(snapshotName, tableName, SnapshotType.FLUSH));
}

default CompletableFuture<Void> snapshotTable(String snapshotName, TableName tableName,
SnapshotType type) {
return snapshot(new SnapshotDescription(snapshotName, tableName, type));
}

CompletableFuture<Void> snapshotTable(SnapshotDescription snapshot);

/**
* Check the current state of the passed snapshot. There are three possible states:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,6 @@ public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshot(snapshot));
}

@Override
public CompletableFuture<Void> snapshotTable(SnapshotDescription snapshot) {
return wrap(rawAdmin.snapshotTable(snapshot));
}

@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
return wrap(rawAdmin.isSnapshotFinished(snapshot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
Expand Down Expand Up @@ -1873,68 +1871,58 @@ public CompletableFuture<Void> snapshot(SnapshotDescription snapshotDesc) {
return failedFuture(e);
}
CompletableFuture<Void> future = new CompletableFuture<>();
final SnapshotRequest request = SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
addListener(this.<Long> newMasterCaller()
.action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse, Long> call(controller,
stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
resp -> resp.getExpectedTimeout()))
.call(), (expectedTimeout, err) -> {
final SnapshotRequest request =
SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
.setNonce(ng.newNonce()).build();
addListener(this.<SnapshotResponse> newMasterCaller()
.action((controller, stub) ->
this.<SnapshotRequest, SnapshotResponse, SnapshotResponse> call(controller, stub,
request, (s, c, req, done) -> s.snapshot(c, req, done), resp -> resp))
.call(), (resp, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
TimerTask pollingTask = new TimerTask() {
int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;

@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else if (done) {
future.complete(null);
} else {
// retry again after pauseTime.
long pauseTime =
ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
TimeUnit.MILLISECONDS);
}
});
} else {
future.completeExceptionally(
new SnapshotCreationException("Snapshot '" + snapshot.getName() +
"' wasn't completed in expectedTime:" + expectedTimeout + " ms", snapshotDesc));
if (resp.hasProcId()) {
getProcedureResult(resp.getProcId(), future, 0);
addListener(future, new SnapshotProcedureBiConsumer(snapshotDesc.getTableName()));
} else {
long expectedTimeout = resp.getExpectedTimeout();
TimerTask pollingTask = new TimerTask() {
int tries = 0;
long startTime = EnvironmentEdgeManager.currentTime();
long endTime = startTime + expectedTimeout;
long maxPauseTime = expectedTimeout / maxAttempts;

@Override
public void run(Timeout timeout) throws Exception {
if (EnvironmentEdgeManager.currentTime() < endTime) {
addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else if (done) {
future.complete(null);
} else {
// retry again after pauseTime.
long pauseTime = ConnectionUtils
.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
pauseTime = Math.min(pauseTime, maxPauseTime);
AsyncConnectionImpl.RETRY_TIMER
.newTimeout(this, pauseTime, TimeUnit.MILLISECONDS);
}
});
} else {
future.completeExceptionally(new SnapshotCreationException(
"Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:"
+ expectedTimeout + " ms", snapshotDesc));
}
}
}
};
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
});
};
AsyncConnectionImpl.RETRY_TIMER.newTimeout(pollingTask, 1, TimeUnit.MILLISECONDS);
}});
return future;
}

@Override
public CompletableFuture<Void> snapshotTable(SnapshotDescription snapshotDesc) {
SnapshotProtos.SnapshotDescription snapshot =
ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc);
try {
ClientSnapshotDescriptionUtils.assertSnapshotRequestIsValid(snapshot);
} catch (IllegalArgumentException e) {
return failedFuture(e);
}

SnapshotTableRequest snapshotTableRequest = SnapshotTableRequest.newBuilder()
.setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup()).setNonce(ng.newNonce()).build();
return this.<SnapshotTableRequest, SnapshotTableResponse> procedureCall(snapshotTableRequest,
(s, c, req, done) -> s.snapshotTable(c, req, done), (resp) -> resp.getProcId(),
new SnapshotTableProcedureBiConsumer(TableName.valueOf(snapshot.getTable())));
}

@Override
public CompletableFuture<Boolean> isSnapshotFinished(SnapshotDescription snapshot) {
return this
Expand Down Expand Up @@ -2757,8 +2745,8 @@ String getOperationType() {
}
}

private static class SnapshotTableProcedureBiConsumer extends TableProcedureBiConsumer {
SnapshotTableProcedureBiConsumer(TableName tableName) {
private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
SnapshotProcedureBiConsumer(TableName tableName) {
super(tableName);
}

Expand Down
18 changes: 3 additions & 15 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -437,20 +437,13 @@ message IsCleanerChoreEnabledResponse {

message SnapshotRequest {
required SnapshotDescription snapshot = 1;
}

message SnapshotResponse {
required int64 expected_timeout = 1;
}

message SnapshotTableRequest {
required SnapshotDescription snapshot = 1;
optional uint64 nonce_group = 2 [default = 0];
optional uint64 nonce = 3 [default = 0];
}

message SnapshotTableResponse {
optional int64 proc_id = 1;
message SnapshotResponse {
required int64 expected_timeout = 1;
optional int64 proc_id = 2;
}

message GetCompletedSnapshotsRequest {
Expand Down Expand Up @@ -942,11 +935,6 @@ service MasterService {
*/
rpc Snapshot(SnapshotRequest) returns(SnapshotResponse);

/**
* Create a snapshot for the given table.
*/
rpc SnapshotTable(SnapshotTableRequest) returns(SnapshotTableResponse);

/**
* Get completed snapshots.
* Returns a list of snapshot descriptors for completed snapshots
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
Expand Down Expand Up @@ -329,8 +330,6 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SnapshotTableResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
Expand Down Expand Up @@ -1708,33 +1707,23 @@ public SnapshotResponse snapshot(RpcController controller,
// get the snapshot information
SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(
request.getSnapshot(), server.getConfiguration());
server.snapshotManager.takeSnapshot(snapshot);

// send back the max amount of time the client should wait for the snapshot to complete
long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(server.getConfiguration(),
snapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
} catch (ForeignException e) {
throw new ServiceException(e.getCause());
} catch (IOException e) {
throw new ServiceException(e);
}
}

@Override
public MasterProtos.SnapshotTableResponse snapshotTable(RpcController controller,
SnapshotTableRequest request) throws ServiceException {
try {
server.checkInitialized();
server.snapshotManager.checkSnapshotSupport();
LOG.info(server.getClientIdAuditPrefix() + " snapshot request for:" +
ClientSnapshotDescriptionUtils.toString(request.getSnapshot()));
SnapshotResponse.Builder builder = SnapshotResponse.newBuilder().setExpectedTimeout(waitTime);

SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(
request.getSnapshot(), server.getConfiguration());
long procId = server.snapshotManager
.takeSnapshot(snapshot, request.getNonceGroup(), request.getNonce());
return SnapshotTableResponse.newBuilder().setProcId(procId).build();
// just to pass the unit tests for all 3.x versions,
// the minimum version maybe needs to be modified later
if (VersionInfoUtil.currentClientHasMinimumVersion(2, 10)) {
long nonceGroup = request.getNonceGroup();
long nonce = request.getNonce();
long procId = server.snapshotManager.takeSnapshot(snapshot, nonceGroup, nonce);
return builder.setProcId(procId).build();
} else {
server.snapshotManager.takeSnapshot(snapshot);
return builder.build();
}
} catch (ForeignException e) {
throw new ServiceException(e.getCause());
} catch (IOException e) {
Expand Down
Loading

0 comments on commit b0f826b

Please sign in to comment.