HBASE-26545 Implement tracing of scan
* on `AsyncTable`, both the `scan` and `scanAll` methods should result in `SCAN` table operations.
* the span of the `SCAN` table operation should have children representing all the RPC calls
  involved in servicing the scan.
* when a user provides a custom implementation of `AdvancedScanResultConsumer`, any spans emitted
  from the callback methods should also be tied to the span that represents the `SCAN` table
  operation. This is easily done because these callbacks are executed on the RPC thread.
* when a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the
  callback methods are tied to the context of the user application code. We could `link` them to
  the `SCAN` table operation, except that we have no means of passing that span along.
ndimiduk committed Feb 10, 2022
1 parent 85fadfd commit ad27bcb
Showing 17 changed files with 839 additions and 92 deletions.
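Every file below applies the same OpenTelemetry pattern: capture the caller's `Context` before work hops to another thread, then re-activate it (via `Context.wrap(...)` or `Context.makeCurrent()`) inside the callback so child spans attach to the right parent. A minimal, self-contained sketch of that pattern (illustrative only, not code from this commit; the tracer and span names are made up):

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationSketch {
  public static void main(String[] args) {
    Tracer tracer = GlobalOpenTelemetry.getTracer("sketch"); // instrumentation name is arbitrary
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Span scanSpan = tracer.spanBuilder("SCAN").startSpan();
    try (Scope ignored = scanSpan.makeCurrent()) {
      // Capture the Context while the SCAN span is current...
      Context context = Context.current();
      // ...and re-establish it on the worker thread, so a span started in the
      // callback is parented to SCAN even though it runs on another thread.
      pool.execute(context.wrap(() -> {
        Span child = tracer.spanBuilder("callback").startSpan();
        child.end();
      }));
    } finally {
      scanSpan.end();
      pool.shutdown();
    }
  }
}
```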
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,19 +27,22 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@@ -163,6 +166,7 @@ private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController
}

private void startScan(OpenScannerResponse resp) {
final Context context = Context.current();
addListener(
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
@@ -173,14 +177,16 @@ private void startScan(OpenScannerResponse resp) {
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
}
}
});
}
@@ -202,18 +208,24 @@ private long getPrimaryTimeoutNs() {
private void openScanner() {
incRegionCountMetrics(scanMetrics);
openScannerTries.set(1);
final Context context = Context.current();
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
try (Scope ignored = context.makeCurrent()) {
if (error != null) {
consumer.onError(error);
return;
}
startScan(resp);
}
startScan(resp);
});
}

public void start() {
openScanner();
final Supplier<Span> spanSupplier = new TableOperationSpanBuilder(conn)
.setTableName(tableName)
.setOperation(scan);
TraceUtil.trace(() -> openScanner(), spanSupplier);
}
}
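The hunks above move span creation into `start()`: `TraceUtil.trace(() -> openScanner(), spanSupplier)` opens the `SCAN` span and runs `openScanner()` with it current, while the `Context.current()` captures in `startScan()` and `openScanner()` keep the retry-caller callbacks under that span. `TraceUtil.trace` itself is not part of this diff; a plausible shape, assuming it starts the supplied span, makes it current for the action, and records failures:

```java
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.util.function.Supplier;

final class TraceUtilSketch {
  // Plausible shape of TraceUtil.trace(Runnable, Supplier<Span>); the real
  // helper (in hbase-common, not shown here) may defer span.end() for
  // asynchronous operations rather than ending it in finally.
  static void trace(Runnable action, Supplier<Span> spanSupplier) {
    Span span = spanSupplier.get();
    try (Scope ignored = span.makeCurrent()) {
      action.run();
      span.setStatus(StatusCode.OK);
    } catch (Throwable t) {
      span.setStatus(StatusCode.ERROR);
      span.recordException(t);
      throw t;
    } finally {
      span.end();
    }
  }
}
```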
@@ -28,6 +28,7 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import io.opentelemetry.context.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -572,7 +573,8 @@ private void call() {
resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
Context context = Context.current();
stub.scan(controller, req, resp -> context.wrap(() -> onComplete(controller, resp)).run());
}

private void next() {
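One-line change, but it carries the whole mechanism: the protobuf stub invokes its callback on an RPC event-loop thread, which does not inherit the calling thread's `Context`. Wrapping the callback with the `Context` captured in `call()` keeps `onComplete(...)`, and any spans emitted while it runs, under the active scan span. This is also why the commit message can promise that `AdvancedScanResultConsumer` callbacks need no extra plumbing: they fire on this same wrapped path.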
@@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;

import io.opentelemetry.context.Context;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -32,7 +32,6 @@
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
@@ -248,7 +247,8 @@ private void scan0(Scan scan, ScanResultConsumer consumer) {

@Override
public void scan(Scan scan, ScanResultConsumer consumer) {
pool.execute(() -> scan0(scan, consumer));
final Context context = Context.current();
pool.execute(context.wrap(() -> scan0(scan, consumer)));
}

@Override
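The same wrap, applied at the boundary where `AsyncTableImpl.scan` hands off to the user-facing thread pool; this is what ties `ScanResultConsumer` callbacks to the caller's context, as the commit message describes. A hypothetical caller (the tracer, `table`, and `consumer` are assumed to exist; the span name is made up):

```java
// Because scan0 now runs under the Context captured at the call site, a span
// active here is an ancestor of spans emitted from the consumer's callbacks.
Span appSpan = tracer.spanBuilder("my-report").startSpan();
try (Scope ignored = appSpan.makeCurrent()) {
  table.scan(new Scan(), consumer);
} finally {
  appSpan.end();
}
```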
@@ -636,30 +636,26 @@ public ResultScanner getScanner(Scan scan) {

@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(scan);
return tracedFuture(() -> {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {

@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}
@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}

@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}

@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}, supplier);
@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}

@Override
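The `scanAll` hunk above is the flip side of the new `TraceUtil.trace(...)` call in the scanner's `start()`: the span supplier and `tracedFuture` wrapper are deleted and the body is un-indented, because the `SCAN` span is now created once, inside the scanner itself. With that, `scan` and `scanAll` share a single tracing path rather than `scanAll` opening its own span.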
@@ -28,6 +28,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.fail;
@@ -44,9 +46,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
@@ -76,6 +80,8 @@
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@@ -95,6 +101,7 @@

@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncTableTracing {
private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableTracing.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -106,7 +113,7 @@ public class TestAsyncTableTracing {

private AsyncConnectionImpl conn;

private AsyncTable<?> table;
private AsyncTable<ScanResultConsumer> table;

@Rule
public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
@@ -452,6 +459,53 @@ public void testScanAll() {
assertTrace("SCAN");
}

@Test
public void testScan() throws Throwable {
final CountDownLatch doneSignal = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Throwable> throwable = new AtomicReference<>();
final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
table.scan(scan, new ScanResultConsumer() {
@Override public boolean onNext(Result result) {
if (result.getRow() != null) {
count.incrementAndGet();
}
return true;
}

@Override public void onError(Throwable error) {
throwable.set(error);
doneSignal.countDown();
}

@Override public void onComplete() {
doneSignal.countDown();
}
});
doneSignal.await();
if (throwable.get() != null) {
throw throwable.get();
}
assertThat("user code did not run. check test setup.", count.get(), greaterThan(0));
assertTrace("SCAN");
}

@Test
public void testGetScanner() {
final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1);
try (ResultScanner scanner = table.getScanner(scan)) {
int count = 0;
for (Result result : scanner) {
if (result.getRow() != null) {
count++;
}
}
// do something with it.
assertThat(count, greaterThanOrEqualTo(0));
}
assertTrace("SCAN");
}
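`assertTrace` is defined elsewhere in this test class and does not appear in the diff; presumably it searches the spans exported through the `OpenTelemetryRule` for the named table operation, roughly like this sketch (the matching logic is a guess, and the real helper likely also waits for asynchronous span export before asserting):

```java
private void assertTrace(String operation) {
  boolean matched = traceRule.getSpans().stream()
      .anyMatch(span -> span.getName().startsWith(operation));
  assertTrue("no span found for operation " + operation, matched);
}
```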

@Test
public void testExistsList() {
CompletableFuture