diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index 01a74cc6188c..39be134b1aec 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -200,6 +200,11 @@ mockito-core test + + org.hamcrest + hamcrest-library + test + org.apache.commons commons-crypto diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 3569c0fb4b3b..f72b0651842d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -28,6 +28,8 @@ import com.google.protobuf.Service; import com.google.protobuf.ServiceException; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -42,6 +44,7 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HConstants; @@ -50,11 +53,13 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; @@ -354,9 +359,10 @@ public ResultScanner getScanner(byte [] family, byte [] qualifier) @Override public Result get(final Get get) throws IOException { - return TraceUtil.trace( - () -> get(get, get.isCheckExistenceOnly()), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".get", tableName)); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(get); + return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier); } private Result get(Get get, final boolean checkExistenceOnly) throws IOException { @@ -396,6 +402,9 @@ protected Result rpcCall() throws Exception { @Override public Result[] get(List gets) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { if (gets.size() == 1) { return new Result[] { get(gets.get(0)) }; @@ -414,12 +423,15 @@ public Result[] get(List gets) throws IOException { } catch (InterruptedException e) { throw (InterruptedIOException) new InterruptedIOException().initCause(e); } - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".getList", tableName)); + }, supplier); } @Override public void batch(final List actions, final Object[] results) throws InterruptedException, IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); TraceUtil.traceWithIOException(() -> { int rpcTimeout = writeRpcTimeoutMs; boolean hasRead = false; @@ -442,7 +454,7 @@ public void batch(final List actions, final Object[] results) } catch (InterruptedException e) { throw (InterruptedIOException) new InterruptedIOException().initCause(e); } - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".batch", tableName)); + }, supplier); } public void batch(final List actions, final Object[] results, int rpcTimeout) @@ -456,10 +468,19 @@ public void batch(final List actions, final Object[] results, int .setOperationTimeout(operationTimeoutMs) .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) .build(); - AsyncRequestFuture ars = multiAp.submit(task); - ars.waitUntilDone(); - if (ars.hasError()) { - throw ars.getErrors(); + final Span span = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .build(); + try (Scope ignored = span.makeCurrent()) { + AsyncRequestFuture ars = multiAp.submit(task); + ars.waitUntilDone(); + if (ars.hasError()) { + TraceUtil.setError(span, ars.getErrors()); + throw ars.getErrors(); + } + } finally { + span.end(); } } @@ -486,15 +507,27 @@ public static void doBatchWithCallback(List actions, Object[] .setRpcTimeout(writeTimeout) .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) .build(); - AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); - ars.waitUntilDone(); - if (ars.hasError()) { - throw ars.getErrors(); + final Span span = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH) + .build(); + try (Scope ignored = span.makeCurrent()) { + AsyncRequestFuture ars = connection.getAsyncProcess().submit(task); + ars.waitUntilDone(); + if (ars.hasError()) { + TraceUtil.setError(span, ars.getErrors()); + throw ars.getErrors(); + } + } finally { + span.end(); } } @Override public void delete(final Delete delete) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(delete); TraceUtil.traceWithIOException(() -> { ClientServiceCallable callable = new ClientServiceCallable(this.connection, getName(), delete.getRow(), @@ -509,12 +542,14 @@ protected Void rpcCall() throws Exception { }; rpcCallerFactory.newCaller(this.writeRpcTimeoutMs) .callWithRetries(callable, this.operationTimeoutMs); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".delete", tableName)); + }, supplier); } @Override - public void delete(final List deletes) - throws IOException { + public void delete(final List deletes) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); TraceUtil.traceWithIOException(() -> { Object[] results = new Object[deletes.size()]; try { @@ -533,11 +568,14 @@ public void delete(final List deletes) } } } - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".deleteList", tableName)); + }, supplier); } @Override public void put(final Put put) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(put); TraceUtil.traceWithIOException(() -> { validatePut(put); ClientServiceCallable callable = @@ -553,11 +591,14 @@ protected Void rpcCall() throws Exception { }; rpcCallerFactory.newCaller(this.writeRpcTimeoutMs) .callWithRetries(callable, this.operationTimeoutMs); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".put", tableName)); + }, supplier); } @Override public void put(final List puts) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); TraceUtil.traceWithIOException(() -> { for (Put put : puts) { validatePut(put); @@ -568,11 +609,14 @@ public void put(final List puts) throws IOException { } catch (InterruptedException e) { throw (InterruptedIOException) new InterruptedIOException().initCause(e); } - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".putList", tableName)); + }, supplier); } @Override public Result mutateRow(final RowMutations rm) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { long nonceGroup = getNonceGroup(); long nonce = getNonce(); @@ -613,7 +657,7 @@ protected MultiResponse rpcCall() throws Exception { throw ars.getErrors(); } return (Result) results[0]; - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".mutateRow", tableName)); + }, supplier); } private long getNonceGroup() { @@ -626,6 +670,9 @@ private long getNonce() { @Override public Result append(final Append append) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(append); return TraceUtil.trace(() -> { checkHasFamilies(append); NoncedRegionServerCallable callable = @@ -645,11 +692,14 @@ protected Result rpcCall() throws Exception { }; return rpcCallerFactory. newCaller(this.writeRpcTimeoutMs). callWithRetries(callable, this.operationTimeoutMs); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".append", tableName)); + }, supplier); } @Override public Result increment(final Increment increment) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(increment); return TraceUtil.trace(() -> { checkHasFamilies(increment); NoncedRegionServerCallable callable = @@ -667,22 +717,23 @@ protected Result rpcCall() throws Exception { }; return rpcCallerFactory. newCaller(writeRpcTimeoutMs).callWithRetries(callable, this.operationTimeoutMs); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName)); + }, supplier); } @Override public long incrementColumnValue(final byte [] row, final byte [] family, final byte [] qualifier, final long amount) throws IOException { - return TraceUtil.trace( - () -> incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName)); + return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); } @Override public long incrementColumnValue(final byte [] row, final byte [] family, final byte [] qualifier, final long amount, final Durability durability) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.INCREMENT); return TraceUtil.trace(() -> { NullPointerException npe = null; if (row == null) { @@ -711,65 +762,83 @@ protected Long rpcCall() throws Exception { }; return rpcCallerFactory. newCaller(this.writeRpcTimeoutMs). callWithRetries(callable, this.operationTimeoutMs); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName)); + }, supplier); } @Override @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put) .isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName)); + supplier); } @Override @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null, null, put).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName)); + supplier); } @Override @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final byte [] value, final Put put) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName)); + supplier); } @Override @Deprecated public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final byte[] value, final Delete delete) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, delete).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName)); + supplier); } @Override @Deprecated public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null, null, delete).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName)); + supplier); } @Override @Deprecated public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Delete delete) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName)); + supplier); } @Override @@ -841,25 +910,32 @@ protected MultiResponse rpcCall() throws Exception { @Deprecated public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null, null, rm).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate", - tableName)); + supplier); } @Override @Deprecated public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate", - tableName)); + supplier); } @Override public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(checkAndMutate); return TraceUtil.trace(() -> { Row action = checkAndMutate.getAction(); if (action instanceof Put || action instanceof Delete || action instanceof Increment || @@ -875,8 +951,7 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(), checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action); } - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate", - tableName)); + }, supplier); } private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family, @@ -907,6 +982,9 @@ protected CheckAndMutateResult rpcCall() throws Exception { @Override public List checkAndMutate(List checkAndMutates) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { if (checkAndMutates.isEmpty()) { return Collections.emptyList(); @@ -929,8 +1007,7 @@ public List checkAndMutate(List checkAndMu ret.add((CheckAndMutateResult) r); } return ret; - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutateList", - tableName)); + }, supplier); } private CompareOperator toCompareOperator(CompareOp compareOp) { @@ -963,15 +1040,21 @@ private CompareOperator toCompareOperator(CompareOp compareOp) { @Override public boolean exists(final Get get) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(get); return TraceUtil.trace(() -> { Result r = get(get, true); assert r.getExists() != null; return r.getExists(); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".get", tableName)); + }, supplier); } @Override public boolean[] exists(List gets) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return TraceUtil.trace(() -> { if (gets.isEmpty()) { return new boolean[] {}; @@ -1003,7 +1086,7 @@ public boolean[] exists(List gets) throws IOException { } return results; - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".getList", tableName)); + }, supplier); } /** @@ -1373,30 +1456,39 @@ private void preCheck() { @Override public boolean thenPut(Put put) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { validatePut(put); preCheck(); return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put) .isSuccess(); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenPut", tableName)); + }, supplier); } @Override public boolean thenDelete(Delete delete) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { preCheck(); return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete) .isSuccess(); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenDelete", tableName)); + }, supplier); } @Override public boolean thenMutate(RowMutations mutation) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { preCheck(); return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation) .isSuccess(); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenMutate", tableName)); + }, supplier); } } @@ -1419,26 +1511,35 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { @Override public boolean thenPut(Put put) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace(() -> { validatePut(put); return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put) .isSuccess(); - }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenPut", tableName)); + }, supplier); } @Override public boolean thenDelete(Delete delete) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenDelete", tableName)); + supplier); } @Override public boolean thenMutate(RowMutations mutation) throws IOException { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return TraceUtil.trace( () -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation) .isSuccess(), - () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenMutate", tableName)); + supplier); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index f637e47f60ae..6a9d7a3aec55 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,6 +28,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import com.google.protobuf.RpcChannel; +import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -44,9 +46,11 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; +import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -258,35 +262,47 @@ private CompletableFuture get(Get get, int replicaId) { @Override public CompletableFuture get(Get get) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(get); return tracedFuture( () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), - "AsyncTable.get", tableName); + supplier); } @Override public CompletableFuture put(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(put); return tracedFuture(() -> this. newCaller(put, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) - .call(), "AsyncTable.put", tableName); + .call(), supplier); } @Override public CompletableFuture delete(Delete delete) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(delete); return tracedFuture( () -> this. newCaller(delete, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, delete, RequestConverter::buildMutateRequest)) .call(), - "AsyncTable.delete", tableName); + supplier); } @Override public CompletableFuture append(Append append) { checkHasFamilies(append); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(append); return tracedFuture(() -> { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); @@ -295,12 +311,15 @@ public CompletableFuture append(Append append) { controller, loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); - }, "AsyncTable.append", tableName); + }, supplier); } @Override public CompletableFuture increment(Increment increment) { checkHasFamilies(increment); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(increment); return tracedFuture(() -> { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); @@ -309,7 +328,7 @@ public CompletableFuture increment(Increment increment) { controller, loc, stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); - }, "AsyncTable.increment", tableName); + }, supplier); } private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { @@ -367,6 +386,9 @@ private void preCheck() { public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); preCheck(); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, @@ -374,12 +396,15 @@ public CompletableFuture thenPut(Put put) { null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateBuilder.thenPut", tableName); + supplier); } @Override public CompletableFuture thenDelete(Delete delete) { preCheck(); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -387,23 +412,26 @@ public CompletableFuture thenDelete(Delete delete) { null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateBuilder.thenDelete", tableName); + supplier); } @Override - public CompletableFuture thenMutate(RowMutations mutation) { + public CompletableFuture thenMutate(RowMutations mutations) { preCheck(); - validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFuture( () -> RawAsyncTableImpl.this - . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, - mutation, + mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), CheckAndMutateResult::isSuccess)) .call(), - "AsyncTable.CheckAndMutateBuilder.thenMutate", tableName); + supplier); } } @@ -435,6 +463,9 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { @Override public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, @@ -443,11 +474,14 @@ public CompletableFuture thenPut(Put put) { filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName); + supplier); } @Override public CompletableFuture thenDelete(Delete delete) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -455,22 +489,25 @@ public CompletableFuture thenDelete(Delete delete) { timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName); + supplier); } @Override - public CompletableFuture thenMutate(RowMutations mutation) { - validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); + public CompletableFuture thenMutate(RowMutations mutations) { + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFuture( () -> RawAsyncTableImpl.this - . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, - mutation, + mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), CheckAndMutateResult::isSuccess)) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName); + supplier); } } @@ -481,6 +518,9 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) @Override public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(checkAndMutate); return tracedFuture(() -> { if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete || @@ -526,16 +566,19 @@ public CompletableFuture checkAndMutate(CheckAndMutate che "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); return future; } - }, "AsyncTable.checkAndMutate", tableName); + }, supplier); } @Override public List> checkAndMutate(List checkAndMutates) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFutures( () -> batch(checkAndMutates, rpcTimeoutNs).stream() .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), - "AsyncTable.checkAndMutateList", tableName); + supplier); } // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, @@ -586,6 +629,9 @@ public CompletableFuture mutateRow(RowMutations mutations) { validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFuture( () -> this . newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) @@ -593,7 +639,7 @@ public CompletableFuture mutateRow(RowMutations mutations) { mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), resp -> resp)) .call(), - "AsyncTable.mutateRow", tableName); + supplier); } private Scan setDefaultScanConfig(Scan scan) { @@ -629,6 +675,9 @@ public ResultScanner getScanner(Scan scan) { @Override public CompletableFuture> scanAll(Scan scan) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(scan); return tracedFuture(() -> { CompletableFuture> future = new CompletableFuture<>(); List scanResults = new ArrayList<>(); @@ -650,27 +699,39 @@ public void onComplete() { } }); return future; - }, "AsyncTable.scanAll", tableName); + }, supplier); } @Override public List> get(List gets) { - return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); } @Override public List> put(List puts) { - return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> voidMutate(puts), supplier); } @Override public List> delete(List deletes) { - return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> voidMutate(deletes), supplier); } @Override public List> batch(List actions) { - return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); } private List> voidMutate(List actions) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java new file mode 100644 index 000000000000..fd651a85b26b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.trace; + +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Construct {@link io.opentelemetry.api.trace.Span} instances originating from + * "table operations" -- the verbs in our public API that interact with data in tables. + */ +@InterfaceAudience.Private +public class TableOperationSpanBuilder implements Supplier { + + // n.b. The results of this class are tested implicitly by way of the likes of + // `TestAsyncTableTracing` and friends. + + private static final String unknown = "UNKNOWN"; + + private TableName tableName; + private final Map, Object> attributes = new HashMap<>(); + + @Override public Span get() { + return build(); + } + + public TableOperationSpanBuilder setOperation(final Scan scan) { + return setOperation(valueFrom(scan)); + } + + public TableOperationSpanBuilder setOperation(final Row row) { + return setOperation(valueFrom(row)); + } + + public TableOperationSpanBuilder setOperation(final Operation operation) { + attributes.put(DB_OPERATION, operation.name()); + return this; + } + + public TableOperationSpanBuilder setTableName(final TableName tableName) { + this.tableName = tableName; + attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString()); + attributes.put(DB_NAME, tableName.getNamespaceAsString()); + attributes.put(TABLE_KEY, tableName.getNameAsString()); + return this; + } + + @SuppressWarnings("unchecked") + public Span build() { + final String name = attributes.getOrDefault(DB_OPERATION, unknown) + + " " + + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown); + final SpanBuilder builder = TraceUtil.getGlobalTracer() + .spanBuilder(name) + // TODO: what about clients embedded in Master/RegionServer/Gateways/&c? + .setSpanKind(SpanKind.CLIENT); + attributes.forEach((k, v) -> builder.setAttribute((AttributeKey) k, v)); + return builder.startSpan(); + } + + private static Operation valueFrom(final Scan scan) { + if (scan == null) { return null; } + return Operation.SCAN; + } + + private static Operation valueFrom(final Row row) { + if (row == null) { return null; } + if (row instanceof Append) { return Operation.APPEND; } + if (row instanceof CheckAndMutate) { return Operation.CHECK_AND_MUTATE; } + if (row instanceof Delete) { return Operation.DELETE; } + if (row instanceof Get) { return Operation.GET; } + if (row instanceof Increment) { return Operation.INCREMENT; } + if (row instanceof Put) { return Operation.PUT; } + if (row instanceof RegionCoprocessorServiceExec) { + return Operation.COPROC_EXEC; + } + if (row instanceof RowMutations) { return Operation.BATCH; } + return null; + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index 15f10b6fbee9..555ec725631c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -17,15 +17,23 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; - +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -42,6 +50,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; @@ -50,6 +59,8 @@ import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matcher; +import org.hamcrest.core.IsAnything; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -58,10 +69,8 @@ import org.junit.experimental.categories.Category; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; - import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -219,49 +228,73 @@ public void tearDown() throws IOException { Closeables.close(conn, true); } - private void assertTrace(String methodName) { - Waiter.waitFor(CONF, 1000, - () -> traceRule.getSpans().stream() - .anyMatch(span -> span.getName().equals("AsyncTable." + methodName) && - span.getKind() == SpanKind.INTERNAL && span.hasEnded())); - SpanData data = traceRule.getSpans().stream() - .filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get(); - assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); - TableName tableName = table.getName(); - assertEquals(tableName.getNamespaceAsString(), data.getAttributes().get(NAMESPACE_KEY)); - assertEquals(tableName.getNameAsString(), data.getAttributes().get(TABLE_KEY)); + /** + * All {@link Span}s generated from table data access operations over {@code tableName} should + * include these attributes. + */ + static Matcher buildBaseAttributesMatcher(TableName tableName) { + return hasAttributes(allOf( + containsEntry("db.name", tableName.getNamespaceAsString()), + containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()), + containsEntry("db.hbase.table", tableName.getNameAsString()))); + } + + private void assertTrace(String tableOperation) { + assertTrace(tableOperation, new IsAnything<>()); + } + + private void assertTrace(String tableOperation, Matcher matcher) { + final TableName tableName = table.getName(); + final Matcher spanLocator = allOf( + hasName(containsString(tableOperation)), hasEnded()); + final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); + + Waiter.waitFor(CONF, 1000, new MatcherPredicate<>( + "waiting for span to emit", + () -> traceRule.getSpans(), hasItem(spanLocator))); + SpanData data = traceRule.getSpans() + .stream() + .filter(spanLocator::matches) + .findFirst() + .orElseThrow(AssertionError::new); + assertThat(data, allOf( + hasName(expectedName), + hasKind(SpanKind.CLIENT), + hasStatusWithCode(StatusCode.OK), + buildBaseAttributesMatcher(tableName), + matcher)); } @Test public void testExists() { table.exists(new Get(Bytes.toBytes(0))).join(); - assertTrace("get"); + assertTrace("GET"); } @Test public void testGet() { table.get(new Get(Bytes.toBytes(0))).join(); - assertTrace("get"); + assertTrace("GET"); } @Test public void testPut() { table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); - assertTrace("put"); + assertTrace("PUT"); } @Test public void testDelete() { table.delete(new Delete(Bytes.toBytes(0))).join(); - assertTrace("delete"); + assertTrace("DELETE"); } @Test public void testAppend() { table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); - assertTrace("append"); + assertTrace("APPEND"); } @Test @@ -270,21 +303,21 @@ public void testIncrement() { .increment( new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) .join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue1() { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) .join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue2() { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, Durability.ASYNC_WAL).join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test @@ -292,7 +325,7 @@ public void testCheckAndMutate() { table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("checkAndMutate"); + assertTrace("CHECK_AND_MUTATE"); } @Test @@ -302,7 +335,7 @@ public void testCheckAndMutateList() { .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) .join(); - assertTrace("checkAndMutateList"); + assertTrace("BATCH"); } @Test @@ -310,7 +343,7 @@ public void testCheckAndMutateAll() { table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).join(); - assertTrace("checkAndMutateList"); + assertTrace("BATCH"); } @Test @@ -319,13 +352,13 @@ public void testMutateRow() throws Exception { RowMutations mutation = new RowMutations(row); mutation.add(new Delete(row)); table.mutateRow(mutation).get(); - assertTrace("mutateRow"); + assertTrace("BATCH"); } @Test - public void testScanAll() throws IOException { + public void testScanAll() { table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); - assertTrace("scanAll"); + assertTrace("SCAN"); } @Test @@ -334,13 +367,13 @@ public void testExistsList() { .allOf( table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test public void testExistsAll() { table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test @@ -348,13 +381,13 @@ public void testGetList() { CompletableFuture .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test public void testGetAll() { table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test @@ -363,14 +396,14 @@ public void testPutList() { .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) .join(); - assertTrace("putList"); + assertTrace("BATCH"); } @Test public void testPutAll() { table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); - assertTrace("putList"); + assertTrace("BATCH"); } @Test @@ -379,13 +412,13 @@ public void testDeleteList() { .allOf( table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("deleteList"); + assertTrace("BATCH"); } @Test public void testDeleteAll() { table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("deleteList"); + assertTrace("BATCH"); } @Test @@ -394,13 +427,13 @@ public void testBatch() { .allOf( table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("batch"); + assertTrace("BATCH"); } @Test public void testBatchAll() { table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("batch"); + assertTrace("BATCH"); } @Test diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java index 915e4006f25c..8e6409ef3c92 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java @@ -17,6 +17,15 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.TestAsyncTableTracing.buildBaseAttributesMatcher; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; @@ -25,6 +34,9 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.util.Arrays; import java.util.concurrent.ForkJoinPool; @@ -34,13 +46,17 @@ import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matcher; +import org.hamcrest.core.IsAnything; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -217,56 +233,82 @@ public void tearDown() throws IOException { Closeables.close(conn, true); } + private void assertTrace(String tableOperation) { + assertTrace(tableOperation, new IsAnything<>()); + } + + private void assertTrace(String tableOperation, Matcher matcher) { + final TableName tableName = table.getName(); + final Matcher spanLocator = allOf( + hasName(containsString(tableOperation)), hasEnded()); + final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); + + Waiter.waitFor(conf, 1000, new MatcherPredicate<>( + "waiting for span to emit", + () -> TRACE_RULE.getSpans(), hasItem(spanLocator))); + SpanData data = TRACE_RULE.getSpans() + .stream() + .filter(spanLocator::matches) + .findFirst() + .orElseThrow(AssertionError::new); + assertThat(data, allOf( + hasName(expectedName), + hasKind(SpanKind.CLIENT), + hasStatusWithCode(StatusCode.OK), + buildBaseAttributesMatcher(tableName), + matcher)); + } + @Test public void testPut() throws IOException { table.put(new Put(Bytes.toBytes(0)) .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))); - assertTrace(HTable.class.getSimpleName(), "put", null, TableName.META_TABLE_NAME); + assertTrace("PUT"); } @Test public void testExists() throws IOException { table.exists(new Get(Bytes.toBytes(0))); - assertTrace(HTable.class.getSimpleName(), "get", null, TableName.META_TABLE_NAME); + assertTrace("GET"); } @Test public void testGet() throws IOException { table.get(new Get(Bytes.toBytes(0))); - assertTrace(HTable.class.getSimpleName(), "get", null, TableName.META_TABLE_NAME); + assertTrace("GET"); } @Test public void testDelete() throws IOException { table.delete(new Delete(Bytes.toBytes(0))); - assertTrace(HTable.class.getSimpleName(), "delete", null, TableName.META_TABLE_NAME); + assertTrace("DELETE"); } @Test public void testAppend() throws IOException { table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))); - assertTrace(HTable.class.getSimpleName(), "append", null, TableName.META_TABLE_NAME); + assertTrace("APPEND"); } @Test public void testIncrement() throws IOException { table.increment( new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)); - assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue1() throws IOException { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1); - assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue2() throws IOException { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, Durability.SYNC_WAL); - assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME); + assertTrace("INCREMENT"); } @Test @@ -274,7 +316,7 @@ public void testCheckAndMutate() throws IOException { table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0)))); - assertTrace(HTable.class.getSimpleName(), "checkAndMutate", null, TableName.META_TABLE_NAME); + assertTrace("CHECK_AND_MUTATE"); } @Test @@ -282,8 +324,7 @@ public void testCheckAndMutateList() throws IOException { table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))); - assertTrace(HTable.class.getSimpleName(), "checkAndMutateList", null, - TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test @@ -291,52 +332,51 @@ public void testCheckAndMutateAll() throws IOException { table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))); - assertTrace(HTable.class.getSimpleName(), "checkAndMutateList", null, - TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test public void testMutateRow() throws Exception { byte[] row = Bytes.toBytes(0); table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row)))); - assertTrace(HTable.class.getSimpleName(), "mutateRow", null, TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test public void testExistsList() throws IOException { table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))); - assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test public void testExistsAll() throws IOException { table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))); - assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test public void testGetList() throws IOException { table.get(Arrays.asList(new Get(Bytes.toBytes(0)))); - assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test public void testPutList() throws IOException { table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))); - assertTrace(HTable.class.getSimpleName(), "putList", null, TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test public void testDeleteList() throws IOException { table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0)))); - assertTrace(HTable.class.getSimpleName(), "deleteList", null, TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test public void testBatchList() throws IOException, InterruptedException { table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null); - assertTrace(HTable.class.getSimpleName(), "batch", null, TableName.META_TABLE_NAME); + assertTrace("BATCH"); } @Test diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java new file mode 100644 index 000000000000..c3bf3bee59e5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace.hamcrest; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasProperty; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Helper methods for matching against instances of {@link io.opentelemetry.api.common.Attributes}. + */ +public final class AttributesMatchers { + + private AttributesMatchers() { } + + public static Matcher containsEntry( + Matcher> keyMatcher, + Matcher valueMatcher + ) { + return new IsAttributesContaining<>(keyMatcher, valueMatcher); + } + + public static Matcher containsEntry(AttributeKey key, T value) { + return containsEntry(equalTo(key), equalTo(value)); + } + + public static Matcher containsEntry(String key, String value) { + return containsEntry(AttributeKey.stringKey(key), value); + } + + private static final class IsAttributesContaining extends TypeSafeMatcher { + private final Matcher> keyMatcher; + private final Matcher valueMatcher; + + private IsAttributesContaining( + final Matcher> keyMatcher, + final Matcher valueMatcher + ) { + this.keyMatcher = keyMatcher; + this.valueMatcher = valueMatcher; + } + + @Override + protected boolean matchesSafely(Attributes item) { + return item.asMap().entrySet().stream().anyMatch(e -> allOf( + hasProperty("key", keyMatcher), + hasProperty("value", valueMatcher)) + .matches(e)); + } + + @Override + public void describeMismatchSafely(Attributes item, Description mismatchDescription) { + mismatchDescription + .appendText("Attributes was ") + .appendValueList("[", ", ", "]", item.asMap().entrySet()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("Attributes containing [") + .appendDescriptionOf(keyMatcher) + .appendText("->") + .appendDescriptionOf(valueMatcher) + .appendText("]"); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java new file mode 100644 index 000000000000..01142850f43f --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace.hamcrest; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.util.Objects; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Helper methods for matching against instances of {@link SpanData}. + */ +public final class SpanDataMatchers { + + private SpanDataMatchers() { } + + public static Matcher hasAttributes(Matcher matcher) { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + final Attributes attributes = item.getAttributes(); + return attributes != null && matcher.matches(attributes); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData having ").appendDescriptionOf(matcher); + } + }; + } + + public static Matcher hasEnded() { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + return item.hasEnded(); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData that hasEnded"); + } + }; + } + + public static Matcher hasKind(SpanKind kind) { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + return Objects.equals(item.getKind(), kind); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData with kind of ").appendValue(kind); + } + }; + } + + public static Matcher hasName(String name) { + return hasName(equalTo(name)); + } + + public static Matcher hasName(Matcher matcher) { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + final String name = item.getName(); + return name != null && matcher.matches(name); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData with a name that ").appendDescriptionOf(matcher); + } + }; + } + + public static Matcher hasStatusWithCode(StatusCode statusCode) { + final Matcher matcher = is(equalTo(statusCode)); + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + final StatusData statusData = item.getStatus(); + return statusData != null + && statusData.getStatusCode() != null + && matcher.matches(statusData.getStatusCode()); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher); + } + }; + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index 1eb5d820d998..90c3c858a706 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -28,7 +28,9 @@ */ @InterfaceAudience.Private public final class HBaseSemanticAttributes { + public static final AttributeKey DB_NAME = SemanticAttributes.DB_NAME; public static final AttributeKey NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE; + public static final AttributeKey DB_OPERATION = SemanticAttributes.DB_OPERATION; public static final AttributeKey TABLE_KEY = AttributeKey.stringKey("db.hbase.table"); public static final AttributeKey> REGION_NAMES_KEY = AttributeKey.stringArrayKey("db.hbase.regions"); @@ -44,5 +46,23 @@ public final class HBaseSemanticAttributes { AttributeKey.booleanKey("db.hbase.rowlock.readlock"); public static final AttributeKey WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl"); + /** + * These are values used with {@link #DB_OPERATION}. They correspond with the implementations of + * {@code org.apache.hadoop.hbase.client.Operation}, as well as + * {@code org.apache.hadoop.hbase.client.CheckAndMutate}, and "MULTI", meaning a batch of multiple + * operations. + */ + public enum Operation { + APPEND, + BATCH, + CHECK_AND_MUTATE, + COPROC_EXEC, + DELETE, + GET, + INCREMENT, + PUT, + SCAN, + } + private HBaseSemanticAttributes() { } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index 882eed4dab09..1c428ae5608d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -91,9 +91,11 @@ public static Span createClientSpan(String name) { /** * Trace an asynchronous operation for a table. */ - public static CompletableFuture tracedFuture(Supplier> action, - String spanName, TableName tableName) { - Span span = createTableSpan(spanName, tableName); + public static CompletableFuture tracedFuture( + Supplier> action, + Supplier spanSupplier + ) { + Span span = spanSupplier.get(); try (Scope scope = span.makeCurrent()) { CompletableFuture future = action.get(); endSpan(future, span); @@ -119,8 +121,10 @@ public static CompletableFuture tracedFuture(Supplier List> tracedFutures( - Supplier>> action, String spanName, TableName tableName) { - Span span = createTableSpan(spanName, tableName); + Supplier>> action, + Supplier spanSupplier + ) { + Span span = spanSupplier.get(); try (Scope scope = span.makeCurrent()) { List> futures = action.get(); endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);