HBASE-26545 Implement tracing of scan
* on `AsyncTable`, both `scan` and `scanAll` methods should result in `SCAN` table operations.
* the span of the `SCAN` table operation should have children representing all the RPC calls
  involved in servicing the scan.
* when a user provides a custom implementation of `AdvancedScanResultConsumer`, any spans emitted
  from the callback methods should also be tied to the span that represents the `SCAN` table
  operation. This is easily done because these callbacks are executed on the RPC thread.
* when a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the
  callback methods should also be tied to the span that represents the `SCAN` table operation.
  This is accomplished by carefully passing the span instance around after it is created (see the
  sketch below).
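
A minimal sketch (not part of this commit) of how a user-supplied `ScanResultConsumer` benefits:
because the client makes the `SCAN` span current before invoking the callbacks, a span started
inside them is recorded as a child of the `SCAN` operation. The tracer name, span name, and row
handling below are made up for illustration.

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ScanResultConsumer;

public class TracingScanResultConsumer implements ScanResultConsumer {
  // Instrumentation scope name is an arbitrary example value.
  private final Tracer tracer = GlobalOpenTelemetry.getTracer("scan-example");

  @Override
  public boolean onNext(Result result) {
    // The SCAN span is current on this thread, so this span becomes its child.
    Span rowSpan = tracer.spanBuilder("process-row").startSpan();
    try {
      // ... application-specific handling of `result` ...
      return true; // keep scanning
    } finally {
      rowSpan.end();
    }
  }

  @Override
  public void onError(Throwable error) {
    // Also invoked with the SCAN span current.
  }

  @Override
  public void onComplete() {
    // Also invoked with the SCAN span current.
  }
}
```

A hypothetical call site would be `conn.getTable(tableName, pool).scan(scan, new
TracingScanResultConsumer())`, where `conn` is an `AsyncConnection`.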

Signed-off-by: Andrew Purtell <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
ndimiduk authored Mar 31, 2022
1 parent 98db15a commit 489af42
Showing 26 changed files with 2,300 additions and 1,435 deletions.
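
The diff threads the OpenTelemetry context through the asynchronous machinery in two ways:
callbacks that already hold the span wrap their bodies in `span.makeCurrent()`, while work handed
to another thread captures `Context.current()` first and re-establishes it there (as
`AsyncTableImpl.scan` does with `pool.execute(context.wrap(...))`). A standalone sketch of that
hand-off pattern, with a made-up tracer, span name, and executor, assuming an OpenTelemetry SDK is
registered:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationSketch {
  public static void main(String[] args) {
    Tracer tracer = GlobalOpenTelemetry.getTracer("scan-example");
    ExecutorService pool = Executors.newSingleThreadExecutor();

    Span scanSpan = tracer.spanBuilder("SCAN").startSpan();
    try (Scope ignored = scanSpan.makeCurrent()) {
      // Capture the caller's context, which carries scanSpan...
      Context context = Context.current();
      // ...and re-establish it on the pool thread, so Span.current() there is scanSpan again.
      pool.execute(context.wrap(() -> {
        Span.current().addEvent("row processed on a pool thread");
        scanSpan.end();
      }));
    }
    pool.shutdown();
  }
}
```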
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,14 +28,19 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;
@@ -85,6 +90,8 @@ class AsyncClientScanner {

private final ScanResultCache resultCache;

private final Span span;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
@@ -112,6 +119,18 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
} else {
this.scanMetrics = null;
}

/*
* Assumes that the `start()` method is called immediately after construction. If this is no
* longer the case, for tracing correctness, we should move the start of the span into the
* `start()` method. The cost of doing so would be making access to the `span` safe for
* concurrent threads.
*/
span = new TableOperationSpanBuilder(conn).setTableName(tableName).setOperation(scan).build();
if (consumer instanceof AsyncTableResultScanner) {
AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
scanner.setSpan(span);
}
}

private static final class OpenScannerResponse {
@@ -140,64 +159,87 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In

private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
return;
}
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
try (Scope ignored = span.makeCurrent()) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(),
scan, scan.getCaching(), false);
stub.scan(controller, request, resp -> {
try (Scope ignored1 = span.makeCurrent()) {
if (controller.failed()) {
final IOException e = controller.getFailed();
future.completeExceptionally(e);
TraceUtil.setError(span, e);
span.end();
return;
}
future.complete(
new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
}
});
} catch (IOException e) {
// span is closed by listener attached to the Future in `openScanner()`
future.completeExceptionally(e);
}
return future;
}
return future;
}

private void startScan(OpenScannerResponse resp) {
addListener(
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
.remote(resp.isRegionServerRemote)
.scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
if (hasMore) {
openScanner();
} else {
try {
consumer.onComplete();
} finally {
span.setStatus(StatusCode.OK);
span.end();
}
}
}
});
}

private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
try (Scope ignored = span.makeCurrent()) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority()).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.action(this::callOpenScanner).call();
}
}

private long getPrimaryTimeoutNs() {
return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs()
: conn.connConf.getPrimaryScanTimeoutNs();
: conn.connConf.getPrimaryScanTimeoutNs();
}

private void openScanner() {
@@ -206,15 +248,24 @@ private void openScanner() {
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
try (Scope ignored = span.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(span, error);
span.end();
}
}
startScan(resp);
}
startScan(resp);
});
}

public void start() {
openScanner();
try (Scope ignored = span.makeCurrent()) {
openScanner();
}
}
}
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +28,8 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -170,8 +172,8 @@ public ScanControllerImpl(Optional<Cursor> cursor) {

private void preCheck() {
Preconditions.checkState(Thread.currentThread() == callerThread,
"The current thread is %s, expected thread is %s, " +
"you should not call this method outside onNext or onHeartbeat",
"The current thread is %s, expected thread is %s, "
+ "you should not call this method outside onNext or onHeartbeat",
Thread.currentThread(), callerThread);
Preconditions.checkState(state.equals(ScanControllerState.INITIALIZED),
"Invalid Stopper state %s", state);
@@ -201,7 +203,7 @@ ScanControllerState destroy() {

@Override
public Optional<Cursor> cursor() {
return cursor;
return cursor;
}
}

@@ -352,9 +354,9 @@ private void closeScanner() {
ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false);
stub.scan(controller, req, resp -> {
if (controller.failed()) {
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId +
" for " + loc.getRegion().getEncodedName() + " of " +
loc.getRegion().getTable() + " failed, ignore, probably already closed",
LOG.warn("Call to " + loc.getServerName() + " for closing scanner id = " + scannerId
+ " for " + loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
+ " failed, ignore, probably already closed",
controller.getFailed());
}
});
@@ -392,19 +394,19 @@ private void completeWhenError(boolean closeScanner) {
private void onError(Throwable error) {
error = translateException(error);
if (tries > startLogErrorsCnt) {
LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " +
loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable() +
" failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " +
TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() +
" ms",
LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for "
+ loc.getRegion().getEncodedName() + " of " + loc.getRegion().getTable()
+ " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = "
+ TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs()
+ " ms",
error);
}
boolean scannerClosed =
error instanceof UnknownScannerException || error instanceof NotServingRegionException ||
error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
boolean scannerClosed = error instanceof UnknownScannerException
|| error instanceof NotServingRegionException
|| error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
RetriesExhaustedException.ThrowableWithExtraContext qt =
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
new RetriesExhaustedException.ThrowableWithExtraContext(error,
EnvironmentEdgeManager.currentTime(), "");
exceptions.add(qt);
if (tries >= maxAttempts) {
completeExceptionally(!scannerClosed);
@@ -573,7 +575,12 @@ private void call() {
resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
final Context context = Context.current();
stub.scan(controller, req, resp -> {
try (Scope ignored = context.makeCurrent()) {
onComplete(controller, resp);
}
});
}

private void next() {
@@ -18,8 +18,11 @@
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;

import com.google.protobuf.RpcChannel;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -177,8 +180,7 @@ public CheckAndMutateBuilder ifMatches(CompareOperator compareOp, byte[] value)
public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter) {
return new CheckAndMutateWithFilterBuilder() {

private final CheckAndMutateWithFilterBuilder builder =
rawTable.checkAndMutate(row, filter);
private final CheckAndMutateWithFilterBuilder builder = rawTable.checkAndMutate(row, filter);

@Override
public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
@@ -209,10 +211,9 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
}

@Override
public List<CompletableFuture<CheckAndMutateResult>> checkAndMutate(
List<CheckAndMutate> checkAndMutates) {
return rawTable.checkAndMutate(checkAndMutates).stream()
.map(this::wrap).collect(toList());
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
return rawTable.checkAndMutate(checkAndMutates).stream().map(this::wrap).collect(toList());
}

@Override
@@ -231,22 +232,29 @@ public ResultScanner getScanner(Scan scan) {
}

private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
Span span = null;
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
span = scanner.getSpan();
try (Scope ignored = span.makeCurrent()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
}
}
consumer.onComplete();
}
consumer.onComplete();
} catch (IOException e) {
consumer.onError(e);
try (Scope ignored = span.makeCurrent()) {
consumer.onError(e);
}
}
}

@Override
public void scan(Scan scan, ScanResultConsumer consumer) {
pool.execute(() -> scan0(scan, consumer));
final Context context = Context.current();
pool.execute(context.wrap(() -> scan0(scan, consumer)));
}

@Override
@@ -303,7 +311,7 @@ public void onError(Throwable error) {
}
};
CoprocessorServiceBuilder<S, R> builder =
rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
return new CoprocessorServiceBuilder<S, R>() {

@Override