Skip to content

Commit

Permalink
HBASE-24602 Add Increment and Append support to CheckAndMutate (#2363)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <[email protected]>
  • Loading branch information
brfrn169 authored Sep 8, 2020
1 parent 261405e commit daccdb1
Show file tree
Hide file tree
Showing 19 changed files with 1,299 additions and 621 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;

/**
* Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
* and {@link RowMutations} are supported.
* Used to perform CheckAndMutate operations.
* <p>
* Use the builder class to instantiate a CheckAndMutate object.
* This builder class is fluent style APIs, the code are like:
Expand Down Expand Up @@ -137,9 +136,9 @@ public Builder timeRange(TimeRange timeRange) {
}

private void preCheck(Row action) {
Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is null");
Preconditions.checkNotNull(action, "action is null");
if (!Bytes.equals(row, action.getRow())) {
throw new IllegalArgumentException("The row of the action (Put/Delete/RowMutations) <" +
throw new IllegalArgumentException("The row of the action <" +
Bytes.toStringBinary(action.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}
Expand Down Expand Up @@ -174,6 +173,32 @@ public CheckAndMutate build(Delete delete) {
}
}

/**
* @param increment data to increment if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(Increment increment) {
preCheck(increment);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, increment);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, increment);
}
}

/**
* @param append data to append if check succeeds
* @return a CheckAndMutate object
*/
public CheckAndMutate build(Append append) {
preCheck(append);
if (filter != null) {
return new CheckAndMutate(row, filter, timeRange, append);
} else {
return new CheckAndMutate(row, family, qualifier, op, value, timeRange, append);
}
}

/**
* @param mutation mutations to perform if check succeeds
* @return a CheckAndMutate object
Expand Down
112 changes: 41 additions & 71 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -678,15 +678,15 @@ protected Long rpcCall() throws Exception {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
put).isSuccess();
}

@Override
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
return doCheckAndPut(row, family, qualifier, toCompareOperator(compareOp), value, null,
return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, put).isSuccess();
}

Expand All @@ -696,82 +696,30 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final CompareOperator op, final byte [] value, final Put put) throws IOException {
// The name of the operators in CompareOperator are intentionally those of the
// operators in the filter's CompareOp enum.
return doCheckAndPut(row, family, qualifier, op, value, null, null, put).isSuccess();
}

private CheckAndMutateResult doCheckAndPut(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Put put) throws IOException {
ClientServiceCallable<CheckAndMutateResult> callable =
new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), put.getPriority()) {
@Override
protected CheckAndMutateResult rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
filter, timeRange, put);
MutateResponse response = doMutate(request);
return new CheckAndMutateResult(response.getProcessed(), null);
}
};
return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs)
.callWithRetries(callable, this.operationTimeoutMs);
return doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess();
}

@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final byte[] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, null,
return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null,
null, delete).isSuccess();
}

@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, toCompareOperator(compareOp), value, null,
return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, delete).isSuccess();
}

@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
return doCheckAndDelete(row, family, qualifier, op, value, null, null, delete).isSuccess();
}

private CheckAndMutateResult doCheckAndDelete(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Delete delete) throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
new RetryingTimeTracker().start(), delete.getPriority()) {
@Override
protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter
.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
qualifier, op, value, filter, timeRange, delete);
MutateResponse response = doMutate(request);
return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
}
};
List<Delete> rows = Collections.singletonList(delete);
Object[] results = new Object[1];
AsyncProcessTask task =
AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
.setCallable(callable)
// TODO any better timeout?
.setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
return new CheckAndMutateResult(((SingleResponse.Entry) results[0]).isProcessed(), null);
return doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess();
}

@Override
Expand Down Expand Up @@ -856,23 +804,44 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
@Override
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
Row action = checkAndMutate.getAction();
if (action instanceof Put) {
Put put = (Put) action;
validatePut(put);
return doCheckAndPut(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), put);
} else if (action instanceof Delete) {
return doCheckAndDelete(checkAndMutate.getRow(), checkAndMutate.getFamily(),
if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
action instanceof Append) {
if (action instanceof Put) {
validatePut((Put) action);
}
return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Delete) action);
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action);
} else {
return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
}
}

private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
final TimeRange timeRange, final Mutation mutation) throws IOException {
ClientServiceCallable<CheckAndMutateResult> callable =
new ClientServiceCallable<CheckAndMutateResult>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), mutation.getPriority()) {
@Override
protected CheckAndMutateResult rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
filter, timeRange, mutation);
MutateResponse response = doMutate(request);
if (response.hasResult()) {
return new CheckAndMutateResult(response.getProcessed(),
ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()));
}
return new CheckAndMutateResult(response.getProcessed(), null);
}
};
return rpcCallerFactory.<CheckAndMutateResult> newCaller(this.writeRpcTimeoutMs)
.callWithRetries(callable, this.operationTimeoutMs);
}

@Override
public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
throws IOException {
Expand Down Expand Up @@ -1331,13 +1300,14 @@ private void preCheck() {
public boolean thenPut(Put put) throws IOException {
validatePut(put);
preCheck();
return doCheckAndPut(row, family, qualifier, op, value, null, timeRange, put).isSuccess();
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
.isSuccess();
}

@Override
public boolean thenDelete(Delete delete) throws IOException {
preCheck();
return doCheckAndDelete(row, family, qualifier, op, value, null, timeRange, delete)
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
.isSuccess();
}

Expand Down Expand Up @@ -1369,12 +1339,12 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
@Override
public boolean thenPut(Put put) throws IOException {
validatePut(put);
return doCheckAndPut(row, null, null, null, null, filter, timeRange, put).isSuccess();
return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess();
}

@Override
public boolean thenDelete(Delete delete) throws IOException {
return doCheckAndDelete(row, null, null, null, null, filter, timeRange, delete).isSuccess();
return doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,9 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
}
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete) {
if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
|| checkAndMutate.getAction() instanceof Increment
|| checkAndMutate.getAction() instanceof Append) {
Mutation mutation = (Mutation) checkAndMutate.getAction();
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
Expand All @@ -475,7 +476,7 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), m),
(c, r) -> ResponseConverter.getCheckAndMutateResult(r)))
(c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3503,6 +3503,10 @@ public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
case DELETE:
return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
case INCREMENT:
return builder.build(ProtobufUtil.toIncrement(mutation, cellScanner));
case APPEND:
return builder.build(ProtobufUtil.toAppend(mutation, cellScanner));
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,23 +239,17 @@ public static MutateRequest buildIncrementRequest(
}

/**
* Create a protocol buffer MutateRequest for a conditioned put/delete
* Create a protocol buffer MutateRequest for a conditioned put/delete/increment/append
*
* @return a mutate request
* @throws IOException
*/
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
MutationType type;
if (mutation instanceof Put) {
type = MutationType.PUT;
} else {
type = MutationType.DELETE;
}
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
.setMutation(ProtobufUtil.toMutation(type, mutation))
.setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
Expand Down Expand Up @@ -775,16 +769,12 @@ public static void buildNoDataRegionActions(final byte[] regionName,
} else if (row instanceof Delete) {
buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder);
} else if (row instanceof Append) {
Append a = (Append)row;
cells.add(a);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.APPEND, a, mutationBuilder, action.getNonce())));
buildNoDataRegionAction((Append) row, cells, action.getNonce(), builder, actionBuilder,
mutationBuilder);
hasNonce = true;
} else if (row instanceof Increment) {
Increment i = (Increment)row;
cells.add(i);
builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
buildNoDataRegionAction((Increment) row, cells, action.getNonce(), builder, actionBuilder,
mutationBuilder);
hasNonce = true;
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
Expand Down Expand Up @@ -858,6 +848,16 @@ public static void buildNoDataRegionActions(final byte[] regionName,
mutationBuilder.clear();
buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
} else if (cam.getAction() instanceof Increment) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
actionBuilder, mutationBuilder);
} else if (cam.getAction() instanceof Append) {
actionBuilder.clear();
mutationBuilder.clear();
buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
actionBuilder, mutationBuilder);
} else if (cam.getAction() instanceof RowMutations) {
buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
Expand Down Expand Up @@ -904,6 +904,24 @@ private static void buildNoDataRegionAction(final Delete delete,
}
}

private static void buildNoDataRegionAction(final Increment increment,
final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
cells.add(increment);
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.INCREMENT, increment, mutationBuilder, nonce)));
}

private static void buildNoDataRegionAction(final Append append,
final List<CellScannable> cells, long nonce, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
cells.add(append);
regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
MutationType.APPEND, append, mutationBuilder, nonce)));
}

private static void buildNoDataRegionAction(final RowMutations rowMutations,
final List<CellScannable> cells, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
Expand All @@ -926,6 +944,19 @@ private static void buildNoDataRegionAction(final RowMutations rowMutations,
}
}

private static MutationType getMutationType(Mutation mutation) {
assert !(mutation instanceof CheckAndMutate);
if (mutation instanceof Put) {
return MutationType.PUT;
} else if (mutation instanceof Delete) {
return MutationType.DELETE;
} else if (mutation instanceof Increment) {
return MutationType.INCREMENT;
} else {
return MutationType.APPEND;
}
}

// End utilities for Client
//Start utilities for Admin

Expand Down
Loading

0 comments on commit daccdb1

Please sign in to comment.