Skip to content

Commit

Permalink
HBASE-26473 Introduce db.hbase.container_operations span attribute
Browse files Browse the repository at this point in the history
For batch operations, collect and annotate the associated span with the set of all operations
contained in the batch.
  • Loading branch information
ndimiduk committed Nov 20, 2021
1 parent 84ea8e2 commit ac1cfbb
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
preCheck();
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
Expand All @@ -366,7 +367,8 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -383,7 +385,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand Down Expand Up @@ -427,7 +430,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
Expand All @@ -443,7 +447,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -459,7 +464,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand All @@ -482,7 +488,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(checkAndMutate);
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate.getAction());
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
Expand Down Expand Up @@ -536,7 +543,8 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
Expand Down Expand Up @@ -593,7 +601,8 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
Expand Down Expand Up @@ -668,31 +677,35 @@ public void onComplete() {
public List<CompletableFuture<Result>> get(List<Get> gets) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}

@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
return tracedFutures(() -> voidMutate(puts), supplier);
}

@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
return tracedFutures(() -> voidMutate(deletes), supplier);
}

@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
final Supplier<Span> supplier = new TableOperationSpanBuilder()
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hbase.client.trace;

import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.CONTAINER_DB_OPERATIONS_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
Expand All @@ -26,9 +27,16 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.CheckAndMutate;
Expand Down Expand Up @@ -76,6 +84,72 @@ public TableOperationSpanBuilder setOperation(final Operation operation) {
return this;
}

// `setContainerOperations` perform a recursive descent expansion of all the operations
// contained within the provided "batch" object.

public TableOperationSpanBuilder setContainerOperations(final RowMutations mutations) {
final Operation[] ops = mutations.getMutations()
.stream()
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

public TableOperationSpanBuilder setContainerOperations(final Row row) {
final Operation[] ops =
Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream())
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

public TableOperationSpanBuilder setContainerOperations(
final Collection<? extends Row> operations
) {
final Operation[] ops = operations.stream()
.flatMap(row -> Stream.concat(Stream.of(valueFrom(row)), unpackRowOperations(row).stream()))
.toArray(Operation[]::new);
return setContainerOperations(ops);
}

private static Set<Operation> unpackRowOperations(final Row row) {
final Set<Operation> ops = new HashSet<>();
if (row instanceof CheckAndMutate) {
final CheckAndMutate cam = (CheckAndMutate) row;
ops.addAll(unpackRowOperations(cam));
}
if (row instanceof RowMutations) {
final RowMutations mutations = (RowMutations) row;
ops.addAll(unpackRowOperations(mutations));
}
return ops;
}

private static Set<Operation> unpackRowOperations(final CheckAndMutate cam) {
final Set<Operation> ops = new HashSet<>();
final Operation op = valueFrom(cam.getAction());
switch (op) {
case BATCH:
case CHECK_AND_MUTATE:
ops.addAll(unpackRowOperations(cam.getAction()));
break;
default:
ops.add(op);
}
return ops;
}

public TableOperationSpanBuilder setContainerOperations(
final Operation... operations
) {
final List<String> ops = Arrays.stream(operations)
.map(op -> op == null ? unknown : op.name())
.sorted()
.distinct()
.collect(Collectors.toList());
attributes.put(CONTAINER_DB_OPERATIONS_KEY, ops);
return this;
}

public TableOperationSpanBuilder setTableName(final TableName tableName) {
this.tableName = tableName;
attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
Expand Down Expand Up @@ -144,11 +145,17 @@ public Void answer(InvocationOnMock invocation) throws Throwable {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ClientProtos.MultiResponse resp =
ClientProtos.MultiResponse.newBuilder()
.addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
.build();
ClientProtos.MultiRequest req = invocation.getArgument(1);
ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
for (ClientProtos.Action ignored : regionAction.getActionList()) {
raBuilder.addResultOrException(
ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
}
builder.addRegionActionResult(raBuilder);
}
ClientProtos.MultiResponse resp = builder.build();
RpcCallback<ClientProtos.MultiResponse> done = invocation.getArgument(2);
ForkJoinPool.commonPool().execute(() -> done.run(resp));
return null;
Expand Down Expand Up @@ -335,21 +342,30 @@ public void testCheckAndMutateList() {
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
}

@Test
public void testCheckAndMutateAll() {
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf(
"db.hbase.container_operations", "CHECK_AND_MUTATE", "DELETE")));
}

@Test
public void testMutateRow() throws IOException {
table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0))));
assertTrace("BATCH");
final RowMutations mutations = new RowMutations(Bytes.toBytes(0))
.add(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
.add(new Delete(Bytes.toBytes(0)));
table.mutateRow(mutations).join();
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE", "PUT")));
}

@Test
Expand All @@ -364,27 +380,31 @@ public void testExistsList() {
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testExistsAll() {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testGetList() {
CompletableFuture
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
public void testGetAll() {
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "GET")));
}

@Test
Expand All @@ -393,14 +413,16 @@ public void testPutList() {
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}

@Test
public void testPutAll() {
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "PUT")));
}

@Test
Expand All @@ -409,13 +431,15 @@ public void testDeleteList() {
.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
public void testDeleteAll() {
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
Expand All @@ -424,13 +448,15 @@ public void testBatch() {
.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
public void testBatchAll() {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
assertTrace("BATCH", hasAttributes(
containsEntryWithStringValuesOf("db.hbase.container_operations", "DELETE")));
}

@Test
Expand Down
Loading

0 comments on commit ac1cfbb

Please sign in to comment.