Skip to content

Commit

Permalink
Fix bugs pointed out in PR
Browse files Browse the repository at this point in the history
* correct places where CheckAndPut span names are emitted as "BATCH"
* extend TestAsyncTableTracing to include coverage for both
  `AsyncTable.{CheckAndMutateBuilder,CheckAndMutateWithFilterBuilder}`
* add another method to TableOperationSpanBuilder that accepts `Collection<? extends Row>`
  • Loading branch information
ndimiduk committed Dec 3, 2021
1 parent c267cb8 commit ced47b6
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand Down Expand Up @@ -459,7 +459,7 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand Down Expand Up @@ -536,7 +536,7 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
Expand Down Expand Up @@ -593,7 +593,7 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
Expand Down Expand Up @@ -668,31 +668,31 @@ public void onComplete() {
public List<CompletableFuture<Result>> get(List<Get> gets) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(gets);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}

@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(puts);
return tracedFutures(() -> voidMutate(puts), supplier);
}

@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(deletes);
return tracedFutures(() -> voidMutate(deletes), supplier);
}

@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
Expand Down Expand Up @@ -71,6 +72,11 @@ public TableOperationSpanBuilder setOperation(final Row row) {
return setOperation(valueFrom(row));
}

@SuppressWarnings("unused")
public TableOperationSpanBuilder setOperation(final Collection<? extends Row> operations) {
return setOperation(Operation.BATCH);
}

public TableOperationSpanBuilder setOperation(final Operation operation) {
attributes.put(DB_OPERATION, operation.name());
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
Expand Down Expand Up @@ -144,11 +146,17 @@ public Void answer(InvocationOnMock invocation) throws Throwable {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ClientProtos.MultiResponse resp =
ClientProtos.MultiResponse.newBuilder()
.addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
.build();
ClientProtos.MultiRequest req = invocation.getArgument(1);
ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
for (ClientProtos.Action ignored : regionAction.getActionList()) {
raBuilder.addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
}
builder.addRegionActionResult(raBuilder);
}
ClientProtos.MultiResponse resp = builder.build();
RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
ForkJoinPool.commonPool().execute(() -> done.run(resp));
return null;
Expand Down Expand Up @@ -346,6 +354,87 @@ public void testCheckAndMutateAll() {
assertTrace("BATCH");
}

private void testCheckAndMutateBuilder(Row op) {
AsyncTable.CheckAndMutateBuilder builder =
table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
.qualifier(Bytes.toBytes("cq"))
.ifEquals(Bytes.toBytes("v"));
if (op instanceof Put) {
Put put = (Put) op;
builder.thenPut(put).join();
} else if (op instanceof Delete) {
Delete delete = (Delete) op;
builder.thenDelete(delete).join();
} else if (op instanceof RowMutations) {
RowMutations mutations = (RowMutations) op;
builder.thenMutate(mutations).join();
} else {
fail("unsupported CheckAndPut operation " + op);
}
assertTrace("CHECK_AND_MUTATE");
}

@Test
public void testCheckAndMutateBuilderThenPut() {
Put put = new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
testCheckAndMutateBuilder(put);
}

@Test
public void testCheckAndMutateBuilderThenDelete() {
testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
}

@Test
public void testCheckAndMutateBuilderThenMutations() throws IOException {
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add(new Delete(Bytes.toBytes(0)));
testCheckAndMutateBuilder(mutations);
}

private void testCheckAndMutateWithFilterBuilder(Row op) {
// use of `PrefixFilter` is completely arbitrary here.
AsyncTable.CheckAndMutateWithFilterBuilder builder =
table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
if (op instanceof Put) {
Put put = (Put) op;
builder.thenPut(put).join();
} else if (op instanceof Delete) {
Delete delete = (Delete) op;
builder.thenDelete(delete).join();
} else if (op instanceof RowMutations) {
RowMutations mutations = (RowMutations) op;
builder.thenMutate(mutations).join();
} else {
fail("unsupported CheckAndPut operation " + op);
}
assertTrace("CHECK_AND_MUTATE");
}

@Test
public void testCheckAndMutateWithFilterBuilderThenPut() {
Put put = new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
testCheckAndMutateWithFilterBuilder(put);
}

@Test
public void testCheckAndMutateWithFilterBuilderThenDelete() {
testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
}

@Test
public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add(new Delete(Bytes.toBytes(0)));
testCheckAndMutateWithFilterBuilder(mutations);
}

@Test
public void testMutateRow() throws IOException {
table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
Expand Down

0 comments on commit ced47b6

Please sign in to comment.