diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 6a5c486b7478..41e2509c1b52 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -350,7 +350,8 @@ public CompletableFuture thenPut(Put put) { preCheck(); final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(put); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, @@ -366,7 +367,8 @@ public CompletableFuture thenDelete(Delete delete) { preCheck(); final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(delete); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -383,7 +385,8 @@ public CompletableFuture thenMutate(RowMutations mutations) { validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(mutations); return tracedFuture( () -> RawAsyncTableImpl.this . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) @@ -427,7 +430,8 @@ public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(put); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, @@ -443,7 +447,8 @@ public CompletableFuture thenPut(Put put) { public CompletableFuture thenDelete(Delete delete) { final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE) + .setContainerOperations(delete); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -459,7 +464,8 @@ public CompletableFuture thenMutate(RowMutations mutations) { validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(mutations); return tracedFuture( () -> RawAsyncTableImpl.this . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) @@ -482,7 +488,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(checkAndMutate); + .setOperation(checkAndMutate) + .setContainerOperations(checkAndMutate.getAction()); return tracedFuture(() -> { if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete || @@ -536,7 +543,8 @@ public CompletableFuture checkAndMutate(CheckAndMutate che checkAndMutate(List checkAndMutates) { final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(checkAndMutates); return tracedFutures( () -> batch(checkAndMutates, rpcTimeoutNs).stream() .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), @@ -593,7 +601,8 @@ public CompletableFuture mutateRow(RowMutations mutations) { long nonce = conn.getNonceGenerator().newNonce(); final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(mutations); return tracedFuture( () -> this . newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) @@ -668,7 +677,8 @@ public void onComplete() { public List> get(List gets) { final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(HBaseSemanticAttributes.Operation.GET); return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); } @@ -676,7 +686,8 @@ public List> get(List gets) { public List> put(List puts) { final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(HBaseSemanticAttributes.Operation.PUT); return tracedFutures(() -> voidMutate(puts), supplier); } @@ -684,7 +695,8 @@ public List> put(List puts) { public List> delete(List deletes) { final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(HBaseSemanticAttributes.Operation.DELETE); return tracedFutures(() -> voidMutate(deletes), supplier); } @@ -692,7 +704,8 @@ public List> delete(List deletes) { public List> batch(List actions) { final Supplier supplier = new TableOperationSpanBuilder() .setTableName(tableName) - .setOperation(HBaseSemanticAttributes.Operation.BATCH); + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .setContainerOperations(actions); return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java index fd651a85b26b..d2e196ffa73e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client.trace; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; @@ -26,9 +27,16 @@ import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.SpanKind; +import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.CheckAndMutate; @@ -76,6 +84,72 @@ public TableOperationSpanBuilder setOperation(final Operation operation) { return this; } + // `setContainerOperations` perform a recursive descent expansion of all the operations + // contained within the provided "batch" object. + + public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) { + final Operation[] ops = mutations.getMutations() + .stream() + .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) + .toArray(Operation[]::new); + return setContainerOperations(ops); + } + + public TableOperationSpanBuilder setContainerOperations(final Row row) { + final Operation[] ops = + Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()) + .toArray(Operation[]::new); + return setContainerOperations(ops); + } + + public TableOperationSpanBuilder setContainerOperations( + final Collection operations + ) { + final Operation[] ops = operations.stream() + .flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())) + .toArray(Operation[]::new); + return setContainerOperations(ops); + } + + private static Set unpackRowOperations(final Row row) { + final Set ops = new HashSet<>(); + if (row instanceof CheckAndMutate) { + final CheckAndMutate cam = (CheckAndMutate) row; + ops.addAll(unpackRowOperations(cam)); + } + if (row instanceof RowMutations) { + final RowMutations mutations = (RowMutations) row; + ops.addAll(unpackRowOperations(mutations)); + } + return ops; + } + + private static Set unpackRowOperations(final CheckAndMutate cam) { + final Set ops = new HashSet<>(); + final Operation op = valueFrom(cam.getAction()); + switch (op) { + case BATCH: + case CHECK_AND_MUTATE: + ops.addAll(unpackRowOperations(cam.getAction())); + break; + default: + ops.add(op); + } + return ops; + } + + public TableOperationSpanBuilder setContainerOperations( + final Operation... operations + ) { + final List ops = Arrays.stream(operations) + .map(op -> op == null ? unknown : op.name()) + .sorted() + .distinct() + .collect(Collectors.toList()); + attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops); + return this; + } + public TableOperationSpanBuilder setTableName(final TableName tableName) { this.tableName = tableName; attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index 5ad39c48623a..a190b838c68a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; @@ -144,11 +145,17 @@ public Void answer(InvocationOnMock invocation) throws Throwable { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - ClientProtos.MultiResponse resp = - ClientProtos.MultiResponse.newBuilder() - .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( - ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) - .build(); + ClientProtos.MultiRequest req = invocation.getArgument(1); + ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder(); + for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) { + RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder(); + for (ClientProtos.Action ignored : regionAction.getActionList()) { + raBuilder.addResultOrException( + ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))); + } + builder.addRegionActionResult(raBuilder); + } + ClientProtos.MultiResponse resp = builder.build(); RpcCallback done = invocation.getArgument(2); ForkJoinPool.commonPool().execute(() -> done.run(resp)); return null; @@ -335,7 +342,9 @@ public void testCheckAndMutateList() { .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf( + "db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE"))); } @Test @@ -343,13 +352,20 @@ public void testCheckAndMutateAll() { table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf( + "db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE"))); } @Test public void testMutateRow() throws IOException { - table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0)))); - assertTrace("BATCH"); + final RowMutations mutations = new RowMutations(Bytes.toBytes(0)) + .add(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .add(new Delete(Bytes.toBytes(0))); + table.mutateRow(mutations).join(); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT"))); } @Test @@ -364,13 +380,15 @@ public void testExistsList() { .allOf( table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test public void testExistsAll() { table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test @@ -378,13 +396,15 @@ public void testGetList() { CompletableFuture .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test public void testGetAll() { table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "GET"))); } @Test @@ -393,14 +413,16 @@ public void testPutList() { .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); } @Test public void testPutAll() { table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT"))); } @Test @@ -409,13 +431,15 @@ public void testDeleteList() { .allOf( table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } @Test public void testDeleteAll() { table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } @Test @@ -424,13 +448,15 @@ public void testBatch() { .allOf( table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } @Test public void testBatchAll() { table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("BATCH"); + assertTrace("BATCH", hasAttributes( + containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE"))); } @Test diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java index c3bf3bee59e5..887c6a01a027 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java @@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.hasProperty; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; +import java.util.Arrays; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; @@ -48,6 +49,10 @@ public static Matcher containsEntry(String key, String value) { return containsEntry(AttributeKey.stringKey(key), value); } + public static Matcher containsEntryWithStringValuesOf(String key, String... values) { + return containsEntry(AttributeKey.stringArrayKey(key), Arrays.asList(values)); + } + private static final class IsAttributesContaining extends TypeSafeMatcher { private final Matcher> keyMatcher; private final Matcher valueMatcher; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index 90c3c858a706..d2f80a7955dd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -32,6 +32,12 @@ public final class HBaseSemanticAttributes { public static final AttributeKey NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE; public static final AttributeKey DB_OPERATION = SemanticAttributes.DB_OPERATION; public static final AttributeKey TABLE_KEY = AttributeKey.stringKey("db.hbase.table"); + /** + * For operations that themselves ship one or more operations, such as + * {@link Operation#BATCH} and {@link Operation#CHECK_AND_MUTATE}. + */ + public static final AttributeKey> CONTAINER_DB_OPERATIONS_KEY = + AttributeKey.stringArrayKey("db.hbase.container_operations"); public static final AttributeKey> REGION_NAMES_KEY = AttributeKey.stringArrayKey("db.hbase.regions"); public static final AttributeKey RPC_SERVICE_KEY =