From 84ea8e2eecc417d59610cef7a29f1c98c84ec851 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Thu, 18 Nov 2021 09:01:30 -0800 Subject: [PATCH] HBASE-26472 Adhere to semantic conventions regarding table data operations Follows the guidance outlined in https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e2/specification/trace/semantic_conventions/database.dm * all table data operations are assumed to be of type CLIENT * populate `db.name` and `db.operation` attributes * name table data operation spans as `db.operation` `db.name`:`db.hbase.table` note: this implementation deviates from the recommended `db.name`.`db.sql.table` and instead uses HBase's native String representation of namespace:tablename. --- hbase-client/pom.xml | 5 + .../hbase/client/RawAsyncTableImpl.java | 117 ++++++++++++----- .../trace/TableOperationSpanBuilder.java | 119 ++++++++++++++++++ .../hbase/client/TestAsyncTableTracing.java | 116 ++++++++++------- .../trace/hamcrest/AttributesMatchers.java | 88 +++++++++++++ .../trace/hamcrest/SpanDataMatchers.java | 103 +++++++++++++++ .../hbase/trace/HBaseSemanticAttributes.java | 20 +++ .../apache/hadoop/hbase/trace/TraceUtil.java | 14 ++- 8 files changed, 507 insertions(+), 75 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml index a5b3cdd6e0d3..fa20300b1142 100644 --- a/hbase-client/pom.xml +++ b/hbase-client/pom.xml @@ -193,6 +193,11 @@ mockito-core test + + org.hamcrest + hamcrest-library + test + org.apache.commons commons-crypto diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 1a12677ac0e1..6a5c486b7478 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,6 +27,7 @@ import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -44,9 +46,11 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.client.ConnectionUtils.Converter; +import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -220,35 +224,47 @@ private CompletableFuture get(Get get, int replicaId) { @Override public CompletableFuture get(Get get) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(get); return tracedFuture( () -> timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(), RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs, conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, conn.getConnectionMetrics()), - "AsyncTable.get", tableName); + supplier); } @Override public CompletableFuture put(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(put); return tracedFuture(() -> this. newCaller(put, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) - .call(), "AsyncTable.put", tableName); + .call(), supplier); } @Override public CompletableFuture delete(Delete delete) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(delete); return tracedFuture( () -> this. newCaller(delete, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, delete, RequestConverter::buildMutateRequest)) .call(), - "AsyncTable.delete", tableName); + supplier); } @Override public CompletableFuture append(Append append) { checkHasFamilies(append); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(append); return tracedFuture(() -> { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); @@ -257,12 +273,15 @@ public CompletableFuture append(Append append) { controller, loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); - }, "AsyncTable.append", tableName); + }, supplier); } @Override public CompletableFuture increment(Increment increment) { checkHasFamilies(increment); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(increment); return tracedFuture(() -> { long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); @@ -271,7 +290,7 @@ public CompletableFuture increment(Increment increment) { controller, loc, stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); - }, "AsyncTable.increment", tableName); + }, supplier); } private final class CheckAndMutateBuilderImpl implements CheckAndMutateBuilder { @@ -329,6 +348,9 @@ private void preCheck() { public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); preCheck(); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put, @@ -336,12 +358,15 @@ public CompletableFuture thenPut(Put put) { null, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateBuilder.thenPut", tableName); + supplier); } @Override public CompletableFuture thenDelete(Delete delete) { preCheck(); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -349,23 +374,26 @@ public CompletableFuture thenDelete(Delete delete) { null, timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateBuilder.thenDelete", tableName); + supplier); } @Override - public CompletableFuture thenMutate(RowMutations mutation) { + public CompletableFuture thenMutate(RowMutations mutations) { preCheck(); - validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFuture( () -> RawAsyncTableImpl.this - . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, - mutation, + mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, family, qualifier, op, value, null, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), CheckAndMutateResult::isSuccess)) .call(), - "AsyncTable.CheckAndMutateBuilder.thenMutate", tableName); + supplier); } } @@ -397,6 +425,9 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) { @Override public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, @@ -405,11 +436,14 @@ public CompletableFuture thenPut(Put put) { filter, timeRange, p, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenPut", tableName); + supplier); } @Override public CompletableFuture thenDelete(Delete delete) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE); return tracedFuture( () -> RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete, @@ -417,22 +451,25 @@ public CompletableFuture thenDelete(Delete delete) { timeRange, d, HConstants.NO_NONCE, HConstants.NO_NONCE), (c, r) -> r.getProcessed())) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenDelete", tableName); + supplier); } @Override - public CompletableFuture thenMutate(RowMutations mutation) { - validatePutsInRowMutations(mutation, conn.connConf.getMaxKeyValueSize()); + public CompletableFuture thenMutate(RowMutations mutations) { + validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFuture( () -> RawAsyncTableImpl.this - . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) + . newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl.this.mutateRow(controller, loc, stub, - mutation, + mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, row, null, null, null, null, filter, timeRange, rm, HConstants.NO_NONCE, HConstants.NO_NONCE), CheckAndMutateResult::isSuccess)) .call(), - "AsyncTable.CheckAndMutateWithFilterBuilder.thenMutate", tableName); + supplier); } } @@ -443,6 +480,9 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) @Override public CompletableFuture checkAndMutate(CheckAndMutate checkAndMutate) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(checkAndMutate); return tracedFuture(() -> { if (checkAndMutate.getAction() instanceof Put || checkAndMutate.getAction() instanceof Delete || @@ -488,16 +528,19 @@ public CompletableFuture checkAndMutate(CheckAndMutate che "CheckAndMutate doesn't support " + checkAndMutate.getAction().getClass().getName())); return future; } - }, "AsyncTable.checkAndMutate", tableName); + }, supplier); } @Override public List> checkAndMutate(List checkAndMutates) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFutures( () -> batch(checkAndMutates, rpcTimeoutNs).stream() .map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()), - "AsyncTable.checkAndMutateList", tableName); + supplier); } // We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse, @@ -548,6 +591,9 @@ public CompletableFuture mutateRow(RowMutations mutations) { validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize()); long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); return tracedFuture( () -> this . newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs) @@ -555,7 +601,7 @@ public CompletableFuture mutateRow(RowMutations mutations) { mutations, (rn, rm) -> RequestConverter.buildMultiRequest(rn, rm, nonceGroup, nonce), resp -> resp)) .call(), - "AsyncTable.mutateRow", tableName); + supplier); } private Scan setDefaultScanConfig(Scan scan) { @@ -591,6 +637,9 @@ public ResultScanner getScanner(Scan scan) { @Override public CompletableFuture> scanAll(Scan scan) { + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(scan); return tracedFuture(() -> { CompletableFuture> future = new CompletableFuture<>(); List scanResults = new ArrayList<>(); @@ -612,27 +661,39 @@ public void onComplete() { } }); return future; - }, "AsyncTable.scanAll", tableName); + }, supplier); } @Override public List> get(List gets) { - return tracedFutures(() -> batch(gets, readRpcTimeoutNs), "AsyncTable.getList", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier); } @Override public List> put(List puts) { - return tracedFutures(() -> voidMutate(puts), "AsyncTable.putList", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> voidMutate(puts), supplier); } @Override public List> delete(List deletes) { - return tracedFutures(() -> voidMutate(deletes), "AsyncTable.deleteList", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> voidMutate(deletes), supplier); } @Override public List> batch(List actions) { - return tracedFutures(() -> batch(actions, rpcTimeoutNs), "AsyncTable.batch", tableName); + final Supplier supplier = new TableOperationSpanBuilder() + .setTableName(tableName) + .setOperation(HBaseSemanticAttributes.Operation.BATCH); + return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier); } private List> voidMutate(List actions) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java new file mode 100644 index 000000000000..fd651a85b26b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client.trace; + +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; +import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionCoprocessorServiceExec; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.Operation; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Construct {@link io.opentelemetry.api.trace.Span} instances originating from + * "table operations" -- the verbs in our public API that interact with data in tables. + */ +@InterfaceAudience.Private +public class TableOperationSpanBuilder implements Supplier { + + // n.b. The results of this class are tested implicitly by way of the likes of + // `TestAsyncTableTracing` and friends. + + private static final String unknown = "UNKNOWN"; + + private TableName tableName; + private final Map, Object> attributes = new HashMap<>(); + + @Override public Span get() { + return build(); + } + + public TableOperationSpanBuilder setOperation(final Scan scan) { + return setOperation(valueFrom(scan)); + } + + public TableOperationSpanBuilder setOperation(final Row row) { + return setOperation(valueFrom(row)); + } + + public TableOperationSpanBuilder setOperation(final Operation operation) { + attributes.put(DB_OPERATION, operation.name()); + return this; + } + + public TableOperationSpanBuilder setTableName(final TableName tableName) { + this.tableName = tableName; + attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString()); + attributes.put(DB_NAME, tableName.getNamespaceAsString()); + attributes.put(TABLE_KEY, tableName.getNameAsString()); + return this; + } + + @SuppressWarnings("unchecked") + public Span build() { + final String name = attributes.getOrDefault(DB_OPERATION, unknown) + + " " + + (tableName != null ? tableName.getNameWithNamespaceInclAsString() : unknown); + final SpanBuilder builder = TraceUtil.getGlobalTracer() + .spanBuilder(name) + // TODO: what about clients embedded in Master/RegionServer/Gateways/&c? + .setSpanKind(SpanKind.CLIENT); + attributes.forEach((k, v) -> builder.setAttribute((AttributeKey) k, v)); + return builder.startSpan(); + } + + private static Operation valueFrom(final Scan scan) { + if (scan == null) { return null; } + return Operation.SCAN; + } + + private static Operation valueFrom(final Row row) { + if (row == null) { return null; } + if (row instanceof Append) { return Operation.APPEND; } + if (row instanceof CheckAndMutate) { return Operation.CHECK_AND_MUTATE; } + if (row instanceof Delete) { return Operation.DELETE; } + if (row instanceof Get) { return Operation.GET; } + if (row instanceof Increment) { return Operation.INCREMENT; } + if (row instanceof Put) { return Operation.PUT; } + if (row instanceof RegionCoprocessorServiceExec) { + return Operation.COPROC_EXEC; + } + if (row instanceof RowMutations) { return Operation.BATCH; } + return null; + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index 4e38fca3efc9..5ad39c48623a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,15 +17,23 @@ */ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY; -import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; - +import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -43,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MatcherPredicate; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; @@ -50,8 +59,9 @@ import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matcher; +import org.hamcrest.core.IsAnything; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -60,10 +70,8 @@ import org.junit.experimental.categories.Category; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; - import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; @@ -220,49 +228,73 @@ public void tearDown() throws IOException { Closeables.close(conn, true); } - private void assertTrace(String methodName) { - Waiter.waitFor(CONF, 1000, - () -> traceRule.getSpans().stream() - .anyMatch(span -> span.getName().equals("AsyncTable." + methodName) && - span.getKind() == SpanKind.INTERNAL && span.hasEnded())); - SpanData data = traceRule.getSpans().stream() - .filter(s -> s.getName().equals("AsyncTable." + methodName)).findFirst().get(); - assertEquals(StatusCode.OK, data.getStatus().getStatusCode()); - TableName tableName = table.getName(); - assertEquals(tableName.getNamespaceAsString(), data.getAttributes().get(NAMESPACE_KEY)); - assertEquals(tableName.getNameAsString(), data.getAttributes().get(TABLE_KEY)); + /** + * All {@link Span}s generated from table data access operations over {@code tableName} should + * include these attributes. + */ + private static Matcher buildBaseAttributesMatcher(TableName tableName) { + return hasAttributes(allOf( + containsEntry("db.name", tableName.getNamespaceAsString()), + containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()), + containsEntry("db.hbase.table", tableName.getNameAsString()))); + } + + private void assertTrace(String tableOperation) { + assertTrace(tableOperation, new IsAnything<>()); + } + + private void assertTrace(String tableOperation, Matcher matcher) { + final TableName tableName = table.getName(); + final Matcher spanLocator = allOf( + hasName(containsString(tableOperation)), hasEnded()); + final String expectedName = tableOperation + " " + tableName.getNameWithNamespaceInclAsString(); + + Waiter.waitFor(CONF, 1000, new MatcherPredicate<>( + "waiting for span to emit", + () -> traceRule.getSpans(), hasItem(spanLocator))); + SpanData data = traceRule.getSpans() + .stream() + .filter(spanLocator::matches) + .findFirst() + .orElseThrow(AssertionError::new); + assertThat(data, allOf( + hasName(expectedName), + hasKind(SpanKind.CLIENT), + hasStatusWithCode(StatusCode.OK), + buildBaseAttributesMatcher(tableName), + matcher)); } @Test public void testExists() { table.exists(new Get(Bytes.toBytes(0))).join(); - assertTrace("get"); + assertTrace("GET"); } @Test public void testGet() { table.get(new Get(Bytes.toBytes(0))).join(); - assertTrace("get"); + assertTrace("GET"); } @Test public void testPut() { table.put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); - assertTrace("put"); + assertTrace("PUT"); } @Test public void testDelete() { table.delete(new Delete(Bytes.toBytes(0))).join(); - assertTrace("delete"); + assertTrace("DELETE"); } @Test public void testAppend() { table.append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); - assertTrace("append"); + assertTrace("APPEND"); } @Test @@ -271,21 +303,21 @@ public void testIncrement() { .increment( new Increment(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1)) .join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue1() { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1) .join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test public void testIncrementColumnValue2() { table.incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1, Durability.ASYNC_WAL).join(); - assertTrace("increment"); + assertTrace("INCREMENT"); } @Test @@ -293,7 +325,7 @@ public void testCheckAndMutate() { table.checkAndMutate(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("checkAndMutate"); + assertTrace("CHECK_AND_MUTATE"); } @Test @@ -303,7 +335,7 @@ public void testCheckAndMutateList() { .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).toArray(new CompletableFuture[0])) .join(); - assertTrace("checkAndMutateList"); + assertTrace("BATCH"); } @Test @@ -311,19 +343,19 @@ public void testCheckAndMutateAll() { table.checkAndMutateAll(Arrays.asList(CheckAndMutate.newBuilder(Bytes.toBytes(0)) .ifEquals(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")) .build(new Delete(Bytes.toBytes(0))))).join(); - assertTrace("checkAndMutateList"); + assertTrace("BATCH"); } @Test public void testMutateRow() throws IOException { table.mutateRow(new RowMutations(Bytes.toBytes(0)).add(new Delete(Bytes.toBytes(0)))); - assertTrace("mutateRow"); + assertTrace("BATCH"); } @Test - public void testScanAll() throws IOException { + public void testScanAll() { table.scanAll(new Scan().setCaching(1).setMaxResultSize(1).setLimit(1)).join(); - assertTrace("scanAll"); + assertTrace("SCAN"); } @Test @@ -332,13 +364,13 @@ public void testExistsList() { .allOf( table.exists(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test public void testExistsAll() { table.existsAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test @@ -346,13 +378,13 @@ public void testGetList() { CompletableFuture .allOf(table.get(Arrays.asList(new Get(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test public void testGetAll() { table.getAll(Arrays.asList(new Get(Bytes.toBytes(0)))).join(); - assertTrace("getList"); + assertTrace("BATCH"); } @Test @@ -361,14 +393,14 @@ public void testPutList() { .allOf(table.put(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).toArray(new CompletableFuture[0])) .join(); - assertTrace("putList"); + assertTrace("BATCH"); } @Test public void testPutAll() { table.putAll(Arrays.asList(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")))).join(); - assertTrace("putList"); + assertTrace("BATCH"); } @Test @@ -377,13 +409,13 @@ public void testDeleteList() { .allOf( table.delete(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("deleteList"); + assertTrace("BATCH"); } @Test public void testDeleteAll() { table.deleteAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("deleteList"); + assertTrace("BATCH"); } @Test @@ -392,13 +424,13 @@ public void testBatch() { .allOf( table.batch(Arrays.asList(new Delete(Bytes.toBytes(0)))).toArray(new CompletableFuture[0])) .join(); - assertTrace("batch"); + assertTrace("BATCH"); } @Test public void testBatchAll() { table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); - assertTrace("batch"); + assertTrace("BATCH"); } @Test diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java new file mode 100644 index 000000000000..c3bf3bee59e5 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace.hamcrest; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasProperty; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Helper methods for matching against instances of {@link io.opentelemetry.api.common.Attributes}. + */ +public final class AttributesMatchers { + + private AttributesMatchers() { } + + public static Matcher containsEntry( + Matcher> keyMatcher, + Matcher valueMatcher + ) { + return new IsAttributesContaining<>(keyMatcher, valueMatcher); + } + + public static Matcher containsEntry(AttributeKey key, T value) { + return containsEntry(equalTo(key), equalTo(value)); + } + + public static Matcher containsEntry(String key, String value) { + return containsEntry(AttributeKey.stringKey(key), value); + } + + private static final class IsAttributesContaining extends TypeSafeMatcher { + private final Matcher> keyMatcher; + private final Matcher valueMatcher; + + private IsAttributesContaining( + final Matcher> keyMatcher, + final Matcher valueMatcher + ) { + this.keyMatcher = keyMatcher; + this.valueMatcher = valueMatcher; + } + + @Override + protected boolean matchesSafely(Attributes item) { + return item.asMap().entrySet().stream().anyMatch(e -> allOf( + hasProperty("key", keyMatcher), + hasProperty("value", valueMatcher)) + .matches(e)); + } + + @Override + public void describeMismatchSafely(Attributes item, Description mismatchDescription) { + mismatchDescription + .appendText("Attributes was ") + .appendValueList("[", ", ", "]", item.asMap().entrySet()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("Attributes containing [") + .appendDescriptionOf(keyMatcher) + .appendText("->") + .appendDescriptionOf(valueMatcher) + .appendText("]"); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java new file mode 100644 index 000000000000..01142850f43f --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace.hamcrest; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.util.Objects; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Helper methods for matching against instances of {@link SpanData}. + */ +public final class SpanDataMatchers { + + private SpanDataMatchers() { } + + public static Matcher hasAttributes(Matcher matcher) { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + final Attributes attributes = item.getAttributes(); + return attributes != null && matcher.matches(attributes); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData having ").appendDescriptionOf(matcher); + } + }; + } + + public static Matcher hasEnded() { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + return item.hasEnded(); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData that hasEnded"); + } + }; + } + + public static Matcher hasKind(SpanKind kind) { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + return Objects.equals(item.getKind(), kind); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData with kind of ").appendValue(kind); + } + }; + } + + public static Matcher hasName(String name) { + return hasName(equalTo(name)); + } + + public static Matcher hasName(Matcher matcher) { + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + final String name = item.getName(); + return name != null && matcher.matches(name); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData with a name that ").appendDescriptionOf(matcher); + } + }; + } + + public static Matcher hasStatusWithCode(StatusCode statusCode) { + final Matcher matcher = is(equalTo(statusCode)); + return new TypeSafeMatcher() { + @Override protected boolean matchesSafely(SpanData item) { + final StatusData statusData = item.getStatus(); + return statusData != null + && statusData.getStatusCode() != null + && matcher.matches(statusData.getStatusCode()); + } + @Override public void describeTo(Description description) { + description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher); + } + }; + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java index 1eb5d820d998..90c3c858a706 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java @@ -28,7 +28,9 @@ */ @InterfaceAudience.Private public final class HBaseSemanticAttributes { + public static final AttributeKey DB_NAME = SemanticAttributes.DB_NAME; public static final AttributeKey NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE; + public static final AttributeKey DB_OPERATION = SemanticAttributes.DB_OPERATION; public static final AttributeKey TABLE_KEY = AttributeKey.stringKey("db.hbase.table"); public static final AttributeKey> REGION_NAMES_KEY = AttributeKey.stringArrayKey("db.hbase.regions"); @@ -44,5 +46,23 @@ public final class HBaseSemanticAttributes { AttributeKey.booleanKey("db.hbase.rowlock.readlock"); public static final AttributeKey WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl"); + /** + * These are values used with {@link #DB_OPERATION}. They correspond with the implementations of + * {@code org.apache.hadoop.hbase.client.Operation}, as well as + * {@code org.apache.hadoop.hbase.client.CheckAndMutate}, and "MULTI", meaning a batch of multiple + * operations. + */ + public enum Operation { + APPEND, + BATCH, + CHECK_AND_MUTATE, + COPROC_EXEC, + DELETE, + GET, + INCREMENT, + PUT, + SCAN, + } + private HBaseSemanticAttributes() { } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java index e360705916dd..706d4891c618 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java @@ -91,9 +91,11 @@ public static Span createClientSpan(String name) { /** * Trace an asynchronous operation for a table. */ - public static CompletableFuture tracedFuture(Supplier> action, - String spanName, TableName tableName) { - Span span = createTableSpan(spanName, tableName); + public static CompletableFuture tracedFuture( + Supplier> action, + Supplier spanSupplier + ) { + Span span = spanSupplier.get(); try (Scope scope = span.makeCurrent()) { CompletableFuture future = action.get(); endSpan(future, span); @@ -119,8 +121,10 @@ public static CompletableFuture tracedFuture(Supplier List> tracedFutures( - Supplier>> action, String spanName, TableName tableName) { - Span span = createTableSpan(spanName, tableName); + Supplier>> action, + Supplier spanSupplier + ) { + Span span = spanSupplier.get(); try (Scope scope = span.makeCurrent()) { List> futures = action.get(); endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);