Skip to content

Commit

Permalink
HBASE-27360 The trace related assertions are flaky for async client t…
Browse files Browse the repository at this point in the history
…ests (#4767)

Signed-off-by: GeorryHuang <[email protected]>
  • Loading branch information
Apache9 authored Sep 9, 2022
1 parent 793f020 commit f3f88ff
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ private void scan0(Scan scan, ScanResultConsumer consumer) {
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
span = scanner.getSpan();
try (Scope ignored = span.makeCurrent()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
if (scan.isScanMetricsEnabled()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
}
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConnectionRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
Expand Down Expand Up @@ -70,18 +72,18 @@

public abstract class AbstractTestAsyncTableScan {

protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder()
.setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build();

protected static final ConnectionRule connectionRule =
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
protected static final ConnectionRule CONN_RULE =
ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);

private static final class Setup extends ExternalResource {
@Override
protected void before() throws Throwable {
final HBaseTestingUtil testingUtil = miniClusterRule.getTestingUtility();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
final AsyncConnection conn = CONN_RULE.getAsyncConnection();

byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
Expand All @@ -99,11 +101,11 @@ protected void before() throws Throwable {
}

@ClassRule
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
.around(miniClusterRule).around(connectionRule).around(new Setup());
public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
.around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());

@Rule
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);

@Rule
public final TestName testName = new TestName();
Expand Down Expand Up @@ -136,11 +138,11 @@ private static Scan createBatchSmallResultSizeScan() {
}

private static AsyncTable<?> getRawTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
}

private static AsyncTable<?> getTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
}

private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
Expand Down Expand Up @@ -198,16 +200,20 @@ protected final List<Result> convertFromBatchResult(List<Result> results) {
}

protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
"Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
}

protected static Stream<SpanData> spanStream() {
return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
}

@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -76,7 +74,7 @@ protected Scan createScan() {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<ScanResultConsumer> table =
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results;
if (closeAfter > 0) {
// these tests batch settings with the sample data result in each result being
Expand Down Expand Up @@ -108,38 +106,31 @@ protected void assertTraceContinuity() {
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);

final Matcher<SpanData> onScanMetricsCreatedMatcher =
hasName("TracedScanResultConsumer#onScanMetricsCreated");
assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span,
allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded())));
waitForSpan(scanOperationSpanMatcher);
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
assertThat(spans, hasItem(onNextMatcher));
spans.stream().filter(onNextMatcher::matches)
waitForSpan(onNextMatcher);
spanStream().filter(onNextMatcher::matches)
.forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK), hasEnded())));

final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream().filter(onCompleteMatcher::matches)
waitForSpan(onCompleteMatcher);
spanStream().filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
}
Expand All @@ -151,27 +142,26 @@ protected void assertTraceContinuity() {
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasException(exceptionMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
waitForSpan(scanOperationSpanMatcher);
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
assertThat(spans, hasItem(onErrorMatcher));
spans.stream().filter(onErrorMatcher::matches)
waitForSpan(onErrorMatcher);
spanStream().filter(onErrorMatcher::matches)
.forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK), hasEnded())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -99,20 +96,19 @@ protected void assertTraceContinuity() {
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
waitForSpan(scanOperationSpanMatcher);
}

@Override
Expand All @@ -122,20 +118,19 @@ protected void assertTraceContinuity() {
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasException(exceptionMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
waitForSpan(scanOperationSpanMatcher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -83,7 +80,7 @@ protected Scan createScan() {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<?> table =
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
Expand Down Expand Up @@ -112,19 +109,17 @@ protected void assertTraceContinuity() {
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

assertThat(spans,
hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
waitForSpan(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()));
}

@Override
Expand All @@ -134,20 +129,19 @@ protected void assertTraceContinuity() {
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasException(exceptionMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
waitForSpan(scanOperationSpanMatcher);
}
}
Loading

0 comments on commit f3f88ff

Please sign in to comment.