diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 604057190c5d..a5af55a41c88 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -200,6 +200,11 @@
mockito-core
test
+
+ org.hamcrest
+ hamcrest-library
+ test
+
org.apache.commons
commons-crypto
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 3569c0fb4b3b..f72b0651842d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -28,6 +28,8 @@
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -42,6 +44,7 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
@@ -50,11 +53,13 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@@ -354,9 +359,10 @@ public ResultScanner getScanner(byte [] family, byte [] qualifier)
@Override
public Result get(final Get get) throws IOException {
- return TraceUtil.trace(
- () -> get(get, get.isCheckExistenceOnly()),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".get", tableName));
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(get);
+ return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier);
}
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
@@ -396,6 +402,9 @@ protected Result rpcCall() throws Exception {
@Override
public Result[] get(List gets) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
if (gets.size() == 1) {
return new Result[] { get(gets.get(0)) };
@@ -414,12 +423,15 @@ public Result[] get(List gets) throws IOException {
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".getList", tableName));
+ }, supplier);
}
@Override
public void batch(final List extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH);
TraceUtil.traceWithIOException(() -> {
int rpcTimeout = writeRpcTimeoutMs;
boolean hasRead = false;
@@ -442,7 +454,7 @@ public void batch(final List extends Row> actions, final Object[] results)
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".batch", tableName));
+ }, supplier);
}
public void batch(final List extends Row> actions, final Object[] results, int rpcTimeout)
@@ -456,10 +468,19 @@ public void batch(final List extends Row> actions, final Object[] results, int
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
- AsyncRequestFuture ars = multiAp.submit(task);
- ars.waitUntilDone();
- if (ars.hasError()) {
- throw ars.getErrors();
+ final Span span = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH)
+ .build();
+ try (Scope ignored = span.makeCurrent()) {
+ AsyncRequestFuture ars = multiAp.submit(task);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ TraceUtil.setError(span, ars.getErrors());
+ throw ars.getErrors();
+ }
+ } finally {
+ span.end();
}
}
@@ -486,15 +507,27 @@ public static void doBatchWithCallback(List extends Row> actions, Object[]
.setRpcTimeout(writeTimeout)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
- AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
- ars.waitUntilDone();
- if (ars.hasError()) {
- throw ars.getErrors();
+ final Span span = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH)
+ .build();
+ try (Scope ignored = span.makeCurrent()) {
+ AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ TraceUtil.setError(span, ars.getErrors());
+ throw ars.getErrors();
+ }
+ } finally {
+ span.end();
}
}
@Override
public void delete(final Delete delete) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(delete);
TraceUtil.traceWithIOException(() -> {
ClientServiceCallable callable =
new ClientServiceCallable(this.connection, getName(), delete.getRow(),
@@ -509,12 +542,14 @@ protected Void rpcCall() throws Exception {
};
rpcCallerFactory.newCaller(this.writeRpcTimeoutMs)
.callWithRetries(callable, this.operationTimeoutMs);
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".delete", tableName));
+ }, supplier);
}
@Override
- public void delete(final List deletes)
- throws IOException {
+ public void delete(final List deletes) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH);
TraceUtil.traceWithIOException(() -> {
Object[] results = new Object[deletes.size()];
try {
@@ -533,11 +568,14 @@ public void delete(final List deletes)
}
}
}
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".deleteList", tableName));
+ }, supplier);
}
@Override
public void put(final Put put) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(put);
TraceUtil.traceWithIOException(() -> {
validatePut(put);
ClientServiceCallable callable =
@@ -553,11 +591,14 @@ protected Void rpcCall() throws Exception {
};
rpcCallerFactory.newCaller(this.writeRpcTimeoutMs)
.callWithRetries(callable, this.operationTimeoutMs);
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".put", tableName));
+ }, supplier);
}
@Override
public void put(final List puts) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH);
TraceUtil.traceWithIOException(() -> {
for (Put put : puts) {
validatePut(put);
@@ -568,11 +609,14 @@ public void put(final List puts) throws IOException {
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".putList", tableName));
+ }, supplier);
}
@Override
public Result mutateRow(final RowMutations rm) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
long nonceGroup = getNonceGroup();
long nonce = getNonce();
@@ -613,7 +657,7 @@ protected MultiResponse rpcCall() throws Exception {
throw ars.getErrors();
}
return (Result) results[0];
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".mutateRow", tableName));
+ }, supplier);
}
private long getNonceGroup() {
@@ -626,6 +670,9 @@ private long getNonce() {
@Override
public Result append(final Append append) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(append);
return TraceUtil.trace(() -> {
checkHasFamilies(append);
NoncedRegionServerCallable callable =
@@ -645,11 +692,14 @@ protected Result rpcCall() throws Exception {
};
return rpcCallerFactory. newCaller(this.writeRpcTimeoutMs).
callWithRetries(callable, this.operationTimeoutMs);
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".append", tableName));
+ }, supplier);
}
@Override
public Result increment(final Increment increment) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(increment);
return TraceUtil.trace(() -> {
checkHasFamilies(increment);
NoncedRegionServerCallable callable =
@@ -667,22 +717,23 @@ protected Result rpcCall() throws Exception {
};
return rpcCallerFactory. newCaller(writeRpcTimeoutMs).callWithRetries(callable,
this.operationTimeoutMs);
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName));
+ }, supplier);
}
@Override
public long incrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount)
throws IOException {
- return TraceUtil.trace(
- () -> incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName));
+ return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
}
@Override
public long incrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount, final Durability durability)
throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.INCREMENT);
return TraceUtil.trace(() -> {
NullPointerException npe = null;
if (row == null) {
@@ -711,65 +762,83 @@ protected Long rpcCall() throws Exception {
};
return rpcCallerFactory. newCaller(this.writeRpcTimeoutMs).
callWithRetries(callable, this.operationTimeoutMs);
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".increment", tableName));
+ }, supplier);
}
@Override
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
.isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName));
+ supplier);
}
@Override
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, put).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName));
+ supplier);
}
@Override
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final Put put) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndPut", tableName));
+ supplier);
}
@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final byte[] value, final Delete delete) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
delete).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName));
+ supplier);
}
@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, delete).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName));
+ supplier);
}
@Override
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndDelete", tableName));
+ supplier);
}
@Override
@@ -841,25 +910,32 @@ protected MultiResponse rpcCall() throws Exception {
@Deprecated
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, rm).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate",
- tableName));
+ supplier);
}
@Override
@Deprecated
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate",
- tableName));
+ supplier);
}
@Override
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(checkAndMutate);
return TraceUtil.trace(() -> {
Row action = checkAndMutate.getAction();
if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
@@ -875,8 +951,7 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
checkAndMutate.getQualifier(), checkAndMutate.getCompareOp(), checkAndMutate.getValue(),
checkAndMutate.getFilter(), checkAndMutate.getTimeRange(), (RowMutations) action);
}
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutate",
- tableName));
+ }, supplier);
}
private CheckAndMutateResult doCheckAndMutate(final byte[] row, final byte[] family,
@@ -907,6 +982,9 @@ protected CheckAndMutateResult rpcCall() throws Exception {
@Override
public List checkAndMutate(List checkAndMutates)
throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
if (checkAndMutates.isEmpty()) {
return Collections.emptyList();
@@ -929,8 +1007,7 @@ public List checkAndMutate(List checkAndMu
ret.add((CheckAndMutateResult) r);
}
return ret;
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".checkAndMutateList",
- tableName));
+ }, supplier);
}
private CompareOperator toCompareOperator(CompareOp compareOp) {
@@ -963,15 +1040,21 @@ private CompareOperator toCompareOperator(CompareOp compareOp) {
@Override
public boolean exists(final Get get) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(get);
return TraceUtil.trace(() -> {
Result r = get(get, true);
assert r.getExists() != null;
return r.getExists();
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".get", tableName));
+ }, supplier);
}
@Override
public boolean[] exists(List gets) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
if (gets.isEmpty()) {
return new boolean[] {};
@@ -1003,7 +1086,7 @@ public boolean[] exists(List gets) throws IOException {
}
return results;
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".getList", tableName));
+ }, supplier);
}
/**
@@ -1373,30 +1456,39 @@ private void preCheck() {
@Override
public boolean thenPut(Put put) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
validatePut(put);
preCheck();
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, put)
.isSuccess();
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenPut", tableName));
+ }, supplier);
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
preCheck();
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, delete)
.isSuccess();
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenDelete", tableName));
+ }, supplier);
}
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
preCheck();
return doCheckAndMutate(row, family, qualifier, op, value, null, timeRange, mutation)
.isSuccess();
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenMutate", tableName));
+ }, supplier);
}
}
@@ -1419,26 +1511,35 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
@Override
public boolean thenPut(Put put) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
validatePut(put);
return doCheckAndMutate(row, null, null, null, null, filter, timeRange, put)
.isSuccess();
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenPut", tableName));
+ }, supplier);
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, delete).isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenDelete", tableName));
+ supplier);
}
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
+ final Supplier supplier = new TableOperationSpanBuilder()
+ .setTableName(tableName)
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, null, null, null, null, filter, timeRange, mutation)
.isSuccess(),
- () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".thenMutate", tableName));
+ supplier);
}
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index f637e47f60ae..5ed5e7ef6709 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +28,7 @@
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
+import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -37,6 +38,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -44,9 +46,11 @@
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -256,37 +260,49 @@ private CompletableFuture get(Get get, int replicaId) {
.replicaId(replicaId).call();
}
+ private TableOperationSpanBuilder newTableOperationSpanBuilder() {
+ return new TableOperationSpanBuilder().setTableName(tableName);
+ }
+
@Override
public CompletableFuture get(Get get) {
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(get);
return tracedFuture(
() -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()),
- "AsyncTable.get", tableName);
+ supplier);
}
@Override
public CompletableFuture put(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(put);
return tracedFuture(() -> this. newCaller(put, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub,
put, RequestConverter::buildMutateRequest))
- .call(), "AsyncTable.put", tableName);
+ .call(), supplier);
}
@Override
public CompletableFuture delete(Delete delete) {
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(delete);
return tracedFuture(
() -> this. newCaller(delete, writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc,
stub, delete, RequestConverter::buildMutateRequest))
.call(),
- "AsyncTable.delete", tableName);
+ supplier);
}
@Override
public CompletableFuture append(Append append) {
checkHasFamilies(append);
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(append);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
@@ -295,12 +311,14 @@ public CompletableFuture append(Append append) {
controller, loc, stub, append, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
- }, "AsyncTable.append", tableName);
+ }, supplier);
}
@Override
public CompletableFuture increment(Increment increment) {
checkHasFamilies(increment);
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(increment);
return tracedFuture(() -> {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
@@ -309,7 +327,7 @@ public CompletableFuture increment(Increment increment) {
controller, loc, stub, increment, RequestConverter::buildMutateRequest,
RawAsyncTableImpl::toResult))
.call();
- }, "AsyncTable.increment", tableName);
+ }, supplier);
}
private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
@@ -367,6 +385,8 @@ private void preCheck() {
public CompletableFuture thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
@@ -374,12 +394,14 @@ public CompletableFuture thenPut(Put put) {
null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
- "AsyncTable.CheckAndMutateBuilder.thenPut", tableName);
+ supplier);
}
@Override
public CompletableFuture thenDelete(Delete delete) {
preCheck();
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
@@ -387,23 +409,25 @@ public CompletableFuture thenDelete(Delete delete) {
null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
- "AsyncTable.CheckAndMutateBuilder.thenDelete", tableName);
+ supplier);
}
@Override
- public CompletableFuture thenMutate(RowMutations mutation) {
+ public CompletableFuture thenMutate(RowMutations mutations) {
preCheck();
- validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
+ validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
- . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
+ . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
- mutation,
+ mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value,
null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call(),
- "AsyncTable.CheckAndMutateBuilder.thenMutate", tableName);
+ supplier);
}
}
@@ -435,6 +459,8 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
@Override
public CompletableFuture thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
@@ -443,11 +469,13 @@ public CompletableFuture thenPut(Put put) {
filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
- "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName);
+ supplier);
}
@Override
public CompletableFuture thenDelete(Delete delete) {
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
@@ -455,22 +483,24 @@ public CompletableFuture thenDelete(Delete delete) {
timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE),
(c, r) -> r.getProcessed()))
.call(),
- "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName);
+ supplier);
}
@Override
- public CompletableFuture thenMutate(RowMutations mutation) {
- validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize());
+ public CompletableFuture thenMutate(RowMutations mutations) {
+ validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return tracedFuture(
() -> RawAsyncTableImpl.this
- . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
+ . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub,
- mutation,
+ mutations,
(rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter,
timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE),
CheckAndMutateResult::isSuccess))
.call(),
- "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName);
+ supplier);
}
}
@@ -481,6 +511,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)
@Override
public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) {
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(checkAndMutate);
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
@@ -526,16 +558,18 @@ public CompletableFuture checkAndMutate(CheckAndMutate che
"CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName()));
return future;
}
- }, "AsyncTable.checkAndMutate", tableName);
+ }, supplier);
}
@Override
public List>
checkAndMutate(List checkAndMutates) {
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
- "AsyncTable.checkAndMutateList", tableName);
+ supplier);
}
// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
@@ -586,6 +620,8 @@ public CompletableFuture mutateRow(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(mutations);
return tracedFuture(
() -> this
. newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
@@ -593,7 +629,7 @@ public CompletableFuture mutateRow(RowMutations mutations) {
mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce),
resp -> resp))
.call(),
- "AsyncTable.mutateRow", tableName);
+ supplier);
}
private Scan setDefaultScanConfig(Scan scan) {
@@ -629,6 +665,8 @@ public ResultScanner getScanner(Scan scan) {
@Override
public CompletableFuture> scanAll(Scan scan) {
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(scan);
return tracedFuture(() -> {
CompletableFuture> future = new CompletableFuture<>();
List scanResults = new ArrayList<>();
@@ -650,27 +688,35 @@ public void onComplete() {
}
});
return future;
- }, "AsyncTable.scanAll", tableName);
+ }, supplier);
}
@Override
public List> get(List gets) {
- return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName);
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(gets);
+ return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}
@Override
public List> put(List puts) {
- return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName);
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(puts);
+ return tracedFutures(() -> voidMutate(puts), supplier);
}
@Override
public List> delete(List deletes) {
- return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName);
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(deletes);
+ return tracedFutures(() -> voidMutate(deletes), supplier);
}
@Override
public List> batch(List extends Row> actions) {
- return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName);
+ final Supplier supplier = newTableOperationSpanBuilder()
+ .setOperation(actions);
+ return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}
private List> voidMutate(List extends Row> actions) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java
new file mode 100644
index 000000000000..aaa53610321e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.trace;
+
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Construct {@link io.opentelemetry.api.trace.Span} instances originating from
+ * "table operations" -- the verbs in our public API that interact with data in tables.
+ */
+@InterfaceAudience.Private
+public class TableOperationSpanBuilder implements Supplier {
+
+ // n.b. The results of this class are tested implicitly by way of the likes of
+ // `TestAsyncTableTracing` and friends.
+
+ private static final String unknown = "UNKNOWN";
+
+ private TableName tableName;
+ private final Map, Object> attributes = new HashMap<>();
+
+ @Override public Span get() {
+ return build();
+ }
+
+ public TableOperationSpanBuilder setOperation(final Scan scan) {
+ return setOperation(valueFrom(scan));
+ }
+
+ public TableOperationSpanBuilder setOperation(final Row row) {
+ return setOperation(valueFrom(row));
+ }
+
+ @SuppressWarnings("unused")
+ public TableOperationSpanBuilder setOperation(final Collection extends Row> operations) {
+ return setOperation(Operation.BATCH);
+ }
+
+ public TableOperationSpanBuilder setOperation(final Operation operation) {
+ attributes.put(DB_OPERATION, operation.name());
+ return this;
+ }
+
+ public TableOperationSpanBuilder setTableName(final TableName tableName) {
+ this.tableName = tableName;
+ attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
+ attributes.put(DB_NAME, tableName.getNamespaceAsString());
+ attributes.put(TABLE_KEY, tableName.getNameAsString());
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Span build() {
+ final String name = attributes.getOrDefault(DB_OPERATION, unknown)
+ + " "
+ + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown);
+ final SpanBuilder builder = TraceUtil.getGlobalTracer()
+ .spanBuilder(name)
+ // TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
+ .setSpanKind(SpanKind.CLIENT);
+ attributes.forEach((k, v) -> builder.setAttribute((AttributeKey super Object>) k, v));
+ return builder.startSpan();
+ }
+
+ private static Operation valueFrom(final Scan scan) {
+ if (scan == null) {
+ return null;
+ }
+ return Operation.SCAN;
+ }
+
+ private static Operation valueFrom(final Row row) {
+ if (row == null) {
+ return null;
+ }
+ if (row instanceof Append) {
+ return Operation.APPEND;
+ }
+ if (row instanceof CheckAndMutate) {
+ return Operation.CHECK_AND_MUTATE;
+ }
+ if (row instanceof Delete) {
+ return Operation.DELETE;
+ }
+ if (row instanceof Get) {
+ return Operation.GET;
+ }
+ if (row instanceof Increment) {
+ return Operation.INCREMENT;
+ }
+ if (row instanceof Put) {
+ return Operation.PUT;
+ }
+ if (row instanceof RegionCoprocessorServiceExec) {
+ return Operation.COPROC_EXEC;
+ }
+ if (row instanceof RowMutations) {
+ return Operation.BATCH;
+ }
+ return null;
+ }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
index 15f10b6fbee9..af7ee8b2bafc 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
@@ -17,15 +17,24 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-
+import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@@ -42,14 +51,18 @@
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.hamcrest.Matcher;
+import org.hamcrest.core.IsAnything;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -58,10 +71,8 @@
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -134,11 +145,17 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
- ClientProtos.MultiResponse resp =
- ClientProtos.MultiResponse.newBuilder()
- .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException(
- ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result()))))
- .build();
+ ClientProtos.MultiRequest req = invocation.getArgument(1);
+ ClientProtos.MultiResponse.Builder builder = ClientProtos.MultiResponse.newBuilder();
+ for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
+ RegionActionResult.Builder raBuilder = RegionActionResult.newBuilder();
+ for (ClientProtos.Action ignored : regionAction.getActionList()) {
+ raBuilder.addResultOrException(
+ ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())));
+ }
+ builder.addRegionActionResult(raBuilder);
+ }
+ ClientProtos.MultiResponse resp = builder.build();
RpcCallback done = invocation.getArgument(2);
ForkJoinPool.commonPool().execute(() -> done.run(resp));
return null;
@@ -219,49 +236,73 @@ public void tearDown() throws IOException {
Closeables.close(conn, true);
}
- private void assertTrace(String methodName) {
- Waiter.waitFor(CONF, 1000,
- () -> traceRule.getSpans().stream()
- .anyMatch(span -> span.getName().equals("AsyncTable." + methodName) &&
- span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
- SpanData data = traceRule.getSpans().stream()
- .filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get();
- assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
- TableName tableName = table.getName();
- assertEquals(tableName.getNamespaceAsString(), data.getAttributes().get(NAMESPACE_KEY));
- assertEquals(tableName.getNameAsString(), data.getAttributes().get(TABLE_KEY));
+ /**
+ * All {@link Span}s generated from table data access operations over {@code tableName} should
+ * include these attributes.
+ */
+ static Matcher buildBaseAttributesMatcher(TableName tableName) {
+ return hasAttributes(allOf(
+ containsEntry("db.name", tableName.getNamespaceAsString()),
+ containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()),
+ containsEntry("db.hbase.table", tableName.getNameAsString())));
+ }
+
+ private void assertTrace(String tableOperation) {
+ assertTrace(tableOperation, new IsAnything<>());
+ }
+
+ private void assertTrace(String tableOperation, Matcher matcher) {
+ final TableName tableName = table.getName();
+ final Matcher spanLocator = allOf(
+ hasName(containsString(tableOperation)), hasEnded());
+ final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
+
+ Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
+ "waiting for span to emit",
+ () -> traceRule.getSpans(), hasItem(spanLocator)));
+ SpanData data = traceRule.getSpans()
+ .stream()
+ .filter(spanLocator::matches)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ assertThat(data, allOf(
+ hasName(expectedName),
+ hasKind(SpanKind.CLIENT),
+ hasStatusWithCode(StatusCode.OK),
+ buildBaseAttributesMatcher(tableName),
+ matcher));
}
@Test
public void testExists() {
table.exists(new Get(Bytes.toBytes(0))).join();
- assertTrace("get");
+ assertTrace("GET");
}
@Test
public void testGet() {
table.get(new Get(Bytes.toBytes(0))).join();
- assertTrace("get");
+ assertTrace("GET");
}
@Test
public void testPut() {
table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v"))).join();
- assertTrace("put");
+ assertTrace("PUT");
}
@Test
public void testDelete() {
table.delete(new Delete(Bytes.toBytes(0))).join();
- assertTrace("delete");
+ assertTrace("DELETE");
}
@Test
public void testAppend() {
table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v"))).join();
- assertTrace("append");
+ assertTrace("APPEND");
}
@Test
@@ -270,21 +311,21 @@ public void testIncrement() {
.increment(
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1))
.join();
- assertTrace("increment");
+ assertTrace("INCREMENT");
}
@Test
public void testIncrementColumnValue1() {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)
.join();
- assertTrace("increment");
+ assertTrace("INCREMENT");
}
@Test
public void testIncrementColumnValue2() {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
Durability.ASYNC_WAL).join();
- assertTrace("increment");
+ assertTrace("INCREMENT");
}
@Test
@@ -292,7 +333,7 @@ public void testCheckAndMutate() {
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0)))).join();
- assertTrace("checkAndMutate");
+ assertTrace("CHECK_AND_MUTATE");
}
@Test
@@ -302,7 +343,7 @@ public void testCheckAndMutateList() {
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0]))
.join();
- assertTrace("checkAndMutateList");
+ assertTrace("BATCH");
}
@Test
@@ -310,7 +351,88 @@ public void testCheckAndMutateAll() {
table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))))).join();
- assertTrace("checkAndMutateList");
+ assertTrace("BATCH");
+ }
+
+ private void testCheckAndMutateBuilder(Row op) {
+ AsyncTable.CheckAndMutateBuilder builder =
+ table.checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf"))
+ .qualifier(Bytes.toBytes("cq"))
+ .ifEquals(Bytes.toBytes("v"));
+ if (op instanceof Put) {
+ Put put = (Put) op;
+ builder.thenPut(put).join();
+ } else if (op instanceof Delete) {
+ Delete delete = (Delete) op;
+ builder.thenDelete(delete).join();
+ } else if (op instanceof RowMutations) {
+ RowMutations mutations = (RowMutations) op;
+ builder.thenMutate(mutations).join();
+ } else {
+ fail("unsupported CheckAndPut operation " + op);
+ }
+ assertTrace("CHECK_AND_MUTATE");
+ }
+
+ @Test
+ public void testCheckAndMutateBuilderThenPut() {
+ Put put = new Put(Bytes.toBytes(0))
+ .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+ testCheckAndMutateBuilder(put);
+ }
+
+ @Test
+ public void testCheckAndMutateBuilderThenDelete() {
+ testCheckAndMutateBuilder(new Delete(Bytes.toBytes(0)));
+ }
+
+ @Test
+ public void testCheckAndMutateBuilderThenMutations() throws IOException {
+ RowMutations mutations = new RowMutations(Bytes.toBytes(0))
+ .add((Mutation) (new Put(Bytes.toBytes(0))
+ .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"))))
+ .add((Mutation) new Delete(Bytes.toBytes(0)));
+ testCheckAndMutateBuilder(mutations);
+ }
+
+ private void testCheckAndMutateWithFilterBuilder(Row op) {
+ // use of `PrefixFilter` is completely arbitrary here.
+ AsyncTable.CheckAndMutateWithFilterBuilder builder =
+ table.checkAndMutate(Bytes.toBytes(0), new PrefixFilter(Bytes.toBytes(0)));
+ if (op instanceof Put) {
+ Put put = (Put) op;
+ builder.thenPut(put).join();
+ } else if (op instanceof Delete) {
+ Delete delete = (Delete) op;
+ builder.thenDelete(delete).join();
+ } else if (op instanceof RowMutations) {
+ RowMutations mutations = (RowMutations) op;
+ builder.thenMutate(mutations).join();
+ } else {
+ fail("unsupported CheckAndPut operation " + op);
+ }
+ assertTrace("CHECK_AND_MUTATE");
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilterBuilderThenPut() {
+ Put put = new Put(Bytes.toBytes(0))
+ .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v"));
+ testCheckAndMutateWithFilterBuilder(put);
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilterBuilderThenDelete() {
+ testCheckAndMutateWithFilterBuilder(new Delete(Bytes.toBytes(0)));
+ }
+
+ @Test
+ public void testCheckAndMutateWithFilterBuilderThenMutations() throws IOException {
+ RowMutations mutations = new RowMutations(Bytes.toBytes(0))
+ .add((Mutation) new Put(Bytes.toBytes(0))
+ .addColumn(Bytes.toBytes("f"), Bytes.toBytes("cq"), Bytes.toBytes("v")))
+ .add((Mutation) new Delete(Bytes.toBytes(0)));
+ testCheckAndMutateWithFilterBuilder(mutations);
}
@Test
@@ -319,13 +441,13 @@ public void testMutateRow() throws Exception {
RowMutations mutation = new RowMutations(row);
mutation.add(new Delete(row));
table.mutateRow(mutation).get();
- assertTrace("mutateRow");
+ assertTrace("BATCH");
}
@Test
- public void testScanAll() throws IOException {
+ public void testScanAll() {
table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join();
- assertTrace("scanAll");
+ assertTrace("SCAN");
}
@Test
@@ -334,13 +456,13 @@ public void testExistsList() {
.allOf(
table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
- assertTrace("getList");
+ assertTrace("BATCH");
}
@Test
public void testExistsAll() {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
- assertTrace("getList");
+ assertTrace("BATCH");
}
@Test
@@ -348,13 +470,13 @@ public void testGetList() {
CompletableFuture
.allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
- assertTrace("getList");
+ assertTrace("BATCH");
}
@Test
public void testGetAll() {
table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join();
- assertTrace("getList");
+ assertTrace("BATCH");
}
@Test
@@ -363,14 +485,14 @@ public void testPutList() {
.allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0]))
.join();
- assertTrace("putList");
+ assertTrace("BATCH");
}
@Test
public void testPutAll() {
table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v")))).join();
- assertTrace("putList");
+ assertTrace("BATCH");
}
@Test
@@ -379,13 +501,13 @@ public void testDeleteList() {
.allOf(
table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
- assertTrace("deleteList");
+ assertTrace("BATCH");
}
@Test
public void testDeleteAll() {
table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
- assertTrace("deleteList");
+ assertTrace("BATCH");
}
@Test
@@ -394,13 +516,13 @@ public void testBatch() {
.allOf(
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0]))
.join();
- assertTrace("batch");
+ assertTrace("BATCH");
}
@Test
public void testBatchAll() {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
- assertTrace("batch");
+ assertTrace("BATCH");
}
@Test
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
index 915e4006f25c..8e6409ef3c92 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
@@ -17,6 +17,15 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.TestAsyncTableTracing.buildBaseAttributesMatcher;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -25,6 +34,9 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
@@ -34,13 +46,17 @@
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.hamcrest.Matcher;
+import org.hamcrest.core.IsAnything;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@@ -217,56 +233,82 @@ public void tearDown() throws IOException {
Closeables.close(conn, true);
}
+ private void assertTrace(String tableOperation) {
+ assertTrace(tableOperation, new IsAnything<>());
+ }
+
+ private void assertTrace(String tableOperation, Matcher matcher) {
+ final TableName tableName = table.getName();
+ final Matcher spanLocator = allOf(
+ hasName(containsString(tableOperation)), hasEnded());
+ final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString();
+
+ Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
+ "waiting for span to emit",
+ () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
+ SpanData data = TRACE_RULE.getSpans()
+ .stream()
+ .filter(spanLocator::matches)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ assertThat(data, allOf(
+ hasName(expectedName),
+ hasKind(SpanKind.CLIENT),
+ hasStatusWithCode(StatusCode.OK),
+ buildBaseAttributesMatcher(tableName),
+ matcher));
+ }
+
@Test
public void testPut() throws IOException {
table.put(new Put(Bytes.toBytes(0))
.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")));
- assertTrace(HTable.class.getSimpleName(), "put", null, TableName.META_TABLE_NAME);
+ assertTrace("PUT");
}
@Test
public void testExists() throws IOException {
table.exists(new Get(Bytes.toBytes(0)));
- assertTrace(HTable.class.getSimpleName(), "get", null, TableName.META_TABLE_NAME);
+ assertTrace("GET");
}
@Test
public void testGet() throws IOException {
table.get(new Get(Bytes.toBytes(0)));
- assertTrace(HTable.class.getSimpleName(), "get", null, TableName.META_TABLE_NAME);
+ assertTrace("GET");
}
@Test
public void testDelete() throws IOException {
table.delete(new Delete(Bytes.toBytes(0)));
- assertTrace(HTable.class.getSimpleName(), "delete", null, TableName.META_TABLE_NAME);
+ assertTrace("DELETE");
}
@Test
public void testAppend() throws IOException {
table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"),
Bytes.toBytes("v")));
- assertTrace(HTable.class.getSimpleName(), "append", null, TableName.META_TABLE_NAME);
+ assertTrace("APPEND");
}
@Test
public void testIncrement() throws IOException {
table.increment(
new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1));
- assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME);
+ assertTrace("INCREMENT");
}
@Test
public void testIncrementColumnValue1() throws IOException {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1);
- assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME);
+ assertTrace("INCREMENT");
}
@Test
public void testIncrementColumnValue2() throws IOException {
table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1,
Durability.SYNC_WAL);
- assertTrace(HTable.class.getSimpleName(), "increment", null, TableName.META_TABLE_NAME);
+ assertTrace("INCREMENT");
}
@Test
@@ -274,7 +316,7 @@ public void testCheckAndMutate() throws IOException {
table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0))));
- assertTrace(HTable.class.getSimpleName(), "checkAndMutate", null, TableName.META_TABLE_NAME);
+ assertTrace("CHECK_AND_MUTATE");
}
@Test
@@ -282,8 +324,7 @@ public void testCheckAndMutateList() throws IOException {
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0)))));
- assertTrace(HTable.class.getSimpleName(), "checkAndMutateList", null,
- TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
@@ -291,52 +332,51 @@ public void testCheckAndMutateAll() throws IOException {
table.checkAndMutate(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0))
.ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))
.build(new Delete(Bytes.toBytes(0)))));
- assertTrace(HTable.class.getSimpleName(), "checkAndMutateList", null,
- TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
public void testMutateRow() throws Exception {
byte[] row = Bytes.toBytes(0);
table.mutateRow(RowMutations.of(Arrays.asList(new Delete(row))));
- assertTrace(HTable.class.getSimpleName(), "mutateRow", null, TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
public void testExistsList() throws IOException {
table.exists(Arrays.asList(new Get(Bytes.toBytes(0))));
- assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
public void testExistsAll() throws IOException {
table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0))));
- assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
public void testGetList() throws IOException {
table.get(Arrays.asList(new Get(Bytes.toBytes(0))));
- assertTrace(HTable.class.getSimpleName(), "getList", null, TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
public void testPutList() throws IOException {
table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"),
Bytes.toBytes("cq"), Bytes.toBytes("v"))));
- assertTrace(HTable.class.getSimpleName(), "putList", null, TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
public void testDeleteList() throws IOException {
table.delete(Lists.newArrayList(new Delete(Bytes.toBytes(0))));
- assertTrace(HTable.class.getSimpleName(), "deleteList", null, TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
public void testBatchList() throws IOException, InterruptedException {
table.batch(Arrays.asList(new Delete(Bytes.toBytes(0))), null);
- assertTrace(HTable.class.getSimpleName(), "batch", null, TableName.META_TABLE_NAME);
+ assertTrace("BATCH");
}
@Test
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
new file mode 100644
index 000000000000..c3bf3bee59e5
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.trace.hamcrest;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Helper methods for matching against instances of {@link io.opentelemetry.api.common.Attributes}.
+ */
+public final class AttributesMatchers {
+
+ private AttributesMatchers() { }
+
+ public static Matcher containsEntry(
+ Matcher> keyMatcher,
+ Matcher super T> valueMatcher
+ ) {
+ return new IsAttributesContaining<>(keyMatcher, valueMatcher);
+ }
+
+ public static Matcher containsEntry(AttributeKey key, T value) {
+ return containsEntry(equalTo(key), equalTo(value));
+ }
+
+ public static Matcher containsEntry(String key, String value) {
+ return containsEntry(AttributeKey.stringKey(key), value);
+ }
+
+ private static final class IsAttributesContaining extends TypeSafeMatcher {
+ private final Matcher> keyMatcher;
+ private final Matcher super T> valueMatcher;
+
+ private IsAttributesContaining(
+ final Matcher> keyMatcher,
+ final Matcher super T> valueMatcher
+ ) {
+ this.keyMatcher = keyMatcher;
+ this.valueMatcher = valueMatcher;
+ }
+
+ @Override
+ protected boolean matchesSafely(Attributes item) {
+ return item.asMap().entrySet().stream().anyMatch(e -> allOf(
+ hasProperty("key", keyMatcher),
+ hasProperty("value", valueMatcher))
+ .matches(e));
+ }
+
+ @Override
+ public void describeMismatchSafely(Attributes item, Description mismatchDescription) {
+ mismatchDescription
+ .appendText("Attributes was ")
+ .appendValueList("[", ", ", "]", item.asMap().entrySet());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description
+ .appendText("Attributes containing [")
+ .appendDescriptionOf(keyMatcher)
+ .appendText("->")
+ .appendDescriptionOf(valueMatcher)
+ .appendText("]");
+ }
+ }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
new file mode 100644
index 000000000000..2839e7c597c7
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.trace.hamcrest;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.data.StatusData;
+import org.hamcrest.Description;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Helper methods for matching against instances of {@link SpanData}.
+ */
+public final class SpanDataMatchers {
+
+ private SpanDataMatchers() { }
+
+ public static Matcher hasAttributes(Matcher matcher) {
+ return new FeatureMatcher(
+ matcher, "SpanData having attributes that ", "attributes"
+ ) {
+ @Override protected Attributes featureValueOf(SpanData item) {
+ return item.getAttributes();
+ }
+ };
+ }
+
+ public static Matcher hasEnded() {
+ return new TypeSafeMatcher() {
+ @Override protected boolean matchesSafely(SpanData item) {
+ return item.hasEnded();
+ }
+ @Override public void describeTo(Description description) {
+ description.appendText("SpanData that hasEnded");
+ }
+ };
+ }
+
+ public static Matcher hasKind(SpanKind kind) {
+ return new FeatureMatcher(
+ equalTo(kind), "SpanData with kind that", "SpanKind") {
+ @Override protected SpanKind featureValueOf(SpanData item) {
+ return item.getKind();
+ }
+ };
+ }
+
+ public static Matcher hasName(String name) {
+ return hasName(equalTo(name));
+ }
+
+ public static Matcher hasName(Matcher matcher) {
+ return new FeatureMatcher(matcher, "SpanKind with a name that", "name") {
+ @Override protected String featureValueOf(SpanData item) {
+ return item.getName();
+ }
+ };
+ }
+
+ public static Matcher hasStatusWithCode(StatusCode statusCode) {
+ final Matcher matcher = is(equalTo(statusCode));
+ return new TypeSafeMatcher() {
+ @Override protected boolean matchesSafely(SpanData item) {
+ final StatusData statusData = item.getStatus();
+ return statusData != null
+ && statusData.getStatusCode() != null
+ && matcher.matches(statusData.getStatusCode());
+ }
+ @Override public void describeTo(Description description) {
+ description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
+ }
+ };
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
index 1eb5d820d998..90c3c858a706 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
@@ -28,7 +28,9 @@
*/
@InterfaceAudience.Private
public final class HBaseSemanticAttributes {
+ public static final AttributeKey DB_NAME = SemanticAttributes.DB_NAME;
public static final AttributeKey NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE;
+ public static final AttributeKey DB_OPERATION = SemanticAttributes.DB_OPERATION;
public static final AttributeKey TABLE_KEY = AttributeKey.stringKey("db.hbase.table");
public static final AttributeKey> REGION_NAMES_KEY =
AttributeKey.stringArrayKey("db.hbase.regions");
@@ -44,5 +46,23 @@ public final class HBaseSemanticAttributes {
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
public static final AttributeKey WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
+ /**
+ * These are values used with {@link #DB_OPERATION}. They correspond with the implementations of
+ * {@code org.apache.hadoop.hbase.client.Operation}, as well as
+ * {@code org.apache.hadoop.hbase.client.CheckAndMutate}, and "MULTI", meaning a batch of multiple
+ * operations.
+ */
+ public enum Operation {
+ APPEND,
+ BATCH,
+ CHECK_AND_MUTATE,
+ COPROC_EXEC,
+ DELETE,
+ GET,
+ INCREMENT,
+ PUT,
+ SCAN,
+ }
+
private HBaseSemanticAttributes() { }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index 882eed4dab09..1c428ae5608d 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -91,9 +91,11 @@ public static Span createClientSpan(String name) {
/**
* Trace an asynchronous operation for a table.
*/
- public static CompletableFuture tracedFuture(Supplier> action,
- String spanName, TableName tableName) {
- Span span = createTableSpan(spanName, tableName);
+ public static CompletableFuture tracedFuture(
+ Supplier> action,
+ Supplier spanSupplier
+ ) {
+ Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture future = action.get();
endSpan(future, span);
@@ -119,8 +121,10 @@ public static CompletableFuture tracedFuture(Supplier List> tracedFutures(
- Supplier>> action, String spanName, TableName tableName) {
- Span span = createTableSpan(spanName, tableName);
+ Supplier>> action,
+ Supplier spanSupplier
+ ) {
+ Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
List> futures = action.get();
endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);