From 893f6407a0144044df5e6200c826ab8ac2d38898 Mon Sep 17 00:00:00 2001 From: Nick Dimiduk Date: Thu, 20 Jan 2022 12:39:20 -0800 Subject: [PATCH] HBASE-26545 Implement tracing of scan * on `AsyncTable`, both `scan` and `scanAll` methods should result in `SCAN` table operations. * the span of the `SCAN` table operation should have children representing all the RPC calls involved in servicing the scan. * when a user provides custom implementation of `AdvancedScanResultConsumer`, any spans emitted from the callback methods should also be tied to the span that represents the `SCAN` table operation. This is easily done because these callbacks are executed on the RPC thread. * when a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the callback methods should be also be tied to the span that represents the `SCAN` table operation. This accomplished by carefully passing the span instance around after it is created. --- .../hbase/client/AsyncClientScanner.java | 144 +++++++++---- ...syncScanSingleRegionRpcRetryingCaller.java | 4 +- .../hadoop/hbase/client/AsyncTableImpl.java | 28 ++- .../hbase/client/AsyncTableResultScanner.java | 13 +- .../hbase/client/RawAsyncTableImpl.java | 38 ++-- .../hbase/client/TestAsyncTableTracing.java | 53 ++++- .../client/trace/StringTraceRenderer.java | 139 +++++++++++++ .../trace/hamcrest/SpanDataMatchers.java | 39 ++++ .../hbase/StartTestingClusterOption.java | 5 + .../client/AbstractTestAsyncTableScan.java | 189 +++++++++++++----- .../client/LimitedScanResultConsumer.java | 35 ++++ .../client/SimpleScanResultConsumer.java | 58 +----- .../client/SimpleScanResultConsumerImpl.java | 75 +++++++ .../hbase/client/TestAsyncTableScan.java | 174 +++++++++++----- .../hbase/client/TestAsyncTableScanAll.java | 80 ++++++++ .../client/TestAsyncTableScanMetrics.java | 2 +- .../hbase/client/TestAsyncTableScanner.java | 82 +++++++- .../hbase/client/TestRawAsyncTableScan.java | 132 +++++++++++- .../TracedAdvancedScanResultConsumer.java | 65 ++++++ .../client/TracedScanResultConsumer.java | 68 +++++++ .../hbase/trace/OpenTelemetryClassRule.java | 127 ++++++++++++ .../hbase/trace/OpenTelemetryTestRule.java | 39 ++++ pom.xml | 3 +- 23 files changed, 1359 insertions(+), 233 deletions(-) create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/LimitedScanResultConsumer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumerImpl.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedAdvancedScanResultConsumer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedScanResultConsumer.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 48f004c0a29c..bc804f19e4db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,7 +27,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -35,11 +37,11 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; @@ -85,6 +87,17 @@ class AsyncClientScanner { private final ScanResultCache resultCache; + /* + * The `span` instance is accessed concurrently by several threads, but we use only basic + * synchronization. The class claims to be `@ThreadSafe` so we rely on the implementation to + * correctly handle concurrent use. The instance itself is initialized in the `start` method, + * so we cannot make it `final`. Because the instance is created before any consuming runnable + * is submitted to a worker pool, it should be enough to mark it as `volatile`. To avoid over- + * paying the price of the memory barrier, where a consumer makes multiple uses of the `span` + * instance, we use a local final non-volatile reference. + */ + private volatile Span span = null; + public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { @@ -140,26 +153,37 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In private CompletableFuture callOpenScanner(HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub) { - boolean isRegionServerRemote = isRemote(loc.getHostname()); - incRPCCallsMetrics(scanMetrics, isRegionServerRemote); - if (openScannerTries.getAndIncrement() > 1) { - incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); - } - CompletableFuture future = new CompletableFuture<>(); - try { - ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan, - scan.getCaching(), false); - stub.scan(controller, request, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - return; - } - future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); - }); - } catch (IOException e) { - future.completeExceptionally(e); + final Span localSpan = span; + try (Scope ignored = localSpan.makeCurrent()) { + boolean isRegionServerRemote = isRemote(loc.getHostname()); + incRPCCallsMetrics(scanMetrics, isRegionServerRemote); + if (openScannerTries.getAndIncrement() > 1) { + incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); + } + CompletableFuture future = new CompletableFuture<>(); + try { + ScanRequest request = RequestConverter.buildScanRequest( + loc.getRegion().getRegionName(), scan, scan.getCaching(), false); + stub.scan(controller, request, resp -> { + try (Scope ignored1 = localSpan.makeCurrent()) { + if (controller.failed()) { + final IOException e = controller.getFailed(); + future.completeExceptionally(e); + TraceUtil.setError(localSpan, e); + localSpan.end(); + return; + } + future.complete(new OpenScannerResponse( + loc, isRegionServerRemote, stub, controller, resp)); + } + }); + } catch (IOException e) { + TraceUtil.setError(localSpan, e); + localSpan.end(); + future.completeExceptionally(e); + } + return future; } - return future; } private void startScan(OpenScannerResponse resp) { @@ -173,26 +197,41 @@ private void startScan(OpenScannerResponse resp) { .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), (hasMore, error) -> { - if (error != null) { - consumer.onError(error); - return; - } - if (hasMore) { - openScanner(); - } else { - consumer.onComplete(); + final Span localSpan = span; + try (Scope ignored = localSpan.makeCurrent()) { + if (error != null) { + try { + consumer.onError(error); + return; + } finally { + TraceUtil.setError(localSpan, error); + localSpan.end(); + } + } + if (hasMore) { + openScanner(); + } else { + try { + consumer.onComplete(); + } finally { + localSpan.setStatus(StatusCode.OK); + localSpan.end(); + } + } } }); } private CompletableFuture openScanner(int replicaId) { - return conn.callerFactory. single().table(tableName) - .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) - .priority(scan.getPriority()) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); + try (Scope ignored = span.makeCurrent()) { + return conn.callerFactory. single().table(tableName) + .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) + .priority(scan.getPriority()) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); + } } private long getPrimaryTimeoutNs() { @@ -206,15 +245,34 @@ private void openScanner() { addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, conn.getConnectionMetrics()), (resp, error) -> { - if (error != null) { - consumer.onError(error); - return; + final Span localSpan = span; + try (Scope ignored = localSpan.makeCurrent()) { + if (error != null) { + try { + consumer.onError(error); + return; + } finally { + TraceUtil.setError(localSpan, error); + localSpan.end(); + } + } + startScan(resp); } - startScan(resp); }); } public void start() { - openScanner(); + final Span localSpan = new TableOperationSpanBuilder(conn) + .setTableName(tableName) + .setOperation(scan) + .build(); + if (consumer instanceof AsyncTableResultScanner) { + AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer; + scanner.setSpan(localSpan); + } + span = localSpan; + try (Scope ignored = localSpan.makeCurrent()) { + openScanner(); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 7f19180a0ab2..b1dc48494d26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -28,6 +28,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; +import io.opentelemetry.context.Context; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -573,7 +574,8 @@ private void call() { resetController(controller, callTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit()); - stub.scan(controller, req, resp -> onComplete(controller, resp)); + Context context = Context.current(); + stub.scan(controller, req, resp -> context.wrap(() -> onComplete(controller, resp)).run()); } private void next() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 96c650f69493..bcc8f4c5f780 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; - +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -32,7 +34,6 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; /** @@ -232,22 +233,29 @@ public ResultScanner getScanner(Scan scan) { } private void scan0(Scan scan, ScanResultConsumer consumer) { - try (ResultScanner scanner = getScanner(scan)) { - consumer.onScanMetricsCreated(scanner.getScanMetrics()); - for (Result result; (result = scanner.next()) != null;) { - if (!consumer.onNext(result)) { - break; + Span span = null; + try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) { + span = scanner.getSpan(); + try (Scope ignored = span.makeCurrent()) { + consumer.onScanMetricsCreated(scanner.getScanMetrics()); + for (Result result; (result = scanner.next()) != null; ) { + if (!consumer.onNext(result)) { + break; + } } + consumer.onComplete(); } - consumer.onComplete(); } catch (IOException e) { - consumer.onError(e); + try (Scope ignored = span.makeCurrent()) { + consumer.onError(e); + } } } @Override public void scan(Scan scan, ScanResultConsumer consumer) { - pool.execute(() -> scan0(scan, consumer)); + final Context context = Context.current(); + pool.execute(context.wrap(() -> scan0(scan, consumer))); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index e9b15f999a93..6462cd093f85 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; - +import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayDeque; @@ -58,6 +58,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum private ScanResumer resumer; + // Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`. + private Span span = null; + public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) { this.tableName = tableName; this.maxCacheSize = maxCacheSize; @@ -79,6 +82,14 @@ private void stopPrefetch(ScanController controller) { resumer = controller.suspend(); } + Span getSpan() { + return span; + } + + void setSpan(final Span span) { + this.span = span; + } + @Override public synchronized void onNext(Result[] results, ScanController controller) { assert results.length > 0; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index a144550022b3..927d4e5e5452 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -640,30 +640,26 @@ public AsyncTableResultScanner getScanner(Scan scan) { @Override public CompletableFuture> scanAll(Scan scan) { - final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(scan); - return tracedFuture(() -> { - CompletableFuture> future = new CompletableFuture<>(); - List scanResults = new ArrayList<>(); - scan(scan, new AdvancedScanResultConsumer() { + CompletableFuture> future = new CompletableFuture<>(); + List scanResults = new ArrayList<>(); + scan(scan, new AdvancedScanResultConsumer() { - @Override - public void onNext(Result[] results, ScanController controller) { - scanResults.addAll(Arrays.asList(results)); - } + @Override + public void onNext(Result[] results, ScanController controller) { + scanResults.addAll(Arrays.asList(results)); + } - @Override - public void onError(Throwable error) { - future.completeExceptionally(error); - } + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } - @Override - public void onComplete() { - future.complete(scanResults); - } - }); - return future; - }, supplier); + @Override + public void onComplete() { + future.complete(scanResults); + } + }); + return future; } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index ae3bc3a00319..69cd77668dc7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -28,6 +28,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.fail; @@ -44,8 +46,10 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -106,7 +110,7 @@ public class TestAsyncTableTracing { private AsyncConnectionImpl conn; - private AsyncTable table; + private AsyncTable table; @Rule public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); @@ -452,6 +456,53 @@ public void testScanAll() { assertTrace("SCAN"); } + @Test + public void testScan() throws Throwable { + final CountDownLatch doneSignal = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(); + final AtomicReference throwable = new AtomicReference<>(); + final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); + table.scan(scan, new ScanResultConsumer() { + @Override public boolean onNext(Result result) { + if (result.getRow() != null) { + count.incrementAndGet(); + } + return true; + } + + @Override public void onError(Throwable error) { + throwable.set(error); + doneSignal.countDown(); + } + + @Override public void onComplete() { + doneSignal.countDown(); + } + }); + doneSignal.await(); + if (throwable.get() != null) { + throw throwable.get(); + } + assertThat("user code did not run. check test setup.", count.get(), greaterThan(0)); + assertTrace("SCAN"); + } + + @Test + public void testGetScanner() { + final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); + try (ResultScanner scanner = table.getScanner(scan)) { + int count = 0; + for (Result result : scanner) { + if (result.getRow() != null) { + count++; + } + } + // do something with it. + assertThat(count, greaterThanOrEqualTo(0)); + } + assertTrace("SCAN"); + } + @Test public void testExistsList() { CompletableFuture diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java new file mode 100644 index 000000000000..2c7061259f90 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/StringTraceRenderer.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.trace; + +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Rudimentary tool for visualizing a hierarchy of spans. Given a collection of spans, indexes + * them from parents to children and prints them out one per line, indented. + */ +@InterfaceAudience.Private +public class StringTraceRenderer { + private static final Logger logger = LoggerFactory.getLogger(StringTraceRenderer.class); + + private final List graphs; + + public StringTraceRenderer(final Collection spans) { + final Map spansById = indexSpansById(spans); + populateChildren(spansById); + graphs = findRoots(spansById); + } + + private static Map indexSpansById(final Collection spans) { + final Map spansById = new HashMap<>(spans.size()); + spans.forEach(span -> spansById.put(span.getSpanId(), new Node(span))); + return spansById; + } + + private static void populateChildren(final Map spansById) { + spansById.forEach((spanId, node) -> { + final SpanData spanData = node.spanData; + final String parentSpanId = spanData.getParentSpanId(); + if (Objects.equals(parentSpanId, SpanId.getInvalid())) { + return; + } + final Node parentNode = spansById.get(parentSpanId); + if (parentNode == null) { + logger.warn("Span {} has parent {} that is not found in index, {}", spanId, parentSpanId, + spanData); + return; + } + parentNode.children.put(spanId, node); + }); + } + + private static List findRoots(final Map spansById) { + return spansById.values() + .stream() + .filter(node -> Objects.equals(node.spanData.getParentSpanId(), SpanId.getInvalid())) + .collect(Collectors.toList()); + } + + public void render(final Consumer writer) { + for (ListIterator iter = graphs.listIterator(); iter.hasNext(); ) { + final int idx = iter.nextIndex(); + final Node node = iter.next(); + render(writer, node, 0, idx == 0); + } + } + + private static void render( + final Consumer writer, + final Node node, + final int indent, + final boolean isFirst + ) { + writer.accept(render(node.spanData, indent, isFirst)); + final List children = new ArrayList<>(node.children.values()); + for (ListIterator iter = children.listIterator(); iter.hasNext(); ) { + final int idx = iter.nextIndex(); + final Node child = iter.next(); + render(writer, child, indent + 2, idx == 0); + } + } + + private static String render( + final SpanData spanData, + final int indent, + final boolean isFirst + ) { + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < indent; i++) { + sb.append(' '); + } + + return sb.append(isFirst ? "└─ " : "├─ ") + .append(render(spanData)) + .toString(); + } + + private static String render(final SpanData spanData) { + return new ToStringBuilder(spanData, ToStringStyle.NO_CLASS_NAME_STYLE) + .append("spanId", spanData.getSpanId()) + .append("name", spanData.getName()) + .append("hasEnded", spanData.hasEnded()) + .toString(); + } + + private static class Node { + final SpanData spanData; + final LinkedHashMap children; + + Node(final SpanData spanData) { + this.spanData = spanData; + this.children = new LinkedHashMap<>(); + } + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java index a9473dae5597..026deb0afe45 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client.trace.hamcrest; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import io.opentelemetry.api.common.Attributes; @@ -25,7 +26,9 @@ import io.opentelemetry.sdk.trace.data.EventData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.time.Duration; +import java.util.Objects; import org.hamcrest.Description; import org.hamcrest.FeatureMatcher; import org.hamcrest.Matcher; @@ -78,6 +81,24 @@ public static Matcher hasEvents(Matcher> m }; } + public static Matcher hasExceptionWithType(Matcher matcher) { + return hasException(containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), matcher)); + } + + public static Matcher hasException(Matcher matcher) { + return new FeatureMatcher(matcher, + "SpanData having Exception with Attributes that", "exception attributes") { + @Override protected Attributes featureValueOf(SpanData actual) { + return actual.getEvents() + .stream() + .filter(e -> Objects.equals(SemanticAttributes.EXCEPTION_EVENT_NAME, e.getName())) + .map(EventData::getAttributes) + .findFirst() + .orElse(null); + } + }; + } + public static Matcher hasKind(SpanKind kind) { return new FeatureMatcher( equalTo(kind), "SpanData with kind that", "SpanKind") { @@ -99,6 +120,24 @@ public static Matcher hasName(Matcher matcher) { }; } + public static Matcher hasParentSpanId(String parentSpanId) { + return hasParentSpanId(equalTo(parentSpanId)); + } + + public static Matcher hasParentSpanId(SpanData parent) { + return hasParentSpanId(parent.getSpanId()); + } + + public static Matcher hasParentSpanId(Matcher matcher) { + return new FeatureMatcher(matcher, "SpanKind with a parentSpanId that", + "parentSpanId" + ) { + @Override protected String featureValueOf(SpanData item) { + return item.getParentSpanId(); + } + }; + } + public static Matcher hasStatusWithCode(StatusCode statusCode) { final Matcher matcher = is(equalTo(statusCode)); return new TypeSafeMatcher() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java index afd74bb9314f..30c54244dfc9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java @@ -264,6 +264,11 @@ public Builder numZkServers(int numZkServers) { return this; } + public Builder numWorkers(int numWorkers) { + return numDataNodes(numWorkers) + .numRegionServers(numWorkers); + } + public Builder createRootDir(boolean createRootDir) { this.createRootDir = createRootDir; return this; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 14456eca1b13..5056ef25e4f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -17,29 +17,95 @@ */ package org.apache.hadoop.hbase.client; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ConnectionRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.MatcherPredicate; +import org.apache.hadoop.hbase.MiniClusterRule; +import org.apache.hadoop.hbase.StartTestingClusterOption; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; +import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.hamcrest.Matcher; +import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExternalResource; +import org.junit.rules.RuleChain; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; public abstract class AbstractTestAsyncTableScan { - protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); + protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder() + .setMiniClusterOption(StartTestingClusterOption.builder() + .numWorkers(3) + .build()) + .build(); + + protected static final ConnectionRule connectionRule = + new ConnectionRule(miniClusterRule::createConnection); + + private static final class Setup extends ExternalResource { + @Override + protected void before() throws Throwable { + final HBaseTestingUtil testingUtil = miniClusterRule.getTestingUtility(); + final AsyncConnection conn = connectionRule.getConnection(); + + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys); + testingUtil.waitTableAvailable(TABLE_NAME); + conn.getTable(TABLE_NAME) + .putAll(IntStream.range(0, COUNT) + .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)) + .addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) + .collect(Collectors.toList())) + .get(); + } + } + + @ClassRule + public static final TestRule classRule = RuleChain.outerRule(otelClassRule) + .around(miniClusterRule) + .around(connectionRule) + .around(new Setup()); + + @Rule + public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule); + + @Rule + public final TestName testName = new TestName(); protected static TableName TABLE_NAME = TableName.valueOf("async"); @@ -51,53 +117,29 @@ public abstract class AbstractTestAsyncTableScan { protected static int COUNT = 1000; - protected static AsyncConnection ASYNC_CONN; - - @BeforeClass - public static void setUp() throws Exception { - TEST_UTIL.startMiniCluster(3); - byte[][] splitKeys = new byte[8][]; - for (int i = 111; i < 999; i += 111) { - splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); - } - TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); - TEST_UTIL.waitTableAvailable(TABLE_NAME); - ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); - ASYNC_CONN.getTable(TABLE_NAME).putAll(IntStream.range(0, COUNT) - .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) - .collect(Collectors.toList())).get(); - } - - @AfterClass - public static void tearDown() throws Exception { - ASYNC_CONN.close(); - TEST_UTIL.shutdownMiniCluster(); - } - - protected static Scan createNormalScan() { + private static Scan createNormalScan() { return new Scan(); } - protected static Scan createBatchScan() { + private static Scan createBatchScan() { return new Scan().setBatch(1); } // set a small result size for testing flow control - protected static Scan createSmallResultSizeScan() { + private static Scan createSmallResultSizeScan() { return new Scan().setMaxResultSize(1); } - protected static Scan createBatchSmallResultSizeScan() { + private static Scan createBatchSmallResultSizeScan() { return new Scan().setBatch(1).setMaxResultSize(1); } - protected static AsyncTable getRawTable() { - return ASYNC_CONN.getTable(TABLE_NAME); + private static AsyncTable getRawTable() { + return connectionRule.getConnection().getTable(TABLE_NAME); } - protected static AsyncTable getTable() { - return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + private static AsyncTable getTable() { + return connectionRule.getConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); } private static List>> getScanCreator() { @@ -131,8 +173,18 @@ protected static List getTableAndScanCreatorParams() { protected abstract List doScan(Scan scan, int closeAfter) throws Exception; + /** + * Used by implementation classes to assert the correctness of spans produced under test. + */ + protected abstract void assertTraceContinuity(); + + /** + * Used by implementation classes to assert the correctness of spans having errors. + */ + protected abstract void assertTraceError(final Matcher exceptionTypeNameMatcher); + protected final List convertFromBatchResult(List results) { - assertTrue(results.size() % 2 == 0); + assertEquals(0, results.size() % 2); return IntStream.range(0, results.size() / 2).mapToObj(i -> { try { return Result @@ -143,16 +195,25 @@ protected final List convertFromBatchResult(List results) { }).collect(Collectors.toList()); } + protected static void waitForSpan(final Matcher parentSpanMatcher) { + final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration(); + Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( + "Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher))); + } + @Test public void testScanAll() throws Exception { List results = doScan(createScan(), -1); // make sure all scanners are closed at RS side - TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .forEach( - rs -> assertEquals( - "The scanner count of " + rs.getServerName() + " is " + - rs.getRSRpcServices().getScannersCount(), - 0, rs.getRSRpcServices().getScannersCount())); + miniClusterRule.getTestingUtility() + .getHBaseCluster() + .getRegionServerThreads() + .stream() + .map(JVMClusterUtil.RegionServerThread::getRegionServer) + .forEach(rs -> assertEquals( + "The scanner count of " + rs.getServerName() + " is " + + rs.getRSRpcServices().getScannersCount(), + 0, rs.getRSRpcServices().getScannersCount())); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> { Result result = results.get(i); @@ -169,37 +230,55 @@ private void assertResultEquals(Result result, int i) { @Test public void testReversedScanAll() throws Exception { - List results = doScan(createScan().setReversed(true), -1); + List results = TraceUtil.trace( + () -> doScan(createScan().setReversed(true), -1), testName.getMethodName()); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); + assertTraceContinuity(); } @Test public void testScanNoStopKey() throws Exception { int start = 345; - List results = - doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1); + List results = TraceUtil.trace(() -> + doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1), + testName.getMethodName()); assertEquals(COUNT - start, results.size()); IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); + assertTraceContinuity(); } @Test public void testReverseScanNoStopKey() throws Exception { int start = 765; - List results = doScan( - createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1); + final Scan scan = createScan() + .withStartRow(Bytes.toBytes(String.format("%03d", start))) + .setReversed(true); + List results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName()); assertEquals(start + 1, results.size()); IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); + assertTraceContinuity(); } @Test - public void testScanWrongColumnFamily() throws Exception { - try { - doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1); - } catch (Exception e) { - assertTrue(e instanceof NoSuchColumnFamilyException || - e.getCause() instanceof NoSuchColumnFamilyException); + public void testScanWrongColumnFamily() { + final Exception e = assertThrows(Exception.class, () -> TraceUtil.trace( + () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), + testName.getMethodName())); + // hamcrest generic enforcement for `anyOf` is a pain; skip it + // but -- don't we always unwrap ExecutionExceptions -- bug? + if (e instanceof NoSuchColumnFamilyException) { + final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e; + assertThat(ex, isA(NoSuchColumnFamilyException.class)); + } else if (e instanceof ExecutionException) { + final ExecutionException ex = (ExecutionException) e; + assertThat(ex, allOf( + isA(ExecutionException.class), + hasProperty("cause", isA(NoSuchColumnFamilyException.class)))); + } else { + fail("Found unexpected Exception " + e); } + assertTraceError(endsWith(NoSuchColumnFamilyException.class.getName())); } private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/LimitedScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/LimitedScanResultConsumer.java new file mode 100644 index 000000000000..28fbc5ab28f4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/LimitedScanResultConsumer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Advise the scanning infrastructure to collect up to {@code limit} results. + */ +class LimitedScanResultConsumer extends SimpleScanResultConsumerImpl { + + private final int limit; + + public LimitedScanResultConsumer(int limit) { + this.limit = limit; + } + + @Override + public synchronized boolean onNext(Result result) { + return super.onNext(result) && results.size() < limit; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java index fce6773f7a6c..b019fe75420b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,59 +17,15 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; - -import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -final class SimpleScanResultConsumer implements ScanResultConsumer { - - private ScanMetrics scanMetrics; - - private final List results = new ArrayList<>(); - - private Throwable error; - - private boolean finished = false; - - @Override - public void onScanMetricsCreated(ScanMetrics scanMetrics) { - this.scanMetrics = scanMetrics; - } - - @Override - public synchronized boolean onNext(Result result) { - results.add(result); - return true; - } - - @Override - public synchronized void onError(Throwable error) { - this.error = error; - finished = true; - notifyAll(); - } - - @Override - public synchronized void onComplete() { - finished = true; - notifyAll(); - } +/** + * A simplistic {@link ScanResultConsumer} for use in tests. + */ +public interface SimpleScanResultConsumer extends ScanResultConsumer { - public synchronized List getAll() throws Exception { - while (!finished) { - wait(); - } - if (error != null) { - Throwables.propagateIfPossible(error, Exception.class); - throw new Exception(error); - } - return results; - } + List getAll() throws Exception; - public ScanMetrics getScanMetrics() { - return scanMetrics; - } + ScanMetrics getScanMetrics(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumerImpl.java new file mode 100644 index 000000000000..98941fece196 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumerImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + +class SimpleScanResultConsumerImpl implements SimpleScanResultConsumer { + + private ScanMetrics scanMetrics; + + protected final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + } + + @Override + public synchronized boolean onNext(Result result) { + results.add(result); + return true; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + finished = true; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + @Override + public synchronized List getAll() throws Exception { + while (!finished) { + wait(); + } + if (error != null) { + Throwables.propagateIfPossible(error, Exception.class); + throw new Exception(error); + } + return results; + } + + @Override + public ScanMetrics getScanMetrics() { + return scanMetrics; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index c1797f3833c3..28b04639959b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,24 +17,40 @@ */ package org.apache.hadoop.hbase.client; -import java.util.ArrayList; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; +import java.util.Objects; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScan extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScan.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -58,8 +74,8 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { - AsyncTable table = - ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + AsyncTable table = connectionRule.getConnection() + .getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results; if (closeAfter > 0) { // these tests batch settings with the sample data result in each result being @@ -68,11 +84,13 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { if (scan.getBatch() > 0) { closeAfter = closeAfter * 2; } - LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter); + TracedScanResultConsumer consumer = + new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter)); table.scan(scan, consumer); results = consumer.getAll(); } else { - SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + TracedScanResultConsumer consumer = + new TracedScanResultConsumer(new SimpleScanResultConsumerImpl()); table.scan(scan, consumer); results = consumer.getAll(); } @@ -82,49 +100,115 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { return results; } - private static class LimitedScanResultConsumer implements ScanResultConsumer { - - private final int limit; - - public LimitedScanResultConsumer(int limit) { - this.limit = limit; - } - - private final List results = new ArrayList<>(); - - private Throwable error; - - private boolean finished = false; - - @Override - public synchronized boolean onNext(Result result) { - results.add(result); - return results.size() < limit; + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); } - @Override - public synchronized void onError(Throwable error) { - this.error = error; - finished = true; - notifyAll(); - } + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher onScanMetricsCreatedMatcher = + hasName("TracedScanResultConsumer#onScanMetricsCreated"); + assertThat(spans, hasItem(onScanMetricsCreatedMatcher)); + spans.stream() + .filter(onScanMetricsCreatedMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onScanMetricsCreatedMatcher, + hasParentSpanId(scanOperationSpanId), + hasEnded()))); + + final Matcher onNextMatcher = hasName("TracedScanResultConsumer#onNext"); + assertThat(spans, hasItem(onNextMatcher)); + spans.stream() + .filter(onNextMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onNextMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + + final Matcher onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete"); + assertThat(spans, hasItem(onCompleteMatcher)); + spans.stream() + .filter(onCompleteMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onCompleteMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } - @Override - public synchronized void onComplete() { - finished = true; - notifyAll(); + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); } - public synchronized List getAll() throws Exception { - while (!finished) { - wait(); - } - if (error != null) { - Throwables.propagateIfPossible(error, Exception.class); - throw new Exception(error); - } - return results; - } + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher onErrorMatcher = hasName("TracedScanResultConsumer#onError"); + assertThat(spans, hasItem(onErrorMatcher)); + spans.stream() + .filter(onErrorMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onErrorMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index 96c2d40138ca..33460bf4dbaf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -17,21 +17,39 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanAll.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -72,4 +90,66 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { } return results; } + + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + } + + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java index 6420806d7ee8..9c7f024c99c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java @@ -123,7 +123,7 @@ private static Pair, ScanMetrics> doScanWithRawAsyncTable(Scan scan private static Pair, ScanMetrics> doScanWithAsyncTableScan(Scan scan) throws Exception { - SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl(); CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer); return Pair.newPair(consumer.getAll(), consumer.getScanMetrics()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index 2e990f763da0..0a0aad0f3945 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -17,23 +17,41 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanner.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -63,7 +81,8 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + AsyncTable table = connectionRule.getConnection() + .getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results = new ArrayList<>(); // these tests batch settings with the sample data result in each result being // split in two. so we must allow twice the expected results in order to reach @@ -84,4 +103,65 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { } return results; } + + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + assertThat(spans, hasItem(allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } + + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 26c201e19865..529ad90ae871 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -17,22 +17,41 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -56,8 +75,8 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { - BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer(); - ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer); + TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer(); + connectionRule.getConnection().getTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); // these tests batch settings with the sample data result in each result being // split in two. so we must allow twice the expected results in order to reach @@ -76,4 +95,113 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { } return results; } + + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + // RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug? + final Matcher onScanMetricsCreatedMatcher = + hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated"); + assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher))); + + final Matcher onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext"); + assertThat(spans, hasItem(onNextMatcher)); + spans.stream() + .filter(onNextMatcher::matches) + .forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId))); + assertThat(spans, hasItem(allOf( + onNextMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + + final Matcher onCompleteMatcher = + hasName("TracedAdvancedScanResultConsumer#onComplete"); + assertThat(spans, hasItem(onCompleteMatcher)); + spans.stream() + .filter(onCompleteMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onCompleteMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } + + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError"); + assertThat(spans, hasItem(onCompleteMatcher)); + spans.stream() + .filter(onCompleteMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onCompleteMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedAdvancedScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedAdvancedScanResultConsumer.java new file mode 100644 index 000000000000..702e16a14635 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedAdvancedScanResultConsumer.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.trace.TraceUtil; + +/** + * A drop-in replacement for {@link BufferingScanResultConsumer} that adds tracing spans to its + * implementation of the {@link AdvancedScanResultConsumer} API. + */ +public class TracedAdvancedScanResultConsumer implements AdvancedScanResultConsumer { + + private final BufferingScanResultConsumer delegate = new BufferingScanResultConsumer(); + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + TraceUtil.trace( + () -> delegate.onScanMetricsCreated(scanMetrics), + "TracedAdvancedScanResultConsumer#onScanMetricsCreated"); + } + + @Override + public void onNext(Result[] results, ScanController controller) { + TraceUtil.trace( + () -> delegate.onNext(results, controller), + "TracedAdvancedScanResultConsumer#onNext"); + } + + @Override + public void onError(Throwable error) { + TraceUtil.trace( + () -> delegate.onError(error), + "TracedAdvancedScanResultConsumer#onError"); + } + + @Override + public void onComplete() { + TraceUtil.trace(delegate::onComplete, "TracedAdvancedScanResultConsumer#onComplete"); + } + + public Result take() throws IOException, InterruptedException { + return delegate.take(); + } + + public ScanMetrics getScanMetrics() { + return delegate.getScanMetrics(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedScanResultConsumer.java new file mode 100644 index 000000000000..0427218038d2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedScanResultConsumer.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.List; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.trace.TraceUtil; + +/** + * A wrapper over {@link SimpleScanResultConsumer} that adds tracing of spans to its + * implementation. + */ +class TracedScanResultConsumer implements SimpleScanResultConsumer { + + private final SimpleScanResultConsumer delegate; + + public TracedScanResultConsumer(final SimpleScanResultConsumer delegate) { + this.delegate = delegate; + } + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + TraceUtil.trace( + () -> delegate.onScanMetricsCreated(scanMetrics), + "TracedScanResultConsumer#onScanMetricsCreated"); + } + + @Override + public boolean onNext(Result result) { + return TraceUtil.trace(() -> delegate.onNext(result), + "TracedScanResultConsumer#onNext"); + } + + @Override + public void onError(Throwable error) { + TraceUtil.trace(() -> delegate.onError(error), "TracedScanResultConsumer#onError"); + } + + @Override + public void onComplete() { + TraceUtil.trace(delegate::onComplete, "TracedScanResultConsumer#onComplete"); + } + + @Override + public List getAll() throws Exception { + return delegate.getAll(); + } + + @Override + public ScanMetrics getScanMetrics() { + return delegate.getScanMetrics(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java new file mode 100644 index 000000000000..3bbf2d445a81 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryClassRule.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.List; +import org.junit.rules.ExternalResource; + +/** + *

Like {@link OpenTelemetryRule}, except modeled after the junit5 implementation + * {@code OpenTelemetryExtension}. Use this class when you need to make asserts on {@link SpanData} + * created on a MiniCluster. Make sure this rule initialized before the MiniCluster so that it can + * register its instance of {@link OpenTelemetry} as the global instance before any server-side + * component can call {@link TraceUtil#getGlobalTracer()}.

+ *

For example:

+ *
{@code
+ * public class TestMyClass {
+ *   private static final OpenTelemetryClassRule otelClassRule =
+ *     OpenTelemetryClassRule.create();
+ *   private static final MiniClusterRule miniClusterRule =
+ *     MiniClusterRule.newBuilder().build();
+ *   protected static final ConnectionRule connectionRule =
+ *     new ConnectionRule(miniClusterRule::createConnection);
+ *
+ *   @ClassRule
+ *   public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
+ *     .around(miniClusterRule)
+ *     .around(connectionRule);
+ *
+ *   @Rule
+ *   public final OpenTelemetryTestRule otelTestRule =
+ *     new OpenTelemetryTestRule(otelClassRule);
+ *
+ *   @Test
+ *   public void myTest() {
+ *     // ...
+ *     // do something that makes spans
+ *     final List spans = otelClassRule.getSpans();
+ *     // make assertions on them
+ *   }
+ * }
+ * }
+ * + * @see junit5/OpenTelemetryExtension.java + */ +public final class OpenTelemetryClassRule extends ExternalResource { + + public static OpenTelemetryClassRule create() { + InMemorySpanExporter spanExporter = InMemorySpanExporter.create(); + + SdkTracerProvider tracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance())) + .setTracerProvider(tracerProvider) + .build(); + + return new OpenTelemetryClassRule(openTelemetry, spanExporter); + } + + private final OpenTelemetrySdk openTelemetry; + private final InMemorySpanExporter spanExporter; + + private OpenTelemetryClassRule( + final OpenTelemetrySdk openTelemetry, + final InMemorySpanExporter spanExporter + ) { + this.openTelemetry = openTelemetry; + this.spanExporter = spanExporter; + } + + /** Returns the {@link OpenTelemetry} created by this Rule. */ + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + /** Returns all the exported {@link SpanData} so far. */ + public List getSpans() { + return spanExporter.getFinishedSpanItems(); + } + + /** + * Clears the collected exported {@link SpanData}. + */ + public void clearSpans() { + spanExporter.reset(); + } + + @Override + protected void before() throws Throwable { + GlobalOpenTelemetry.resetForTest(); + GlobalOpenTelemetry.set(openTelemetry); + } + + @Override + protected void after() { + GlobalOpenTelemetry.resetForTest(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java new file mode 100644 index 000000000000..a6b50ffca293 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/trace/OpenTelemetryTestRule.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.trace; + +import org.junit.rules.ExternalResource; + +/** + * Used alongside {@link OpenTelemetryClassRule}. See that class's javadoc for details on when to + * use these classes instead of {@link io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule} and + * an example of how to use these classes together. + */ +public final class OpenTelemetryTestRule extends ExternalResource { + + private final OpenTelemetryClassRule classRuleSupplier; + + public OpenTelemetryTestRule(final OpenTelemetryClassRule classRule) { + this.classRuleSupplier = classRule; + } + + @Override + protected void before() throws Throwable { + classRuleSupplier.clearSpans(); + } +} diff --git a/pom.xml b/pom.xml index bbd54261b9ed..65201992fa04 100755 --- a/pom.xml +++ b/pom.xml @@ -1877,12 +1877,13 @@ -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced - -Dio.netty.eventLoopThreads=3 + -Dio.netty.eventLoopThreads=3 -Dio.opentelemetry.context.enableStrictContext=true -enableassertions -Xmx${surefire.cygwinXmx} -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true "-Djava.library.path=${hadoop.library.path};${java.library.path}" -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced + -Dio.opentelemetry.context.enableStrictContext=true ${hbase-surefire.argLine}