From c1cad7f653e7f691b78fa605a776b18808c67101 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Fri, 9 Sep 2022 22:41:55 +0800 Subject: [PATCH] HBASE-27360 The trace related assertions are flaky for async client tests (#4767) Signed-off-by: GeorryHuang (cherry picked from commit f3f88ff6c17633d98facf9dccb57e63b87200e4a) --- .../hadoop/hbase/client/AsyncTableImpl.java | 4 +- .../client/AbstractTestAsyncTableScan.java | 34 ++++++----- .../hbase/client/TestAsyncTableScan.java | 52 +++++++---------- .../hbase/client/TestAsyncTableScanAll.java | 25 ++++----- .../hbase/client/TestAsyncTableScanner.java | 30 ++++------ .../hbase/client/TestRawAsyncTableScan.java | 56 ++++++++----------- 6 files changed, 89 insertions(+), 112 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 7f5e6562c021..40dffcd62adf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -236,7 +236,9 @@ private void scan0(Scan scan, ScanResultConsumer consumer) { try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) { span = scanner.getSpan(); try (Scope ignored = span.makeCurrent()) { - consumer.onScanMetricsCreated(scanner.getScanMetrics()); + if (scan.isScanMetricsEnabled()) { + consumer.onScanMetricsCreated(scanner.getScanMetrics()); + } for (Result result; (result = scanner.next()) != null;) { if (!consumer.onNext(result)) { break; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index be258a77b85c..19b15817a178 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -37,12 +37,14 @@ import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ConnectionRule; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -70,18 +72,18 @@ public abstract class AbstractTestAsyncTableScan { - protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); - protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder() + protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create(); + protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder() .setMiniClusterOption(StartMiniClusterOption.builder().numWorkers(3).build()).build(); - protected static final ConnectionRule connectionRule = - ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection); + protected static final ConnectionRule CONN_RULE = + ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection); private static final class Setup extends ExternalResource { @Override protected void before() throws Throwable { - final HBaseTestingUtility testingUtil = miniClusterRule.getTestingUtility(); - final AsyncConnection conn = connectionRule.getAsyncConnection(); + final HBaseTestingUtility testingUtil = MINI_CLUSTER_RULE.getTestingUtility(); + final AsyncConnection conn = CONN_RULE.getAsyncConnection(); byte[][] splitKeys = new byte[8][]; for (int i = 111; i < 999; i += 111) { @@ -99,11 +101,11 @@ protected void before() throws Throwable { } @ClassRule - public static final TestRule classRule = RuleChain.outerRule(otelClassRule) - .around(miniClusterRule).around(connectionRule).around(new Setup()); + public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE) + .around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup()); @Rule - public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule); + public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE); @Rule public final TestName testName = new TestName(); @@ -136,11 +138,11 @@ private static Scan createBatchSmallResultSizeScan() { } private static AsyncTable getRawTable() { - return connectionRule.getAsyncConnection().getTable(TABLE_NAME); + return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME); } private static AsyncTable getTable() { - return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); + return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); } private static List>> getScanCreator() { @@ -198,16 +200,20 @@ protected final List convertFromBatchResult(List results) { } protected static void waitForSpan(final Matcher parentSpanMatcher) { - final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration(); + final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration(); Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( - "Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher))); + "Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher))); + } + + protected static Stream spanStream() { + return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull); } @Test public void testScanAll() throws Exception { List results = doScan(createScan(), -1); // make sure all scanners are closed at RS side - miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream() + MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream() .map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach( rs -> assertEquals( "The scanner count of " + rs.getServerName() + " is " diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index a0f38c158033..efd76677373d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -24,13 +24,11 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.startsWith; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; -import java.util.Objects; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -76,7 +74,7 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = - connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); + CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results; if (closeAfter > 0) { // these tests batch settings with the sample data result in each result being @@ -108,38 +106,31 @@ protected void assertTraceContinuity() { allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); final Matcher scanOperationSpanMatcher = allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); - assertThat(spans, hasItem(scanOperationSpanMatcher)); - final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); - - final Matcher onScanMetricsCreatedMatcher = - hasName("TracedScanResultConsumer#onScanMetricsCreated"); - assertThat(spans, hasItem(onScanMetricsCreatedMatcher)); - spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span, - allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded()))); + waitForSpan(scanOperationSpanMatcher); + final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId).findAny().get(); final Matcher onNextMatcher = hasName("TracedScanResultConsumer#onNext"); - assertThat(spans, hasItem(onNextMatcher)); - spans.stream().filter(onNextMatcher::matches) + waitForSpan(onNextMatcher); + spanStream().filter(onNextMatcher::matches) .forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); final Matcher onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete"); - assertThat(spans, hasItem(onCompleteMatcher)); - spans.stream().filter(onCompleteMatcher::matches) + waitForSpan(onCompleteMatcher); + spanStream().filter(onCompleteMatcher::matches) .forEach(span -> assertThat(span, allOf(onCompleteMatcher, hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); } @@ -151,27 +142,26 @@ protected void assertTraceContinuity() { final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); final Matcher scanOperationSpanMatcher = allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), hasException(exceptionMatcher), hasEnded()); - assertThat(spans, hasItem(scanOperationSpanMatcher)); - final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + waitForSpan(scanOperationSpanMatcher); + final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId).findAny().get(); final Matcher onErrorMatcher = hasName("TracedScanResultConsumer#onError"); - assertThat(spans, hasItem(onErrorMatcher)); - spans.stream().filter(onErrorMatcher::matches) + waitForSpan(onErrorMatcher); + spanStream().filter(onErrorMatcher::matches) .forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index cbb2541a1099..a59f1fa355be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -22,15 +22,12 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.startsWith; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; -import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -99,20 +96,19 @@ protected void assertTraceContinuity() { allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); final Matcher scanOperationSpanMatcher = allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); - assertThat(spans, hasItem(scanOperationSpanMatcher)); + waitForSpan(scanOperationSpanMatcher); } @Override @@ -122,20 +118,19 @@ protected void assertTraceContinuity() { final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); final Matcher scanOperationSpanMatcher = allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), hasException(exceptionMatcher), hasEnded()); - assertThat(spans, hasItem(scanOperationSpanMatcher)); + waitForSpan(scanOperationSpanMatcher); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index 10932a7da772..0da3e0ded58a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -22,16 +22,13 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.startsWith; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -83,7 +80,7 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { AsyncTable table = - connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); + CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results = new ArrayList<>(); // these tests batch settings with the sample data result in each result being // split in two. so we must allow twice the expected results in order to reach @@ -112,19 +109,17 @@ protected void assertTraceContinuity() { allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); - assertThat(spans, - hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), - hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); + waitForSpan(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())); } @Override @@ -134,20 +129,19 @@ protected void assertTraceContinuity() { final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); final Matcher scanOperationSpanMatcher = allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), hasException(exceptionMatcher), hasEnded()); - assertThat(spans, hasItem(scanOperationSpanMatcher)); + waitForSpan(scanOperationSpanMatcher); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 1aba069ac495..b7a8fc9a0178 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -24,15 +24,12 @@ import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; import io.opentelemetry.api.trace.StatusCode; import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; import java.util.List; -import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -77,7 +74,7 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer(); - connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer); + CONN_RULE.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); // these tests batch settings with the sample data result in each result being // split in two. so we must allow twice the expected results in order to reach @@ -104,39 +101,33 @@ protected void assertTraceContinuity() { allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); final Matcher scanOperationSpanMatcher = allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()); - assertThat(spans, hasItem(scanOperationSpanMatcher)); - final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); - - // RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug? - final Matcher onScanMetricsCreatedMatcher = - hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated"); - assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher))); + waitForSpan(scanOperationSpanMatcher); + final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId).findAny().get(); final Matcher onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext"); - assertThat(spans, hasItem(onNextMatcher)); - spans.stream().filter(onNextMatcher::matches) + waitForSpan(onNextMatcher); + spanStream().filter(onNextMatcher::matches) .forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId))); - assertThat(spans, hasItem(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId), - hasStatusWithCode(StatusCode.OK), hasEnded()))); + waitForSpan(allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), hasEnded())); final Matcher onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onComplete"); - assertThat(spans, hasItem(onCompleteMatcher)); - spans.stream().filter(onCompleteMatcher::matches) + waitForSpan(onCompleteMatcher); + spanStream().filter(onCompleteMatcher::matches) .forEach(span -> assertThat(span, allOf(onCompleteMatcher, hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); } @@ -148,27 +139,26 @@ protected void assertTraceContinuity() { final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); waitForSpan(parentSpanMatcher); - final List spans = - otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList()); if (logger.isDebugEnabled()) { - StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + StringTraceRenderer stringTraceRenderer = + new StringTraceRenderer(spanStream().collect(Collectors.toList())); stringTraceRenderer.render(logger::debug); } - final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + final String parentSpanId = + spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get(); final Matcher scanOperationSpanMatcher = allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR), hasException(exceptionMatcher), hasEnded()); - assertThat(spans, hasItem(scanOperationSpanMatcher)); - final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches) - .map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new); + waitForSpan(scanOperationSpanMatcher); + final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId).findAny().get(); final Matcher onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError"); - assertThat(spans, hasItem(onCompleteMatcher)); - spans.stream().filter(onCompleteMatcher::matches) + waitForSpan(onCompleteMatcher); + spanStream().filter(onCompleteMatcher::matches) .forEach(span -> assertThat(span, allOf(onCompleteMatcher, hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()))); }