diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
index 26eb23d1040c..a163c8db3278 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CheckAndMutate.java
@@ -31,8 +31,7 @@
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
- * Used to perform CheckAndMutate operations. Currently {@link Put}, {@link Delete}
- * and {@link RowMutations} are supported.
+ * Used to perform CheckAndMutate operations.
*
* Use the builder class to instantiate a CheckAndMutate object.
* This builder class is fluent style APIs, the code are like:
@@ -137,9 +136,9 @@ public Builder timeRange(TimeRange timeRange) {
}
private void preCheck(Row action) {
- Preconditions.checkNotNull(action, "action (Put/Delete/RowMutations) is null");
+ Preconditions.checkNotNull(action, "action is null");
if (!Bytes.equals(row, action.getRow())) {
- throw new IllegalArgumentException("The row of the action (Put/Delete/RowMutations) <" +
+ throw new IllegalArgumentException("The row of the action <" +
Bytes.toStringBinary(action.getRow()) + "> doesn't match the original one <" +
Bytes.toStringBinary(this.row) + ">");
}
@@ -174,6 +173,32 @@ public CheckAndMutate build(Delete delete) {
}
}
+ /**
+ * @param increment data to increment if check succeeds
+ * @return a CheckAndMutate object
+ */
+ public CheckAndMutate build(Increment increment) {
+ preCheck(increment);
+ if (filter != null) {
+ return new CheckAndMutate(row, filter, timeRange, increment);
+ } else {
+ return new CheckAndMutate(row, family, qualifier, op, value, timeRange, increment);
+ }
+ }
+
+ /**
+ * @param append data to append if check succeeds
+ * @return a CheckAndMutate object
+ */
+ public CheckAndMutate build(Append append) {
+ preCheck(append);
+ if (filter != null) {
+ return new CheckAndMutate(row, filter, timeRange, append);
+ } else {
+ return new CheckAndMutate(row, family, qualifier, op, value, timeRange, append);
+ }
+ }
+
/**
* @param mutation mutations to perform if check succeeds
* @return a CheckAndMutate object
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a6866d203f0b..7b81f442931b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -678,7 +678,7 @@ protected Long rpcCall() throws Exception {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
+ return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
put).isSuccess();
}
@@ -686,7 +686,7 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, toCompareOperator(compareOp), value, null,
+ return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, put).isSuccess();
}
@@ -696,33 +696,14 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final CompareOperator op, final byte [] value, final Put put) throws IOException {
// The name of the operators in CompareOperator are intentionally those of the
// operators in the filter's CompareOp enum.
- return doCheckAndPut(row, family, qualifier, op, value, null, null, put).isSuccess();
- }
-
- private CheckAndMutateResult doCheckAndPut(final byte[] row, final byte[] family,
- final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
- final TimeRange timeRange, final Put put) throws IOException {
- ClientServiceCallable callable =
- new ClientServiceCallable(this.connection, getName(), row,
- this.rpcControllerFactory.newController(), put.getPriority()) {
- @Override
- protected CheckAndMutateResult rpcCall() throws Exception {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
- filter, timeRange, put);
- MutateResponse response = doMutate(request);
- return new CheckAndMutateResult(response.getProcessed(), null);
- }
- };
- return rpcCallerFactory. newCaller(this.writeRpcTimeoutMs)
- .callWithRetries(callable, this.operationTimeoutMs);
+ return doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess();
}
@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL, value, null,
+ return doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null,
null, delete).isSuccess();
}
@@ -730,7 +711,7 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, toCompareOperator(compareOp), value, null,
+ return doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, delete).isSuccess();
}
@@ -738,40 +719,7 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, op, value, null, null, delete).isSuccess();
- }
-
- private CheckAndMutateResult doCheckAndDelete(final byte[] row, final byte[] family,
- final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
- final TimeRange timeRange, final Delete delete) throws IOException {
- CancellableRegionServerCallable callable =
- new CancellableRegionServerCallable(this.connection, getName(), row,
- this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
- new RetryingTimeTracker().start(), delete.getPriority()) {
- @Override
- protected SingleResponse rpcCall() throws Exception {
- MutateRequest request = RequestConverter
- .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
- qualifier, op, value, filter, timeRange, delete);
- MutateResponse response = doMutate(request);
- return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
- }
- };
- List rows = Collections.singletonList(delete);
- Object[] results = new Object[1];
- AsyncProcessTask task =
- AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
- .setCallable(callable)
- // TODO any better timeout?
- .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
- .setOperationTimeout(operationTimeoutMs)
- .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
- AsyncRequestFuture ars = multiAp.submit(task);
- ars.waitUntilDone();
- if (ars.hasError()) {
- throw ars.getErrors();
- }
- return new CheckAndMutateResult(((SingleResponse.Entry) results[0]).isProcessed(), null);
+ return doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess();
}
@Override
@@ -856,16 +804,14 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
@Override
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
Row action = checkAndMutate.getAction();
- if (action instanceof Put) {
- Put put = (Put) action;
- validatePut(put);
- return doCheckAndPut(checkAndMutate.getRow(), checkAndMutate.getFamily(),
- checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
- checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), put);
- } else if (action instanceof Delete) {
- return doCheckAndDelete(checkAndMutate.getRow(), checkAndMutate.getFamily(),
+ if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
+ action instanceof Append) {
+ if (action instanceof Put) {
+ validatePut((Put) action);
+ }
+ return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
- checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Delete) action);
+ checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (Mutation) action);
} else {
return doCheckAndMutate(checkAndMutate.getRow(), checkAndMutate.getFamily(),
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
@@ -873,6 +819,29 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
}
}
+ private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
+ final byte[] qualifier, final CompareOperator op, final byte[] value, final Filter filter,
+ final TimeRange timeRange, final Mutation mutation) throws IOException {
+ ClientServiceCallable callable =
+ new ClientServiceCallable(this.connection, getName(), row,
+ this.rpcControllerFactory.newController(), mutation.getPriority()) {
+ @Override
+ protected CheckAndMutateResult rpcCall() throws Exception {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family, qualifier, op, value,
+ filter, timeRange, mutation);
+ MutateResponse response = doMutate(request);
+ if (response.hasResult()) {
+ return new CheckAndMutateResult(response.getProcessed(),
+ ProtobufUtil.toResult(response.getResult(), getRpcControllerCellScanner()));
+ }
+ return new CheckAndMutateResult(response.getProcessed(), null);
+ }
+ };
+ return rpcCallerFactory. newCaller(this.writeRpcTimeoutMs)
+ .callWithRetries(callable, this.operationTimeoutMs);
+ }
+
@Override
public List checkAndMutate(List checkAndMutates)
throws IOException {
@@ -1331,13 +1300,14 @@ private void preCheck() {
public boolean thenPut(Put put) throws IOException {
validatePut(put);
preCheck();
- return doCheckAndPut(row, family, qualifier, op, value, null, timeRange, put).isSuccess();
+ return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
+ .isSuccess();
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
preCheck();
- return doCheckAndDelete(row, family, qualifier, op, value, null, timeRange, delete)
+ return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
.isSuccess();
}
@@ -1369,12 +1339,12 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
@Override
public boolean thenPut(Put put) throws IOException {
validatePut(put);
- return doCheckAndPut(row, null, null, null, null, filter, timeRange, put).isSuccess();
+ return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put).isSuccess();
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
- return doCheckAndDelete(row, null, null, null, null, filter, timeRange, delete).isSuccess();
+ return doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess();
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index aa42838b26ac..c3d3246903ab 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -461,8 +461,9 @@ public CompletableFuture checkAndMutate(CheckAndMutate che
if (checkAndMutate.getAction() instanceof Put) {
validatePut((Put) checkAndMutate.getAction(), conn.connConf.getMaxKeyValueSize());
}
- if (checkAndMutate.getAction() instanceof Put ||
- checkAndMutate.getAction() instanceof Delete) {
+ if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete
+ || checkAndMutate.getAction() instanceof Increment
+ || checkAndMutate.getAction() instanceof Append) {
Mutation mutation = (Mutation) checkAndMutate.getAction();
if (mutation instanceof Put) {
validatePut((Put) mutation, conn.connConf.getMaxKeyValueSize());
@@ -475,7 +476,7 @@ public CompletableFuture checkAndMutate(CheckAndMutate che
checkAndMutate.getFamily(), checkAndMutate.getQualifier(),
checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(),
checkAndMutate.getTimeRange(), m),
- (c, r) -> ResponseConverter.getCheckAndMutateResult(r)))
+ (c, r) -> ResponseConverter.getCheckAndMutateResult(r, c.cellScanner())))
.call();
} else if (checkAndMutate.getAction() instanceof RowMutations) {
RowMutations rowMutations = (RowMutations) checkAndMutate.getAction();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 094065cefc88..b9231bc97ad4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -3503,6 +3503,10 @@ public static CheckAndMutate toCheckAndMutate(ClientProtos.Condition condition,
return builder.build(ProtobufUtil.toPut(mutation, cellScanner));
case DELETE:
return builder.build(ProtobufUtil.toDelete(mutation, cellScanner));
+ case INCREMENT:
+ return builder.build(ProtobufUtil.toIncrement(mutation, cellScanner));
+ case APPEND:
+ return builder.build(ProtobufUtil.toAppend(mutation, cellScanner));
default:
throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index c877050b8b79..d64968a68fc6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -239,7 +239,7 @@ public static MutateRequest buildIncrementRequest(
}
/**
- * Create a protocol buffer MutateRequest for a conditioned put/delete
+ * Create a protocol buffer MutateRequest for a conditioned put/delete/increment/append
*
* @return a mutate request
* @throws IOException
@@ -247,15 +247,9 @@ public static MutateRequest buildIncrementRequest(
public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value,
final Filter filter, final TimeRange timeRange, final Mutation mutation) throws IOException {
- MutationType type;
- if (mutation instanceof Put) {
- type = MutationType.PUT;
- } else {
- type = MutationType.DELETE;
- }
return MutateRequest.newBuilder()
.setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
- .setMutation(ProtobufUtil.toMutation(type, mutation))
+ .setMutation(ProtobufUtil.toMutation(getMutationType(mutation), mutation))
.setCondition(buildCondition(row, family, qualifier, op, value, filter, timeRange))
.build();
}
@@ -775,16 +769,12 @@ public static void buildNoDataRegionActions(final byte[] regionName,
} else if (row instanceof Delete) {
buildNoDataRegionAction((Delete) row, cells, builder, actionBuilder, mutationBuilder);
} else if (row instanceof Append) {
- Append a = (Append)row;
- cells.add(a);
- builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
- MutationType.APPEND, a, mutationBuilder, action.getNonce())));
+ buildNoDataRegionAction((Append) row, cells, action.getNonce(), builder, actionBuilder,
+ mutationBuilder);
hasNonce = true;
} else if (row instanceof Increment) {
- Increment i = (Increment)row;
- cells.add(i);
- builder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
- MutationType.INCREMENT, i, mutationBuilder, action.getNonce())));
+ buildNoDataRegionAction((Increment) row, cells, action.getNonce(), builder, actionBuilder,
+ mutationBuilder);
hasNonce = true;
} else if (row instanceof RegionCoprocessorServiceExec) {
RegionCoprocessorServiceExec exec = (RegionCoprocessorServiceExec) row;
@@ -858,6 +848,16 @@ public static void buildNoDataRegionActions(final byte[] regionName,
mutationBuilder.clear();
buildNoDataRegionAction((Delete) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
+ } else if (cam.getAction() instanceof Increment) {
+ actionBuilder.clear();
+ mutationBuilder.clear();
+ buildNoDataRegionAction((Increment) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+ actionBuilder, mutationBuilder);
+ } else if (cam.getAction() instanceof Append) {
+ actionBuilder.clear();
+ mutationBuilder.clear();
+ buildNoDataRegionAction((Append) cam.getAction(), cells, HConstants.NO_NONCE, builder,
+ actionBuilder, mutationBuilder);
} else if (cam.getAction() instanceof RowMutations) {
buildNoDataRegionAction((RowMutations) cam.getAction(), cells, builder, actionBuilder,
mutationBuilder);
@@ -904,6 +904,24 @@ private static void buildNoDataRegionAction(final Delete delete,
}
}
+ private static void buildNoDataRegionAction(final Increment increment,
+ final List cells, long nonce, final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder,
+ final MutationProto.Builder mutationBuilder) throws IOException {
+ cells.add(increment);
+ regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
+ MutationType.INCREMENT, increment, mutationBuilder, nonce)));
+ }
+
+ private static void buildNoDataRegionAction(final Append append,
+ final List cells, long nonce, final RegionAction.Builder regionActionBuilder,
+ final ClientProtos.Action.Builder actionBuilder,
+ final MutationProto.Builder mutationBuilder) throws IOException {
+ cells.add(append);
+ regionActionBuilder.addAction(actionBuilder.setMutation(ProtobufUtil.toMutationNoData(
+ MutationType.APPEND, append, mutationBuilder, nonce)));
+ }
+
private static void buildNoDataRegionAction(final RowMutations rowMutations,
final List cells, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder)
@@ -926,6 +944,19 @@ private static void buildNoDataRegionAction(final RowMutations rowMutations,
}
}
+ private static MutationType getMutationType(Mutation mutation) {
+ assert !(mutation instanceof CheckAndMutate);
+ if (mutation instanceof Put) {
+ return MutationType.PUT;
+ } else if (mutation instanceof Delete) {
+ return MutationType.DELETE;
+ } else if (mutation instanceof Increment) {
+ return MutationType.INCREMENT;
+ } else {
+ return MutationType.APPEND;
+ }
+ }
+
// End utilities for Client
//Start utilities for Admin
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
index ffe39702edd7..87a0df1c2667 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ResponseConverter.java
@@ -225,11 +225,11 @@ public static org.apache.hadoop.hbase.client.MultiResponse getResults(final Mult
* @return a CheckAndMutateResult object
*/
public static CheckAndMutateResult getCheckAndMutateResult(
- ClientProtos.MutateResponse mutateResponse) {
+ ClientProtos.MutateResponse mutateResponse, CellScanner cells) throws IOException {
boolean success = mutateResponse.getProcessed();
Result result = null;
if (mutateResponse.hasResult()) {
- result = ProtobufUtil.toResult(mutateResponse.getResult());
+ result = ProtobufUtil.toResult(mutateResponse.getResult(), cells);
}
return new CheckAndMutateResult(success, result);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index 0bc0631a9cad..ab2c8ce51f81 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -441,8 +441,9 @@ default void postDelete(ObserverContext c, Delete
/**
* This will be called for every batch mutation operation happening at the server. This will be
* called after acquiring the locks on the mutating rows and after applying the proper timestamp
- * for each Mutation at the server. The batch may contain Put/Delete. By setting OperationStatus
- * of Mutations ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
+ * for each Mutation at the server. The batch may contain Put/Delete/Increment/Append. By
+ * setting OperationStatus of Mutations
+ * ({@link MiniBatchOperationInProgress#setOperationStatus(int, OperationStatus)}),
* {@link RegionObserver} can make Region to skip these Mutations.
*
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
@@ -454,10 +455,12 @@ default void preBatchMutate(ObserverContext c,
MiniBatchOperationInProgress miniBatchOp) throws IOException {}
/**
- * This will be called after applying a batch of Mutations on a region. The Mutations are added to
- * memstore and WAL. The difference of this one with
- * {@link #postPut(ObserverContext, Put, WALEdit, Durability) }
- * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability) } is
+ * This will be called after applying a batch of Mutations on a region. The Mutations are added
+ * to memstore and WAL. The difference of this one with
+ * {@link #postPut(ObserverContext, Put, WALEdit, Durability)}
+ * and {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)}
+ * and {@link #postIncrement(ObserverContext, Increment, Result)}
+ * and {@link #postAppend(ObserverContext, Append, Result)} is
* this hook will be executed before the mvcc transaction completion.
*
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
@@ -488,8 +491,8 @@ default void postCloseRegionOperation(ObserverContext
* Note: Do not retain references to any Cells in Mutations beyond the life of this invocation.
* If need a Cell reference for later use, copy the cell and use that.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1e1bc6a53dbd..a934e5d07c35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3211,6 +3211,10 @@ private abstract static class BatchOperation {
protected final WALEdit[] walEditsFromCoprocessors;
// reference family cell maps directly so coprocessors can mutate them if desired
protected final Map>[] familyCellMaps;
+ // For Increment/Append operations
+ protected final Result[] results;
+ // For nonce operations
+ protected final boolean[] canProceed;
protected final HRegion region;
protected int nextIndexToProcess = 0;
@@ -3225,6 +3229,8 @@ public BatchOperation(final HRegion region, T[] operations) {
Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
this.walEditsFromCoprocessors = new WALEdit[operations.length];
familyCellMaps = new Map[operations.length];
+ this.results = new Result[operations.length];
+ this.canProceed = new boolean[operations.length];
this.region = region;
observedExceptions = new ObservedExceptionsInBatch();
@@ -3279,10 +3285,10 @@ public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Vi
/**
* Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs
- * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on
- * entire batch and will be called from outside of class to check and prepare batch. This can
- * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a
- * 'for' loop over mutations.
+ * CP prePut()/preDelete()/preIncrement()/preAppend() hooks for all mutations in a batch. This
+ * is intended to operate on entire batch and will be called from outside of class to check
+ * and prepare batch. This can be implemented by calling helper method
+ * {@link #checkAndPrepareMutation(int, long)} in a 'for' loop over mutations.
*/
public abstract void checkAndPrepare() throws IOException;
@@ -3350,8 +3356,9 @@ boolean isAtomic() {
/**
* Helper method that checks and prepares only one mutation. This can be used to implement
* {@link #checkAndPrepare()} for entire Batch.
- * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called
- * after prePut()/ preDelete() CP hooks are run for the mutation
+ * NOTE: As CP prePut()/preDelete()/preIncrement()/preAppend() hooks may modify mutations,
+ * this method should be called after prePut()/preDelete()/preIncrement()/preAppend() CP hooks
+ * are run for the mutation
*/
protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)
throws IOException {
@@ -3360,8 +3367,10 @@ protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)
// Check the families in the put. If bad, skip this one.
checkAndPreparePut((Put) mutation);
region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);
- } else {
+ } else if (mutation instanceof Delete) {
region.prepareDelete((Delete) mutation);
+ } else if (mutation instanceof Increment || mutation instanceof Append) {
+ region.checkFamilies(mutation.getFamilyCellMap().keySet());
}
}
@@ -3696,7 +3705,9 @@ public void checkAndPreparePut(Put p) throws IOException {
@Override
public void checkAndPrepare() throws IOException {
- final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes
+ // index 0: puts, index 1: deletes, index 2: increments, index 3: append
+ final int[] metrics = {0, 0, 0, 0};
+
visitBatchOperations(true, this.size(), new Visitor() {
private long now = EnvironmentEdgeManager.currentTime();
private WALEdit walEdit;
@@ -3736,21 +3747,57 @@ public boolean visit(int index) throws IOException {
// There were some Deletes in the batch.
region.metricsRegion.updateDelete();
}
+ if (metrics[2] > 0) {
+ // There were some Increment in the batch.
+ region.metricsRegion.updateIncrement();
+ }
+ if (metrics[3] > 0) {
+ // There were some Append in the batch.
+ region.metricsRegion.updateAppend();
+ }
}
}
@Override
public void prepareMiniBatchOperations(MiniBatchOperationInProgress miniBatchOp,
long timestamp, final List acquiredRowLocks) throws IOException {
- byte[] byteTS = Bytes.toBytes(timestamp);
visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
Mutation mutation = getMutation(index);
if (mutation instanceof Put) {
- region.updateCellTimestamps(familyCellMaps[index].values(), byteTS);
+ HRegion.updateCellTimestamps(familyCellMaps[index].values(), Bytes.toBytes(timestamp));
miniBatchOp.incrementNumOfPuts();
- } else {
- region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS);
+ } else if (mutation instanceof Delete) {
+ region.prepareDeleteTimestamps(mutation, familyCellMaps[index],
+ Bytes.toBytes(timestamp));
miniBatchOp.incrementNumOfDeletes();
+ } else if (mutation instanceof Increment || mutation instanceof Append) {
+ // For nonce operations
+ canProceed[index] = startNonceOperation(nonceGroup, nonce);
+ if (!canProceed[index]) {
+ // convert duplicate increment/append to get
+ List results = region.get(toGet(mutation), false, nonceGroup, nonce);
+ retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS,
+ Result.create(results));
+ return true;
+ }
+
+ boolean returnResults;
+ if (mutation instanceof Increment) {
+ returnResults = ((Increment) mutation).isReturnResults();
+ miniBatchOp.incrementNumOfIncrements();
+ } else {
+ returnResults = ((Append) mutation).isReturnResults();
+ miniBatchOp.incrementNumOfAppends();
+ }
+ Result result = doCoprocessorPreCallAfterRowLock(mutation);
+ if (result != null) {
+ retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS,
+ returnResults ? result : Result.EMPTY_RESULT);
+ return true;
+ }
+ List results = returnResults ? new ArrayList<>(mutation.size()) : null;
+ familyCellMaps[index] = reckonDeltas(mutation, results, timestamp);
+ this.results[index] = results != null ? Result.create(results): Result.EMPTY_RESULT;
}
region.rewriteCellTags(familyCellMaps[index], mutation);
@@ -3775,6 +3822,253 @@ public void prepareMiniBatchOperations(MiniBatchOperationInProgress mi
}
}
+ /**
+ * Starts the nonce operation for a mutation, if needed.
+ * @param nonceGroup Nonce group from the request.
+ * @param nonce Nonce.
+ * @return whether to proceed this mutation.
+ */
+ private boolean startNonceOperation(long nonceGroup, long nonce) throws IOException {
+ if (region.rsServices == null || region.rsServices.getNonceManager() == null
+ || nonce == HConstants.NO_NONCE) {
+ return true;
+ }
+ boolean canProceed;
+ try {
+ canProceed = region.rsServices.getNonceManager()
+ .startOperation(nonceGroup, nonce, region.rsServices);
+ } catch (InterruptedException ex) {
+ throw new InterruptedIOException("Nonce start operation interrupted");
+ }
+ return canProceed;
+ }
+
+ /**
+ * Ends nonce operation for a mutation, if needed.
+ * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
+ * @param nonce Nonce.
+ * @param success Whether the operation for this nonce has succeeded.
+ */
+ private void endNonceOperation(long nonceGroup, long nonce, boolean success) {
+ if (region.rsServices != null && region.rsServices.getNonceManager() != null
+ && nonce != HConstants.NO_NONCE) {
+ region.rsServices.getNonceManager().endOperation(nonceGroup, nonce, success);
+ }
+ }
+
+ private static Get toGet(final Mutation mutation) throws IOException {
+ assert mutation instanceof Increment || mutation instanceof Append;
+ Get get = new Get(mutation.getRow());
+ CellScanner cellScanner = mutation.cellScanner();
+ while (!cellScanner.advance()) {
+ Cell cell = cellScanner.current();
+ get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
+ }
+ if (mutation instanceof Increment) {
+ // Increment
+ Increment increment = (Increment) mutation;
+ get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
+ } else {
+ // Append
+ Append append = (Append) mutation;
+ get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
+ }
+ for (Entry entry : mutation.getAttributesMap().entrySet()) {
+ get.setAttribute(entry.getKey(), entry.getValue());
+ }
+ return get;
+ }
+
+ /**
+ * Do coprocessor pre-increment or pre-append after row lock call.
+ * @return Result returned out of the coprocessor, which means bypass all further processing
+ * and return the preferred Result instead, or null which means proceed.
+ */
+ private Result doCoprocessorPreCallAfterRowLock(Mutation mutation) throws IOException {
+ assert mutation instanceof Increment || mutation instanceof Append;
+ Result result = null;
+ if (region.coprocessorHost != null) {
+ if (mutation instanceof Increment) {
+ result = region.coprocessorHost.preIncrementAfterRowLock((Increment) mutation);
+ } else {
+ result = region.coprocessorHost.preAppendAfterRowLock((Append) mutation);
+ }
+ }
+ return result;
+ }
+
+ private Map> reckonDeltas(Mutation mutation, List results,
+ long now) throws IOException {
+ assert mutation instanceof Increment || mutation instanceof Append;
+ Map> ret = new HashMap<>();
+ // Process a Store/family at a time.
+ for (Map.Entry> entry: mutation.getFamilyCellMap().entrySet()) {
+ final byte[] columnFamilyName = entry.getKey();
+ List deltas = entry.getValue();
+ // Reckon for the Store what to apply to WAL and MemStore.
+ List toApply = reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+ now, deltas, results);
+ if (!toApply.isEmpty()) {
+ for (Cell cell : toApply) {
+ HStore store = region.getStore(cell);
+ if (store == null) {
+ region.checkFamily(CellUtil.cloneFamily(cell));
+ } else {
+ ret.computeIfAbsent(store.getColumnFamilyDescriptor().getName(),
+ key -> new ArrayList<>()).add(cell);
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed
+ * column family/Store.
+ *
+ * Does Get of current value and then adds passed in deltas for this Store returning the
+ * result.
+ *
+ * @param mutation The encompassing Mutation object
+ * @param deltas Changes to apply to this Store; either increment amount or data to append
+ * @param results In here we accumulate all the Cells we are to return to the client. If null,
+ * client doesn't want results returned.
+ * @return Resulting Cells after deltas have been applied to current
+ * values. Side effect is our filling out of the results List.
+ */
+ private List reckonDeltasByStore(HStore store, Mutation mutation, long now,
+ List deltas, List results) throws IOException {
+ assert mutation instanceof Increment || mutation instanceof Append;
+ byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
+ List> cellPairs = new ArrayList<>(deltas.size());
+
+ // Get previous values for all columns in this family.
+ TimeRange tr;
+ if (mutation instanceof Increment) {
+ tr = ((Increment) mutation).getTimeRange();
+ } else {
+ tr = ((Append) mutation).getTimeRange();
+ }
+ List currentValues = get(mutation, store, deltas, tr);
+
+ // Iterate the input columns and update existing values if they were found, otherwise
+ // add new column initialized to the delta amount
+ int currentValuesIndex = 0;
+ for (int i = 0; i < deltas.size(); i++) {
+ Cell delta = deltas.get(i);
+ Cell currentValue = null;
+ if (currentValuesIndex < currentValues.size() &&
+ CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
+ currentValue = currentValues.get(currentValuesIndex);
+ if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
+ currentValuesIndex++;
+ }
+ }
+ // Switch on whether this an increment or an append building the new Cell to apply.
+ Cell newCell;
+ if (mutation instanceof Increment) {
+ long deltaAmount = getLongValue(delta);
+ final long newValue = currentValue == null ?
+ deltaAmount : getLongValue(currentValue) + deltaAmount;
+ newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+ (oldCell) -> Bytes.toBytes(newValue));
+ } else {
+ newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation,
+ (oldCell) ->
+ ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
+ .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
+ .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
+ .array()
+ );
+ }
+ if (region.maxCellSize > 0) {
+ int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell);
+ if (newCellSize > region.maxCellSize) {
+ String msg = "Cell with size " + newCellSize + " exceeds limit of "
+ + region.maxCellSize + " bytes in region " + this;
+ LOG.debug(msg);
+ throw new DoNotRetryIOException(msg);
+ }
+ }
+ cellPairs.add(new Pair<>(currentValue, newCell));
+ // Add to results to get returned to the Client. If null, cilent does not want results.
+ if (results != null) {
+ results.add(newCell);
+ }
+ }
+ // Give coprocessors a chance to update the new cells before apply to WAL or memstore
+ if (region.coprocessorHost != null) {
+ // Here the operation must be increment or append.
+ cellPairs = mutation instanceof Increment ?
+ region.coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
+ region.coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
+ }
+ return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
+ }
+
+ private static Cell reckonDelta(final Cell delta, final Cell currentCell,
+ final byte[] columnFamily, final long now, Mutation mutation,
+ Function supplier) throws IOException {
+ // Forward any tags found on the delta.
+ List tags = TagUtil.carryForwardTags(delta);
+ tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
+ if (currentCell != null) {
+ tags = TagUtil.carryForwardTags(tags, currentCell);
+ byte[] newValue = supplier.apply(currentCell);
+ return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setRow(mutation.getRow(), 0, mutation.getRow().length)
+ .setFamily(columnFamily, 0, columnFamily.length)
+ // copy the qualifier if the cell is located in shared memory.
+ .setQualifier(CellUtil.cloneQualifier(delta))
+ .setTimestamp(Math.max(currentCell.getTimestamp() + 1, now))
+ .setType(KeyValue.Type.Put.getCode())
+ .setValue(newValue, 0, newValue.length)
+ .setTags(TagUtil.fromList(tags))
+ .build();
+ } else {
+ PrivateCellUtil.updateLatestStamp(delta, now);
+ return CollectionUtils.isEmpty(tags) ? delta : PrivateCellUtil.createCell(delta, tags);
+ }
+ }
+
+ /**
+ * @return Get the long out of the passed in Cell
+ */
+ private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
+ int len = cell.getValueLength();
+ if (len != Bytes.SIZEOF_LONG) {
+ // throw DoNotRetryIOException instead of IllegalArgumentException
+ throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
+ }
+ return PrivateCellUtil.getValueAsLong(cell);
+ }
+
+ /**
+ * Do a specific Get on passed columnFamily and column qualifiers.
+ * @param mutation Mutation we are doing this Get for.
+ * @param store Which column family on row (TODO: Go all Gets in one go)
+ * @param coordinates Cells from mutation used as coordinates applied to Get.
+ * @return Return list of Cells found.
+ */
+ private List get(Mutation mutation, HStore store, List coordinates,
+ TimeRange tr) throws IOException {
+ // Sort the cells so that they match the order that they appear in the Get results.
+ // Otherwise, we won't be able to find the existing values if the cells are not specified
+ // in order by the client since cells are in an array list.
+ // TODO: I don't get why we are sorting. St.Ack 20150107
+ sort(coordinates, store.getComparator());
+ Get get = new Get(mutation.getRow());
+ for (Cell cell: coordinates) {
+ get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell));
+ }
+ // Increments carry time range. If an Increment instance, put it on the Get.
+ if (tr != null) {
+ get.setTimeRange(tr.getMin(), tr.getMax());
+ }
+ return region.get(get, false);
+ }
+
@Override
public List> buildWALEdits(final MiniBatchOperationInProgress
miniBatchOp) throws IOException {
@@ -3807,6 +4101,13 @@ public void completeMiniBatchOperations(
region.coprocessorHost.postBatchMutate(miniBatchOp);
}
super.completeMiniBatchOperations(miniBatchOp, writeEntry);
+
+ if (nonce != HConstants.NO_NONCE) {
+ if (region.rsServices != null && region.rsServices.getNonceManager() != null) {
+ region.rsServices.getNonceManager()
+ .addMvccToOperationContext(nonceGroup, nonce, writeEntry.getWriteNumber());
+ }
+ }
}
@Override
@@ -3818,24 +4119,47 @@ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress m
// synced so that the coprocessor contract is adhered to.
if (region.coprocessorHost != null) {
visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
- // only for successful puts
+ // only for successful puts/deletes/increments/appends
if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS) {
Mutation m = getMutation(i);
if (m instanceof Put) {
region.coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
- } else {
+ } else if (m instanceof Delete) {
region.coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
+ } else if (m instanceof Increment) {
+ Result result = region.getCoprocessorHost().postIncrement((Increment) m,
+ results[i]);
+ if (result != results[i]) {
+ retCodeDetails[i] =
+ new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result);
+ }
+ } else if (m instanceof Append) {
+ Result result = region.getCoprocessorHost().postAppend((Append) m, results[i]);
+ if (result != results[i]) {
+ retCodeDetails[i] =
+ new OperationStatus(retCodeDetails[i].getOperationStatusCode(), result);
+ }
}
}
return true;
});
}
+ // For nonce operations
+ visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
+ if (canProceed[i]) {
+ endNonceOperation(nonceGroup, nonce,
+ retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS);
+ }
+ return true;
+ });
+
// See if the column families were consistent through the whole thing.
// if they were then keep them. If they were not then pass a null.
// null will be treated as unknown.
- // Total time taken might be involving Puts and Deletes.
- // Split the time for puts and deletes based on the total number of Puts and Deletes.
+ // Total time taken might be involving Puts, Deletes, Increments and Appends.
+ // Split the time for puts and deletes based on the total number of Puts, Deletes,
+ // Increments and Appends.
if (region.metricsRegion != null) {
if (miniBatchOp.getNumOfPuts() > 0) {
// There were some Puts in the batch.
@@ -3845,6 +4169,14 @@ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress m
// There were some Deletes in the batch.
region.metricsRegion.updateDelete();
}
+ if (miniBatchOp.getNumOfIncrements() > 0) {
+ // There were some Increments in the batch.
+ region.metricsRegion.updateIncrement();
+ }
+ if (miniBatchOp.getNumOfAppends() > 0) {
+ // There were some Appends in the batch.
+ region.metricsRegion.updateAppend();
+ }
}
}
@@ -3856,8 +4188,9 @@ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress m
}
/**
- * Runs prePut/ preDelete coprocessor hook for input mutation in a batch
- * @param metrics Array of 2 ints. index 0: count of puts and index 1: count of deletes
+ * Runs prePut/preDelete/preIncrement/preAppend coprocessor hook for input mutation in a batch
+ * @param metrics Array of 2 ints. index 0: count of puts, index 1: count of deletes, index 2:
+ * count of increments and 3: count of appends
*/
private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] metrics)
throws IOException {
@@ -3883,13 +4216,27 @@ private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] m
metrics[1]++;
retCodeDetails[index] = OperationStatus.SUCCESS;
}
+ } else if (m instanceof Increment) {
+ Increment increment = (Increment) m;
+ Result result = region.coprocessorHost.preIncrement(increment);
+ if (result != null) {
+ // pre hook says skip this Increment
+ // mark as success and skip in doMiniBatchMutation
+ metrics[2]++;
+ retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result);
+ }
+ } else if (m instanceof Append) {
+ Append append = (Append) m;
+ Result result = region.coprocessorHost.preAppend(append);
+ if (result != null) {
+ // pre hook says skip this Append
+ // mark as success and skip in doMiniBatchMutation
+ metrics[3]++;
+ retCodeDetails[index] = new OperationStatus(OperationStatusCode.SUCCESS, result);
+ }
} else {
- String msg = "Put/Delete mutations only supported in a batch";
- // In case of passing Append mutations along with the Puts and Deletes in batchMutate
- // mark the operation return code as failure so that it will not be considered in
- // the doMiniBatchMutation
+ String msg = "Put/Delete/Increment/Append mutations only supported in a batch";
retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE, msg);
-
if (isAtomic()) { // fail, atomic means all or none
throw new IOException(msg);
}
@@ -4060,15 +4407,11 @@ public void completeMiniBatchOperations(
}
}
- public OperationStatus[] batchMutate(Mutation[] mutations, long nonceGroup, long nonce)
- throws IOException {
- return batchMutate(mutations, false, nonceGroup, nonce);
- }
-
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
long nonce) throws IOException {
// As it stands, this is used for 3 things
- // * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
+ // * batchMutate with single mutation - put/delete/increment/append, separate or from
+ // checkAndMutate.
// * coprocessor calls (see ex. BulkDeleteEndpoint).
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
return batchMutate(new MutationBatchOperation(this, mutations, atomic, nonceGroup, nonce));
@@ -4076,7 +4419,10 @@ public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long
@Override
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
- return batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ // If the mutations has any Increment/Append operations, we need to do batchMutate atomically
+ boolean atomic = Arrays.stream(mutations)
+ .anyMatch(m -> m instanceof Increment || m instanceof Append);
+ return batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
@@ -4106,12 +4452,12 @@ public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqI
/**
* Perform a batch of mutations.
*
- * It supports only Put and Delete mutations and will ignore other types passed. Operations in
- * a batch are stored with highest durability specified of for all operations in a batch,
- * except for {@link Durability#SKIP_WAL}.
+ * It supports Put, Delete, Increment, Append mutations and will ignore other types passed.
+ * Operations in a batch are stored with highest durability specified of for all operations in a
+ * batch, except for {@link Durability#SKIP_WAL}.
*
*
This function is called from {@link #batchReplay(WALSplitUtil.MutationReplay[], long)} with
- * {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[], long, long)} with
+ * {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[])} with
* {@link MutationBatchOperation} instance as an argument. As the processing of replay batch
* and mutation batch is very similar, lot of code is shared by providing generic methods in
* base class {@link BatchOperation}. The logic for this method and
@@ -4136,7 +4482,7 @@ OperationStatus[] batchMutate(BatchOperation> batchOp) throws IOException {
if (!initialized) {
this.writeRequestsCount.add(batchOp.size());
// validate and prepare batch for write, for MutationBatchOperation it also calls CP
- // prePut()/ preDelete() hooks
+ // prePut()/preDelete()/preIncrement()/preAppend() hooks
batchOp.checkAndPrepare();
initialized = true;
}
@@ -4154,7 +4500,7 @@ OperationStatus[] batchMutate(BatchOperation> batchOp) throws IOException {
}
/**
- * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
+ * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[])}
* In here we also handle replay of edits on region recover. Also gets change in size brought
* about by applying {@code batchOp}.
*/
@@ -4175,16 +4521,18 @@ private void doMiniBatchMutate(BatchOperation> batchOp) throws IOException {
// We've now grabbed as many mutations off the list as we can
// Ensure we acquire at least one.
if (miniBatchOp.getReadyToWriteCount() <= 0) {
- // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
+ // Nothing to put/delete/increment/append -- an exception in the above such as
+ // NoSuchColumnFamily?
return;
}
lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
locked = true;
- // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
+ // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
// We should record the timestamp only after we have acquired the rowLock,
- // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
+ // otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer
+ // timestamp
long now = EnvironmentEdgeManager.currentTime();
batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
@@ -4230,11 +4578,22 @@ private void doMiniBatchMutate(BatchOperation> batchOp) throws IOException {
final int finalLastIndexExclusive =
miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
final boolean finalSuccess = success;
- batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
- batchOp.retCodeDetails[i] =
- finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
- return true;
- });
+ batchOp.visitBatchOperations(true, finalLastIndexExclusive,
+ (int i) -> {
+ Mutation mutation = batchOp.getMutation(i);
+ if (mutation instanceof Increment || mutation instanceof Append) {
+ if (finalSuccess) {
+ batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.SUCCESS,
+ batchOp.results[i]);
+ } else {
+ batchOp.retCodeDetails[i] = OperationStatus.FAILURE;
+ }
+ } else {
+ batchOp.retCodeDetails[i] =
+ finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
+ }
+ return true;
+ });
batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);
@@ -4408,7 +4767,7 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
}
}
- // If matches put the new put or delete the new delete
+ // If matches, perform the mutation or the rowMutations
if (matches) {
// We have acquired the row lock already. If the system clock is NOT monotonically
// non-decreasing (see HBASE-14070) we should make sure that the mutation has a
@@ -4433,13 +4792,14 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
// timestamp from get (see prepareDeleteTimestamps).
}
// All edits for the given row (across all column families) must happen atomically.
+ Result r = null;
if (mutation != null) {
- doBatchMutate(mutation);
+ r = doBatchMutate(mutation, true).getResult();
} else {
mutateRow(rowMutations);
}
this.checkAndMutateChecksPassed.increment();
- return new CheckAndMutateResult(true, null);
+ return new CheckAndMutateResult(true, r);
}
this.checkAndMutateChecksFailed.increment();
return new CheckAndMutateResult(false, null);
@@ -4453,9 +4813,10 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
private void checkMutationType(final Mutation mutation)
throws DoNotRetryIOException {
- boolean isPut = mutation instanceof Put;
- if (!isPut && !(mutation instanceof Delete)) {
- throw new org.apache.hadoop.hbase.DoNotRetryIOException("Action must be Put or Delete");
+ if (!(mutation instanceof Put) && !(mutation instanceof Delete) &&
+ !(mutation instanceof Increment) && !(mutation instanceof Append)) {
+ throw new org.apache.hadoop.hbase.DoNotRetryIOException(
+ "Action must be Put or Delete or Increment or Delete");
}
}
@@ -4493,17 +4854,27 @@ private boolean matches(final CompareOperator op, final int compareResult) {
return matches;
}
+ private OperationStatus doBatchMutate(Mutation mutation) throws IOException {
+ return doBatchMutate(mutation, false);
+ }
- private void doBatchMutate(Mutation mutation) throws IOException {
- // Currently this is only called for puts and deletes, so no nonces.
- OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation});
+ private OperationStatus doBatchMutate(Mutation mutation, boolean atomic) throws IOException {
+ return doBatchMutate(mutation, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
+ }
+
+ private OperationStatus doBatchMutate(Mutation mutation, boolean atomic, long nonceGroup,
+ long nonce) throws IOException {
+ OperationStatus[] batchMutate = this.batchMutate(new Mutation[]{mutation}, atomic,
+ nonceGroup, nonce);
if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.SANITY_CHECK_FAILURE)) {
throw new FailedSanityCheckException(batchMutate[0].getExceptionMsg());
} else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.BAD_FAMILY)) {
throw new NoSuchColumnFamilyException(batchMutate[0].getExceptionMsg());
- } else if (batchMutate[0].getOperationStatusCode().equals(OperationStatusCode.STORE_TOO_BUSY)) {
+ } else if (batchMutate[0].getOperationStatusCode().equals(
+ OperationStatusCode.STORE_TOO_BUSY)) {
throw new RegionTooBusyException(batchMutate[0].getExceptionMsg());
}
+ return batchMutate[0];
}
/**
@@ -8021,8 +8392,16 @@ public Result append(Append append) throws IOException {
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- public Result append(Append mutation, long nonceGroup, long nonce) throws IOException {
- return doDelta(Operation.APPEND, mutation, nonceGroup, nonce, mutation.isReturnResults());
+ public Result append(Append append, long nonceGroup, long nonce) throws IOException {
+ checkReadOnly();
+ checkResources();
+ startRegionOperation(Operation.APPEND);
+ try {
+ // All edits for the given row (across all column families) must happen atomically.
+ return doBatchMutate(append, true, nonceGroup, nonce).getResult();
+ } finally {
+ closeRegionOperation(Operation.APPEND);
+ }
}
@Override
@@ -8030,110 +8409,18 @@ public Result increment(Increment increment) throws IOException {
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- public Result increment(Increment mutation, long nonceGroup, long nonce) throws IOException {
- return doDelta(Operation.INCREMENT, mutation, nonceGroup, nonce, mutation.isReturnResults());
- }
-
- /**
- * Add "deltas" to Cells. Deltas are increments or appends. Switch on op.
- *
- *
If increment, add deltas to current values or if an append, then
- * append the deltas to the current Cell values.
- *
- *
Append and Increment code paths are mostly the same. They differ in just a few places.
- * This method does the code path for increment and append and then in key spots, switches
- * on the passed in op to do increment or append specific paths.
- */
- private Result doDelta(Operation op, Mutation mutation, long nonceGroup, long nonce,
- boolean returnResults) throws IOException {
+ public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException {
checkReadOnly();
checkResources();
- checkRow(mutation.getRow(), op.toString());
- checkFamilies(mutation.getFamilyCellMap().keySet());
- this.writeRequestsCount.increment();
- WriteEntry writeEntry = null;
- startRegionOperation(op);
- List results = returnResults? new ArrayList<>(mutation.size()): null;
- RowLock rowLock = null;
- MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
+ startRegionOperation(Operation.INCREMENT);
try {
- rowLock = getRowLockInternal(mutation.getRow(), false, null);
- lock(this.updatesLock.readLock());
- try {
- Result cpResult = doCoprocessorPreCall(op, mutation);
- if (cpResult != null) {
- // Metrics updated below in the finally block.
- return returnResults? cpResult: null;
- }
- Durability effectiveDurability = getEffectiveDurability(mutation.getDurability());
- Map> forMemStore = new HashMap<>(mutation.getFamilyCellMap().size());
- // Reckon Cells to apply to WAL -- in returned walEdit -- and what to add to memstore and
- // what to return back to the client (in 'forMemStore' and 'results' respectively).
- WALEdit walEdit = reckonDeltas(op, mutation, effectiveDurability, forMemStore, results);
- // Actually write to WAL now if a walEdit to apply.
- if (walEdit != null && !walEdit.isEmpty()) {
- writeEntry = doWALAppend(walEdit, effectiveDurability, nonceGroup, nonce);
- } else {
- // If walEdits is empty, it means we skipped the WAL; update LongAdders and start an mvcc
- // transaction.
- recordMutationWithoutWal(mutation.getFamilyCellMap());
- writeEntry = mvcc.begin();
- updateSequenceId(forMemStore.values(), writeEntry.getWriteNumber());
- }
- // Now write to MemStore. Do it a column family at a time.
- for (Map.Entry> e : forMemStore.entrySet()) {
- applyToMemStore(e.getKey(), e.getValue(), true, memstoreAccounting);
- }
- mvcc.completeAndWait(writeEntry);
- if (rsServices != null && rsServices.getNonceManager() != null) {
- rsServices.getNonceManager().addMvccToOperationContext(nonceGroup, nonce,
- writeEntry.getWriteNumber());
- }
- if (rsServices != null && rsServices.getMetrics() != null) {
- rsServices.getMetrics().updateWriteQueryMeter(this.htableDescriptor.
- getTableName());
- }
- writeEntry = null;
- } finally {
- this.updatesLock.readLock().unlock();
- }
- // If results is null, then client asked that we not return the calculated results.
- return results != null && returnResults? Result.create(results): Result.EMPTY_RESULT;
+ // All edits for the given row (across all column families) must happen atomically.
+ return doBatchMutate(increment, true, nonceGroup, nonce).getResult();
} finally {
- // Call complete always, even on success. doDelta is doing a Get READ_UNCOMMITTED when it goes
- // to get current value under an exclusive lock so no need so no need to wait to return to
- // the client. Means only way to read-your-own-increment or append is to come in with an
- // a 0 increment.
- if (writeEntry != null) mvcc.complete(writeEntry);
- if (rowLock != null) {
- rowLock.release();
- }
- // Request a cache flush if over the limit. Do it outside update lock.
- incMemStoreSize(memstoreAccounting.getMemStoreSize());
- requestFlushIfNeeded();
- closeRegionOperation(op);
- if (this.metricsRegion != null) {
- switch (op) {
- case INCREMENT:
- this.metricsRegion.updateIncrement();
- break;
- case APPEND:
- this.metricsRegion.updateAppend();
- break;
- default:
- break;
- }
- }
+ closeRegionOperation(Operation.INCREMENT);
}
}
- private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, long nonceGroup,
- long nonce)
- throws IOException {
- return doWALAppend(walEdit, durability, WALKey.EMPTY_UUIDS, System.currentTimeMillis(),
- nonceGroup, nonce);
- }
-
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds,
long now, long nonceGroup, long nonce) throws IOException {
return doWALAppend(walEdit, durability, clusterIds, now, nonceGroup, nonce,
@@ -8184,223 +8471,10 @@ private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List> forMemStore, List results) throws IOException {
- WALEdit walEdit = null;
- long now = EnvironmentEdgeManager.currentTime();
- final boolean writeToWAL = effectiveDurability != Durability.SKIP_WAL;
- // Process a Store/family at a time.
- for (Map.Entry> entry: mutation.getFamilyCellMap().entrySet()) {
- final byte[] columnFamilyName = entry.getKey();
- List deltas = entry.getValue();
- // Reckon for the Store what to apply to WAL and MemStore.
- List toApply = reckonDeltasByStore(stores.get(columnFamilyName), op, mutation,
- effectiveDurability, now, deltas, results);
- if (!toApply.isEmpty()) {
- for (Cell cell : toApply) {
- HStore store = getStore(cell);
- if (store == null) {
- checkFamily(CellUtil.cloneFamily(cell));
- } else {
- forMemStore.computeIfAbsent(store, key -> new ArrayList<>()).add(cell);
- }
- }
- if (writeToWAL) {
- if (walEdit == null) {
- walEdit = new WALEdit();
- }
- walEdit.getCells().addAll(toApply);
- }
- }
- }
- return walEdit;
- }
-
- /**
- * Reckon the Cells to apply to WAL, memstore, and to return to the Client in passed
- * column family/Store.
- *
- * Does Get of current value and then adds passed in deltas for this Store returning the result.
- *
- * @param op Whether Increment or Append
- * @param mutation The encompassing Mutation object
- * @param deltas Changes to apply to this Store; either increment amount or data to append
- * @param results In here we accumulate all the Cells we are to return to the client. If null,
- * client doesn't want results returned.
- * @return Resulting Cells after deltas have been applied to current
- * values. Side effect is our filling out of the results List.
- */
- private List reckonDeltasByStore(HStore store, Operation op, Mutation mutation,
- Durability effectiveDurability, long now, List deltas, List results)
- throws IOException {
- byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
- List> cellPairs = new ArrayList<>(deltas.size());
- // Get previous values for all columns in this family.
- TimeRange tr = null;
- switch (op) {
- case INCREMENT:
- tr = ((Increment)mutation).getTimeRange();
- break;
- case APPEND:
- tr = ((Append)mutation).getTimeRange();
- break;
- default:
- break;
- }
- List currentValues = get(mutation, store, deltas,null, tr);
- // Iterate the input columns and update existing values if they were found, otherwise
- // add new column initialized to the delta amount
- int currentValuesIndex = 0;
- for (int i = 0; i < deltas.size(); i++) {
- Cell delta = deltas.get(i);
- Cell currentValue = null;
- if (currentValuesIndex < currentValues.size() &&
- CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), delta)) {
- currentValue = currentValues.get(currentValuesIndex);
- if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, deltas.get(i + 1))) {
- currentValuesIndex++;
- }
- }
- // Switch on whether this an increment or an append building the new Cell to apply.
- Cell newCell = null;
- switch (op) {
- case INCREMENT:
- long deltaAmount = getLongValue(delta);
- final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;
- newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));
- break;
- case APPEND:
- newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) ->
- ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
- .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
- .put(delta.getValueArray(), delta.getValueOffset(), delta.getValueLength())
- .array()
- );
- break;
- default: throw new UnsupportedOperationException(op.toString());
- }
- if (this.maxCellSize > 0) {
- int newCellSize = PrivateCellUtil.estimatedSerializedSizeOf(newCell);
- if (newCellSize > this.maxCellSize) {
- String msg = "Cell with size " + newCellSize + " exceeds limit of " + this.maxCellSize
- + " bytes in region " + this;
- LOG.debug(msg);
- throw new DoNotRetryIOException(msg);
- }
- }
- cellPairs.add(new Pair<>(currentValue, newCell));
- // Add to results to get returned to the Client. If null, cilent does not want results.
- if (results != null) {
- results.add(newCell);
- }
- }
- // Give coprocessors a chance to update the new cells before apply to WAL or memstore
- if (coprocessorHost != null) {
- // Here the operation must be increment or append.
- cellPairs = op == Operation.INCREMENT ?
- coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
- coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
- }
- return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
- }
-
- private static Cell reckonDelta(final Cell delta, final Cell currentCell,
- final byte[] columnFamily, final long now,
- Mutation mutation, Function supplier) throws IOException {
- // Forward any tags found on the delta.
- List tags = TagUtil.carryForwardTags(delta);
- tags = TagUtil.carryForwardTTLTag(tags, mutation.getTTL());
- if (currentCell != null) {
- tags = TagUtil.carryForwardTags(tags, currentCell);
- byte[] newValue = supplier.apply(currentCell);
- return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
- .setRow(mutation.getRow(), 0, mutation.getRow().length)
- .setFamily(columnFamily, 0, columnFamily.length)
- // copy the qualifier if the cell is located in shared memory.
- .setQualifier(CellUtil.cloneQualifier(delta))
- .setTimestamp(Math.max(currentCell.getTimestamp() + 1, now))
- .setType(KeyValue.Type.Put.getCode())
- .setValue(newValue, 0, newValue.length)
- .setTags(TagUtil.fromList(tags))
- .build();
- } else {
- PrivateCellUtil.updateLatestStamp(delta, now);
- return CollectionUtils.isEmpty(tags) ? delta : PrivateCellUtil.createCell(delta, tags);
- }
- }
- /**
- * @return Get the long out of the passed in Cell
- */
- private static long getLongValue(final Cell cell) throws DoNotRetryIOException {
- int len = cell.getValueLength();
- if (len != Bytes.SIZEOF_LONG) {
- // throw DoNotRetryIOException instead of IllegalArgumentException
- throw new DoNotRetryIOException("Field is not a long, it's " + len + " bytes wide");
- }
- return PrivateCellUtil.getValueAsLong(cell);
- }
- /**
- * Do a specific Get on passed columnFamily and column qualifiers.
- * @param mutation Mutation we are doing this Get for.
- * @param store Which column family on row (TODO: Go all Gets in one go)
- * @param coordinates Cells from mutation used as coordinates applied to Get.
- * @return Return list of Cells found.
- */
- private List get(Mutation mutation, HStore store, List coordinates,
- IsolationLevel isolation, TimeRange tr) throws IOException {
- // Sort the cells so that they match the order that they appear in the Get results. Otherwise,
- // we won't be able to find the existing values if the cells are not specified in order by the
- // client since cells are in an array list.
- // TODO: I don't get why we are sorting. St.Ack 20150107
- sort(coordinates, store.getComparator());
- Get get = new Get(mutation.getRow());
- if (isolation != null) {
- get.setIsolationLevel(isolation);
- }
- for (Cell cell: coordinates) {
- get.addColumn(store.getColumnFamilyDescriptor().getName(), CellUtil.cloneQualifier(cell));
- }
- // Increments carry time range. If an Increment instance, put it on the Get.
- if (tr != null) {
- get.setTimeRange(tr.getMin(), tr.getMax());
- }
- return get(get, false);
- }
/**
* @return Sorted list of cells using comparator
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index 65d2f5536f31..ae5b6ec3c8ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -45,6 +45,8 @@ public class MiniBatchOperationInProgress {
private int cellCount = 0;
private int numOfPuts = 0;
private int numOfDeletes = 0;
+ private int numOfIncrements = 0;
+ private int numOfAppends = 0;
public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
@@ -169,4 +171,20 @@ public int getNumOfDeletes() {
public void incrementNumOfDeletes() {
this.numOfDeletes += 1;
}
+
+ public int getNumOfIncrements() {
+ return numOfIncrements;
+ }
+
+ public void incrementNumOfIncrements() {
+ this.numOfIncrements += 1;
+ }
+
+ public int getNumOfAppends() {
+ return numOfAppends;
+ }
+
+ public void incrementNumOfAppends() {
+ this.numOfAppends += 1;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
index 21027d399485..6beb7c78e2ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationStatus.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -43,21 +44,29 @@ public class OperationStatus {
public static final OperationStatus NOT_RUN = new OperationStatus(OperationStatusCode.NOT_RUN);
private final OperationStatusCode code;
-
+ private final Result result;
private final String exceptionMsg;
public OperationStatus(OperationStatusCode code) {
- this(code, "");
+ this(code, null, "");
+ }
+
+ public OperationStatus(OperationStatusCode code, Result result) {
+ this(code, result, "");
}
public OperationStatus(OperationStatusCode code, String exceptionMsg) {
- this.code = code;
- this.exceptionMsg = exceptionMsg;
+ this(code, null, exceptionMsg);
}
public OperationStatus(OperationStatusCode code, Exception e) {
+ this(code, null, (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage());
+ }
+
+ private OperationStatus(OperationStatusCode code, Result result, String exceptionMsg) {
this.code = code;
- this.exceptionMsg = (e == null) ? "" : e.getClass().getName() + ": " + e.getMessage();
+ this.result = result;
+ this.exceptionMsg = exceptionMsg;
}
/**
@@ -67,6 +76,13 @@ public OperationStatusCode getOperationStatusCode() {
return code;
}
+ /**
+ * @return result
+ */
+ public Result getResult() {
+ return result;
+ }
+
/**
* @return ExceptionMessge
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 7fe7f194ad00..eaa8ca03952e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -19,7 +19,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
@@ -549,38 +548,6 @@ private void rpcPreCheck(String requestName) throws ServiceException {
}
}
- /**
- * Starts the nonce operation for a mutation, if needed.
- * @param mutation Mutation.
- * @param nonceGroup Nonce group from the request.
- * @return whether to proceed this mutation.
- */
- private boolean startNonceOperation(final MutationProto mutation, long nonceGroup)
- throws IOException {
- if (regionServer.nonceManager == null || !mutation.hasNonce()) return true;
- boolean canProceed = false;
- try {
- canProceed = regionServer.nonceManager.startOperation(
- nonceGroup, mutation.getNonce(), regionServer);
- } catch (InterruptedException ex) {
- throw new InterruptedIOException("Nonce start operation interrupted");
- }
- return canProceed;
- }
-
- /**
- * Ends nonce operation for a mutation, if needed.
- * @param mutation Mutation.
- * @param nonceGroup Nonce group from the request. Always 0 in initial implementation.
- * @param success Whether the operation for this nonce has succeeded.
- */
- private void endNonceOperation(final MutationProto mutation,
- long nonceGroup, boolean success) {
- if (regionServer.nonceManager != null && mutation.hasNonce()) {
- regionServer.nonceManager.endOperation(nonceGroup, mutation.getNonce(), success);
- }
- }
-
private boolean isClientCellBlockSupport(RpcCallContext context) {
return context != null && context.isClientCellBlockSupported();
}
@@ -618,7 +585,7 @@ private void addResults(ScanResponse.Builder builder, List results,
}
private CheckAndMutateResult checkAndMutate(HRegion region, List actions,
- CellScanner cellScanner, Condition condition,ActivePolicyEnforcement spaceQuotaEnforcement)
+ CellScanner cellScanner, Condition condition, ActivePolicyEnforcement spaceQuotaEnforcement)
throws IOException {
int countOfCompleteMutation = 0;
try {
@@ -691,35 +658,10 @@ private Result append(final HRegion region, final OperationQuota quota,
checkCellSizeLimit(region, append);
spaceQuota.getPolicyEnforcement(region).check(append);
quota.addMutation(append);
- Result r = null;
- if (region.getCoprocessorHost() != null) {
- r = region.getCoprocessorHost().preAppend(append);
- }
- if (r == null) {
- boolean canProceed = startNonceOperation(mutation, nonceGroup);
- boolean success = false;
- try {
- long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
- if (canProceed) {
- r = region.append(append, nonceGroup, nonce);
- } else {
- // convert duplicate append to get
- List results = region.get(toGet(append), false, nonceGroup, nonce);
- r = Result.create(results);
- }
- success = true;
- } finally {
- if (canProceed) {
- endNonceOperation(mutation, nonceGroup, success);
- }
- }
- if (region.getCoprocessorHost() != null) {
- r = region.getCoprocessorHost().postAppend(append, r);
- }
- }
+ long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
+ Result r = region.append(append, nonceGroup, nonce);
if (regionServer.getMetrics() != null) {
- regionServer.getMetrics().updateAppend(
- region.getTableDescriptor().getTableName(),
+ regionServer.getMetrics().updateAppend(region.getTableDescriptor().getTableName(),
EnvironmentEdgeManager.currentTime() - before);
}
return r == null ? Result.EMPTY_RESULT : r;
@@ -737,66 +679,16 @@ private Result increment(final HRegion region, final OperationQuota quota,
checkCellSizeLimit(region, increment);
spaceQuota.getPolicyEnforcement(region).check(increment);
quota.addMutation(increment);
- Result r = null;
- if (region.getCoprocessorHost() != null) {
- r = region.getCoprocessorHost().preIncrement(increment);
- }
- if (r == null) {
- boolean canProceed = startNonceOperation(mutation, nonceGroup);
- boolean success = false;
- try {
- long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
- if (canProceed) {
- r = region.increment(increment, nonceGroup, nonce);
- } else {
- // convert duplicate increment to get
- List results = region.get(toGet(increment), false, nonceGroup, nonce);
- r = Result.create(results);
- }
- success = true;
- } finally {
- if (canProceed) {
- endNonceOperation(mutation, nonceGroup, success);
- }
- }
- if (region.getCoprocessorHost() != null) {
- r = region.getCoprocessorHost().postIncrement(increment, r);
- }
- }
+ long nonce = mutation.hasNonce() ? mutation.getNonce() : HConstants.NO_NONCE;
+ Result r = region.increment(increment, nonceGroup, nonce);
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
if (metricsRegionServer != null) {
- metricsRegionServer.updateIncrement(
- region.getTableDescriptor().getTableName(),
- EnvironmentEdgeManager.currentTime() - before);
+ metricsRegionServer.updateIncrement(region.getTableDescriptor().getTableName(),
+ EnvironmentEdgeManager.currentTime() - before);
}
return r == null ? Result.EMPTY_RESULT : r;
}
- private static Get toGet(final Mutation mutation) throws IOException {
- if(!(mutation instanceof Increment) && !(mutation instanceof Append)) {
- throw new AssertionError("mutation must be a instance of Increment or Append");
- }
- Get get = new Get(mutation.getRow());
- CellScanner cellScanner = mutation.cellScanner();
- while (!cellScanner.advance()) {
- Cell cell = cellScanner.current();
- get.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell));
- }
- if (mutation instanceof Increment) {
- // Increment
- Increment increment = (Increment) mutation;
- get.setTimeRange(increment.getTimeRange().getMin(), increment.getTimeRange().getMax());
- } else {
- // Append
- Append append = (Append) mutation;
- get.setTimeRange(append.getTimeRange().getMin(), append.getTimeRange().getMax());
- }
- for (Entry entry : mutation.getAttributesMap().entrySet()) {
- get.setAttribute(entry.getKey(), entry.getValue());
- }
- return get;
- }
-
/**
* Run through the regionMutation rm and per Mutation, do the work, and then when
* done, add an instance of a {@link ResultOrException} that corresponds to each Mutation.
@@ -2847,23 +2739,35 @@ public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
try {
if (regionAction.hasCondition()) {
try {
- CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
- cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
- regionActionResultBuilder.setProcessed(result.isSuccess());
ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
ClientProtos.ResultOrException.newBuilder();
- for (int i = 0; i < regionAction.getActionCount(); i++) {
- if (i == 0 && result.getResult() != null) {
- resultOrExceptionOrBuilder.setIndex(i);
- regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
- .setResult(ProtobufUtil.toResult(result.getResult())).build());
- continue;
+ if (regionAction.getActionCount() == 1) {
+ CheckAndMutateResult result = checkAndMutate(region, quota,
+ regionAction.getAction(0).getMutation(), cellScanner,
+ regionAction.getCondition(), spaceQuotaEnforcement);
+ regionActionResultBuilder.setProcessed(result.isSuccess());
+ resultOrExceptionOrBuilder.setIndex(0);
+ if (result.getResult() != null) {
+ resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
}
- // To unify the response format with doNonAtomicRegionMutation and read through
- // client's AsyncProcess we have to add an empty result instance per operation
- resultOrExceptionOrBuilder.clear();
- resultOrExceptionOrBuilder.setIndex(i);
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
+ } else {
+ CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
+ cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
+ regionActionResultBuilder.setProcessed(result.isSuccess());
+ for (int i = 0; i < regionAction.getActionCount(); i++) {
+ if (i == 0 && result.getResult() != null) {
+ resultOrExceptionOrBuilder.setIndex(i);
+ regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
+ .setResult(ProtobufUtil.toResult(result.getResult())).build());
+ continue;
+ }
+ // To unify the response format with doNonAtomicRegionMutation and read through
+ // client's AsyncProcess we have to add an empty result instance per operation
+ resultOrExceptionOrBuilder.clear();
+ resultOrExceptionOrBuilder.setIndex(i);
+ regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
+ }
}
} catch (IOException e) {
rpcServer.getMetrics().exception(e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index d03d19f173b4..64e2f6615d86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -289,7 +289,8 @@ public interface RowLock {
/**
* Perform a batch of mutations.
*
- * Note this supports only Put and Delete mutations and will ignore other types passed.
+ * Note this supports only Put, Delete, Increment and Append mutations and will ignore other
+ * types passed.
* @param mutations the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 39e79c12f58f..6cac67a4f733 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -1506,8 +1506,11 @@ public void preBatchMutate(ObserverContext c,
if (m instanceof Put) {
checkForReservedTagPresence(user, m);
opType = OpType.PUT;
- } else {
+ } else if (m instanceof Delete) {
opType = OpType.DELETE;
+ } else {
+ // If the operation type is not Put or Delete, do nothing
+ continue;
}
AuthResult authResult = null;
if (checkCoveringPermission(user, opType, c.getEnvironment(), m.getRow(),
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index de81750574f9..2944c40beeec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -636,6 +636,7 @@ public void testCheckAndPut() throws InterruptedException, ExecutionException {
successCount.incrementAndGet();
successIndex.set(i);
}
+ assertNull(x.getResult());
latch.countDown();
}));
latch.await();
@@ -670,6 +671,7 @@ public void testCheckAndDelete() throws InterruptedException, ExecutionException
successCount.incrementAndGet();
successIndex.set(i);
}
+ assertNull(x.getResult());
deleteLatch.countDown();
}));
deleteLatch.await();
@@ -717,6 +719,7 @@ public void testCheckAndMutate() throws InterruptedException, ExecutionException
successCount.incrementAndGet();
successIndex.set(i);
}
+ assertNull(x.getResult());
mutateLatch.countDown();
});
});
@@ -743,18 +746,21 @@ public void testCheckAndMutateWithTimeRange() throws Exception {
.ifNotExists(FAMILY, QUALIFIER)
.build(put)).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(put)).get();
assertFalse(result.isSuccess());
+ assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(put)).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
RowMutations rm = new RowMutations(row).add((Mutation) put);
@@ -763,12 +769,14 @@ public void testCheckAndMutateWithTimeRange() throws Exception {
.timeRange(TimeRange.at(ts + 10000))
.build(rm)).get();
assertFalse(result.isSuccess());
+ assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(rm)).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
@@ -777,12 +785,14 @@ public void testCheckAndMutateWithTimeRange() throws Exception {
.timeRange(TimeRange.at(ts + 10000))
.build(delete)).get();
assertFalse(result.isSuccess());
+ assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(delete)).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
}
@Test
@@ -802,6 +812,7 @@ public void testCheckAndMutateWithSingleFilter() throws Throwable {
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -812,6 +823,7 @@ public void testCheckAndMutateWithSingleFilter() throws Throwable {
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(result.isSuccess());
+ assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
@@ -821,6 +833,7 @@ public void testCheckAndMutateWithSingleFilter() throws Throwable {
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
@@ -833,6 +846,7 @@ public void testCheckAndMutateWithSingleFilter() throws Throwable {
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -860,6 +874,7 @@ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -873,6 +888,7 @@ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
Bytes.toBytes("c"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(result.isSuccess());
+ assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
@@ -885,6 +901,7 @@ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
@@ -900,6 +917,7 @@ public void testCheckAndMutateWithMultipleFilters() throws Throwable {
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
@@ -922,6 +940,7 @@ public void testCheckAndMutateWithTimestampFilter() throws Throwable {
new TimestampsFilter(Collections.singletonList(100L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@@ -934,6 +953,7 @@ public void testCheckAndMutateWithTimestampFilter() throws Throwable {
new TimestampsFilter(Collections.singletonList(101L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
assertFalse(result.isSuccess());
+ assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@@ -953,6 +973,7 @@ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(result.isSuccess());
+ assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
@@ -965,10 +986,125 @@ public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
.get();
assertFalse(result.isSuccess());
+ assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
+ @Test
+ public void testCheckAndIncrement() throws Throwable {
+ AsyncTable> table = getTable.get();
+
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+ // CheckAndIncrement with correct value
+ CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
+ assertTrue(res.isSuccess());
+ assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // CheckAndIncrement with wrong value
+ res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+ .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
+ assertFalse(res.isSuccess());
+ assertNull(res.getResult());
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+ // CheckAndIncrement with a filter and correct value
+ res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))))
+ .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
+ assertTrue(res.isSuccess());
+ assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // CheckAndIncrement with a filter and correct value
+ res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("b")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+ Bytes.toBytes("d"))))
+ .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
+ assertFalse(res.isSuccess());
+ assertNull(res.getResult());
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+ }
+
+ @Test
+ public void testCheckAndAppend() throws Throwable {
+ AsyncTable> table = getTable.get();
+
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
+
+ // CheckAndAppend with correct value
+ CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
+ assertTrue(res.isSuccess());
+ assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // CheckAndAppend with correct value
+ res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
+ .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
+ assertFalse(res.isSuccess());
+ assertNull(res.getResult());
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
+
+ // CheckAndAppend with a filter and correct value
+ res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("a")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+ Bytes.toBytes("c"))))
+ .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
+ assertTrue(res.isSuccess());
+ assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ // CheckAndAppend with a filter and wrong value
+ res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
+ .ifMatches(new FilterList(
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
+ Bytes.toBytes("b")),
+ new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
+ Bytes.toBytes("d"))))
+ .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
+ assertFalse(res.isSuccess());
+ assertNull(res.getResult());
+
+ result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+ }
+
// Tests for batch version of checkAndMutate
@Test
@@ -997,7 +1133,9 @@ public void testCheckAndMutateBatch() throws Throwable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -1017,7 +1155,9 @@ public void testCheckAndMutateBatch() throws Throwable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
@@ -1042,7 +1182,9 @@ public void testCheckAndMutateBatch() throws Throwable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
result = table.get(new Get(row3)).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
@@ -1079,7 +1221,9 @@ public void testCheckAndMutateBatch2() throws Throwable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -1099,7 +1243,9 @@ public void testCheckAndMutateBatch2() throws Throwable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
@@ -1121,7 +1267,9 @@ public void testCheckAndMutateBatch2() throws Throwable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -1166,7 +1314,9 @@ public void testCheckAndMutateBatchWithFilter() throws Throwable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -1194,7 +1344,9 @@ public void testCheckAndMutateBatchWithFilter() throws Throwable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
@@ -1227,7 +1379,9 @@ public void testCheckAndMutateBatchWithFilter() throws Throwable {
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
@@ -1273,7 +1427,9 @@ public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
+ assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
@@ -1282,6 +1438,80 @@ public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
+ @Test
+ public void testCheckAndIncrementBatch() throws Throwable {
+ AsyncTable> table = getTable.get();
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+ table.putAll(Arrays.asList(
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
+
+ // CheckAndIncrement with correct value
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
+
+ // CheckAndIncrement with wrong value
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+ .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
+
+ List results =
+ table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0).isSuccess());
+ assertEquals(1, Bytes.toLong(results.get(0).getResult()
+ .getValue(FAMILY, Bytes.toBytes("B"))));
+ assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ }
+
+ @Test
+ public void testCheckAndAppendBatch() throws Throwable {
+ AsyncTable> table = getTable.get();
+ byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+ table.putAll(Arrays.asList(
+ new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
+ new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+ .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
+
+ // CheckAndAppend with correct value
+ CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+ .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+ .build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
+
+ // CheckAndAppend with wrong value
+ CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+ .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+ .build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
+
+ List results =
+ table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
+
+ assertTrue(results.get(0).isSuccess());
+ assertEquals("bb", Bytes.toString(results.get(0).getResult()
+ .getValue(FAMILY, Bytes.toBytes("B"))));
+ assertFalse(results.get(1).isSuccess());
+ assertNull(results.get(1).getResult());
+
+ Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
+ assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
+
+ result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
+ assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
+ }
+
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
index cb53f307b7ad..20ec40e7bb0e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableBatch.java
@@ -344,13 +344,17 @@ public void testWithCheckAndMutate() throws Exception {
byte[] row3 = Bytes.toBytes("row3");
byte[] row4 = Bytes.toBytes("row4");
byte[] row5 = Bytes.toBytes("row5");
+ byte[] row6 = Bytes.toBytes("row6");
+ byte[] row7 = Bytes.toBytes("row7");
table.putAll(Arrays.asList(
new Put(row1).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
- new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
+ new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
+ new Put(row6).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L)),
+ new Put(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")))).get();
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row1)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
@@ -363,17 +367,36 @@ public void testWithCheckAndMutate() throws Exception {
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("a"))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("h")));
Put put = new Put(row5).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("f"));
-
- List actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put);
+ CheckAndMutate checkAndMutate3 = CheckAndMutate.newBuilder(row6)
+ .ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes(10L))
+ .build(new Increment(row6).addColumn(FAMILY, Bytes.toBytes("F"), 1));
+ CheckAndMutate checkAndMutate4 = CheckAndMutate.newBuilder(row7)
+ .ifEquals(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g"))
+ .build(new Append(row7).addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes("g")));
+
+ List actions = Arrays.asList(checkAndMutate1, get, mutations, checkAndMutate2, put,
+ checkAndMutate3, checkAndMutate4);
List