Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27360 The trace related assertions are flaky for async client t… #4767

Merged
merged 1 commit into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ private void scan0(Scan scan, ScanResultConsumer consumer) {
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
span = scanner.getSpan();
try (Scope ignored = span.makeCurrent()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
if (scan.isScanMetricsEnabled()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
}
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConnectionRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
Expand Down Expand Up @@ -70,18 +72,18 @@

public abstract class AbstractTestAsyncTableScan {

protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create();
protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder()
protected static final OpenTelemetryClassRule OTEL_CLASS_RULE = OpenTelemetryClassRule.create();
protected static final MiniClusterRule MINI_CLUSTER_RULE = MiniClusterRule.newBuilder()
.setMiniClusterOption(StartTestingClusterOption.builder().numWorkers(3).build()).build();

protected static final ConnectionRule connectionRule =
ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection);
protected static final ConnectionRule CONN_RULE =
ConnectionRule.createAsyncConnectionRule(MINI_CLUSTER_RULE::createAsyncConnection);

private static final class Setup extends ExternalResource {
@Override
protected void before() throws Throwable {
final HBaseTestingUtil testingUtil = miniClusterRule.getTestingUtility();
final AsyncConnection conn = connectionRule.getAsyncConnection();
final HBaseTestingUtil testingUtil = MINI_CLUSTER_RULE.getTestingUtility();
final AsyncConnection conn = CONN_RULE.getAsyncConnection();

byte[][] splitKeys = new byte[8][];
for (int i = 111; i < 999; i += 111) {
Expand All @@ -99,11 +101,11 @@ protected void before() throws Throwable {
}

@ClassRule
public static final TestRule classRule = RuleChain.outerRule(otelClassRule)
.around(miniClusterRule).around(connectionRule).around(new Setup());
public static final TestRule classRule = RuleChain.outerRule(OTEL_CLASS_RULE)
.around(MINI_CLUSTER_RULE).around(CONN_RULE).around(new Setup());

@Rule
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule);
public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(OTEL_CLASS_RULE);

@Rule
public final TestName testName = new TestName();
Expand Down Expand Up @@ -136,11 +138,11 @@ private static Scan createBatchSmallResultSizeScan() {
}

private static AsyncTable<?> getRawTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME);
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME);
}

private static AsyncTable<?> getTable() {
return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
return CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
}

private static List<Pair<String, Supplier<Scan>>> getScanCreator() {
Expand Down Expand Up @@ -198,16 +200,20 @@ protected final List<Result> convertFromBatchResult(List<Result> results) {
}

protected static void waitForSpan(final Matcher<SpanData> parentSpanMatcher) {
final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration();
final Configuration conf = MINI_CLUSTER_RULE.getTestingUtility().getConfiguration();
Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>(
"Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher)));
"Span for test failed to complete.", OTEL_CLASS_RULE::getSpans, hasItem(parentSpanMatcher)));
}

protected static Stream<SpanData> spanStream() {
return OTEL_CLASS_RULE.getSpans().stream().filter(Objects::nonNull);
}

@Test
public void testScanAll() throws Exception {
List<Result> results = doScan(createScan(), -1);
// make sure all scanners are closed at RS side
miniClusterRule.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
MINI_CLUSTER_RULE.getTestingUtility().getHBaseCluster().getRegionServerThreads().stream()
.map(JVMClusterUtil.RegionServerThread::getRegionServer).forEach(
rs -> assertEquals(
"The scanner count of " + rs.getServerName() + " is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -76,7 +74,7 @@ protected Scan createScan() {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<ScanResultConsumer> table =
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results;
if (closeAfter > 0) {
// these tests batch settings with the sample data result in each result being
Expand Down Expand Up @@ -108,38 +106,31 @@ protected void assertTraceContinuity() {
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);

final Matcher<SpanData> onScanMetricsCreatedMatcher =
hasName("TracedScanResultConsumer#onScanMetricsCreated");
assertThat(spans, hasItem(onScanMetricsCreatedMatcher));
spans.stream().filter(onScanMetricsCreatedMatcher::matches).forEach(span -> assertThat(span,
allOf(onScanMetricsCreatedMatcher, hasParentSpanId(scanOperationSpanId), hasEnded())));
waitForSpan(scanOperationSpanMatcher);
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> onNextMatcher = hasName("TracedScanResultConsumer#onNext");
assertThat(spans, hasItem(onNextMatcher));
spans.stream().filter(onNextMatcher::matches)
waitForSpan(onNextMatcher);
spanStream().filter(onNextMatcher::matches)
.forEach(span -> assertThat(span, allOf(onNextMatcher, hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK), hasEnded())));

final Matcher<SpanData> onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete");
assertThat(spans, hasItem(onCompleteMatcher));
spans.stream().filter(onCompleteMatcher::matches)
waitForSpan(onCompleteMatcher);
spanStream().filter(onCompleteMatcher::matches)
.forEach(span -> assertThat(span, allOf(onCompleteMatcher,
hasParentSpanId(scanOperationSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
}
Expand All @@ -151,27 +142,26 @@ protected void assertTraceContinuity() {
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasException(exceptionMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
final String scanOperationSpanId = spans.stream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
waitForSpan(scanOperationSpanMatcher);
final String scanOperationSpanId = spanStream().filter(scanOperationSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> onErrorMatcher = hasName("TracedScanResultConsumer#onError");
assertThat(spans, hasItem(onErrorMatcher));
spans.stream().filter(onErrorMatcher::matches)
waitForSpan(onErrorMatcher);
spanStream().filter(onErrorMatcher::matches)
.forEach(span -> assertThat(span, allOf(onErrorMatcher, hasParentSpanId(scanOperationSpanId),
hasStatusWithCode(StatusCode.OK), hasEnded())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,12 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -99,20 +96,19 @@ protected void assertTraceContinuity() {
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
waitForSpan(scanOperationSpanMatcher);
}

@Override
Expand All @@ -122,20 +118,19 @@ protected void assertTraceContinuity() {
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasException(exceptionMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
waitForSpan(scanOperationSpanMatcher);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -83,7 +80,7 @@ protected Scan createScan() {
@Override
protected List<Result> doScan(Scan scan, int closeAfter) throws Exception {
AsyncTable<?> table =
connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
CONN_RULE.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool());
List<Result> results = new ArrayList<>();
// these tests batch settings with the sample data result in each result being
// split in two. so we must allow twice the expected results in order to reach
Expand Down Expand Up @@ -112,19 +109,17 @@ protected void assertTraceContinuity() {
allOf(hasName(parentSpanName), hasStatusWithCode(StatusCode.OK), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

assertThat(spans,
hasItem(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded())));
waitForSpan(allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.OK), hasEnded()));
}

@Override
Expand All @@ -134,20 +129,19 @@ protected void assertTraceContinuity() {
final Matcher<SpanData> parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded());
waitForSpan(parentSpanMatcher);

final List<SpanData> spans =
otelClassRule.getSpans().stream().filter(Objects::nonNull).collect(Collectors.toList());
if (logger.isDebugEnabled()) {
StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans);
StringTraceRenderer stringTraceRenderer =
new StringTraceRenderer(spanStream().collect(Collectors.toList()));
stringTraceRenderer.render(logger::debug);
}

final String parentSpanId = spans.stream().filter(parentSpanMatcher::matches)
.map(SpanData::getSpanId).findAny().orElseThrow(AssertionError::new);
final String parentSpanId =
spanStream().filter(parentSpanMatcher::matches).map(SpanData::getSpanId).findAny().get();

final Matcher<SpanData> scanOperationSpanMatcher =
allOf(hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())),
hasParentSpanId(parentSpanId), hasStatusWithCode(StatusCode.ERROR),
hasException(exceptionMatcher), hasEnded());
assertThat(spans, hasItem(scanOperationSpanMatcher));
waitForSpan(scanOperationSpanMatcher);
}
}
Loading