HBASE-26545 Implement tracing of scan
* on `AsyncTable`, both `scan` and `scanAll` methods should result in `SCAN` table operations.
* the span of the `SCAN` table operation should have children representing all the RPC calls
  involved in servicing the scan.
* when a user provides a custom implementation of `AdvancedScanResultConsumer`, any spans emitted
  from the callback methods should also be tied to the span that represents the `SCAN` table
  operation. This is easily done because these callbacks are executed on the RPC thread (see the
  sketch after this list).
* when a user provides a custom implementation of `ScanResultConsumer`, any spans emitted from the
  callback methods should also be tied to the span that represents the `SCAN` table operation.
  This is accomplished by carefully passing the span instance around after it is created.
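
For illustration, a custom `AdvancedScanResultConsumer` along the lines of the hypothetical sketch below should see spans started in its callbacks parented to the `SCAN` operation span, because the callbacks run on the RPC thread with that span already active. The tracer name, span name, and attribute are assumptions made up for the example.

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.Result;

// Hypothetical consumer: spans created in these callbacks become children of
// the active SCAN span because the client invokes them on the RPC thread with
// that span already current.
class TracedAdvancedConsumer implements AdvancedScanResultConsumer {
  private final Tracer tracer = GlobalOpenTelemetry.getTracer("scan-example");

  @Override
  public void onNext(Result[] results, ScanController controller) {
    Span child = tracer.spanBuilder("process-batch").startSpan();
    try (Scope ignored = child.makeCurrent()) {
      child.setAttribute("batch.size", results.length); // child of the SCAN span
    } finally {
      child.end();
    }
  }

  @Override
  public void onError(Throwable error) {
    Span.current().recordException(error); // recorded on the SCAN span
  }

  @Override
  public void onComplete() {
    Span.current().addEvent("scan complete"); // event on the SCAN span
  }
}
```
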
ndimiduk committed Mar 14, 2022
1 parent a49d147 commit 893f640
Showing 23 changed files with 1,359 additions and 233 deletions.
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,19 +27,21 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@@ -85,6 +87,17 @@ class AsyncClientScanner {

private final ScanResultCache resultCache;

/*
* The `span` instance is accessed concurrently by several threads, but we use only basic
* synchronization. The class claims to be `@ThreadSafe` so we rely on the implementation to
* correctly handle concurrent use. The instance itself is initialized in the `start` method,
* so we cannot make it `final`. Because the instance is created before any consuming runnable
* is submitted to a worker pool, it should be enough to mark it as `volatile`. To avoid over-
* paying the price of the memory barrier, where a consumer makes multiple uses of the `span`
* instance, we use a local final non-volatile reference.
*/
private volatile Span span = null;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
@@ -140,26 +153,37 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In

private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
return;
}
future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp));
});
} catch (IOException e) {
future.completeExceptionally(e);
final Span localSpan = span;
try (Scope ignored = localSpan.makeCurrent()) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
ScanRequest request = RequestConverter.buildScanRequest(
loc.getRegion().getRegionName(), scan, scan.getCaching(), false);
stub.scan(controller, request, resp -> {
try (Scope ignored1 = localSpan.makeCurrent()) {
if (controller.failed()) {
final IOException e = controller.getFailed();
future.completeExceptionally(e);
TraceUtil.setError(localSpan, e);
localSpan.end();
return;
}
future.complete(new OpenScannerResponse(
loc, isRegionServerRemote, stub, controller, resp));
}
});
} catch (IOException e) {
TraceUtil.setError(localSpan, e);
localSpan.end();
future.completeExceptionally(e);
}
return future;
}
return future;
}

private void startScan(OpenScannerResponse resp) {
@@ -173,26 +197,41 @@ private void startScan(OpenScannerResponse resp) {
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
}
if (hasMore) {
openScanner();
} else {
consumer.onComplete();
final Span localSpan = span;
try (Scope ignored = localSpan.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(localSpan, error);
localSpan.end();
}
}
if (hasMore) {
openScanner();
} else {
try {
consumer.onComplete();
} finally {
localSpan.setStatus(StatusCode.OK);
localSpan.end();
}
}
}
});
}

private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
try (Scope ignored = span.makeCurrent()) {
return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}
}

private long getPrimaryTimeoutNs() {
@@ -206,15 +245,34 @@ private void openScanner() {
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer,
conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
final Span localSpan = span;
try (Scope ignored = localSpan.makeCurrent()) {
if (error != null) {
try {
consumer.onError(error);
return;
} finally {
TraceUtil.setError(localSpan, error);
localSpan.end();
}
}
startScan(resp);
}
startScan(resp);
});
}

public void start() {
openScanner();
final Span localSpan = new TableOperationSpanBuilder(conn)
.setTableName(tableName)
.setOperation(scan)
.build();
if (consumer instanceof AsyncTableResultScanner) {
AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer;
scanner.setSpan(localSpan);
}
span = localSpan;
try (Scope ignored = localSpan.makeCurrent()) {
openScanner();
}
}
}
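
The comment on the new volatile `span` field in `AsyncClientScanner` above describes a simple publication pattern: one volatile write before any consuming runnable is submitted, then a single volatile read per callback into a local final reference. A minimal stand-alone sketch of that pattern, with hypothetical class and method names:

```java
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;

// Hypothetical holder distilling the pattern: `span` is assigned exactly once
// before callbacks are scheduled, and each callback copies the field into a
// local final reference so the volatile read happens only once per callback.
class SpanHolder {
  private volatile Span span;

  void start(Span created) {
    span = created; // single volatile write, before any callback can run
  }

  void runWithSpan(Runnable work) {
    final Span localSpan = span; // one volatile read for this callback
    try (Scope ignored = localSpan.makeCurrent()) {
      work.run(); // executes with the span active in the current Context
    }
  }
}
```
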
@@ -28,6 +28,7 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics;
import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics;

import io.opentelemetry.context.Context;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -573,7 +574,8 @@ private void call() {
resetController(controller, callTimeoutNs, priority);
ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false,
nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit());
stub.scan(controller, req, resp -> onComplete(controller, resp));
Context context = Context.current();
stub.scan(controller, req, resp -> context.wrap(() -> onComplete(controller, resp)).run());
}

private void next() {
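
The `Context.current()` / `context.wrap(...)` change above captures the active tracing context on the thread issuing the scan RPC and re-activates it on whichever thread runs the response callback. A self-contained sketch of that idiom, assuming a no-op OpenTelemetry configuration and a stand-in single-thread pool:

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo of the capture-and-wrap idiom: the tracing context is
// captured on the calling thread and re-activated on whichever pool thread
// eventually runs the callback.
final class ContextWrapDemo {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Span parent = GlobalOpenTelemetry.getTracer("demo").spanBuilder("SCAN").startSpan();
    try (Scope ignored = parent.makeCurrent()) {
      Context captured = Context.current(); // capture on the calling thread
      pool.submit(captured.wrap(() -> {
        // Because of the wrap, the caller's context (and its active span) is
        // current here even though this runs on a different thread.
        System.out.println("active span in callback: " + Span.current());
      })).get();
    } finally {
      parent.end();
      pool.shutdown();
    }
  }
}
```
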
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -32,7 +34,6 @@
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
@@ -232,22 +233,29 @@ public ResultScanner getScanner(Scan scan) {
}

private void scan0(Scan scan, ScanResultConsumer consumer) {
try (ResultScanner scanner = getScanner(scan)) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null;) {
if (!consumer.onNext(result)) {
break;
Span span = null;
try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) {
span = scanner.getSpan();
try (Scope ignored = span.makeCurrent()) {
consumer.onScanMetricsCreated(scanner.getScanMetrics());
for (Result result; (result = scanner.next()) != null; ) {
if (!consumer.onNext(result)) {
break;
}
}
consumer.onComplete();
}
consumer.onComplete();
} catch (IOException e) {
consumer.onError(e);
try (Scope ignored = span.makeCurrent()) {
consumer.onError(e);
}
}
}

@Override
public void scan(Scan scan, ScanResultConsumer consumer) {
pool.execute(() -> scan0(scan, consumer));
final Context context = Context.current();
pool.execute(context.wrap(() -> scan0(scan, consumer)));
}

@Override
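
With `scan` now wrapping the pooled task in the captured context and `scan0` re-activating the span handed over by the scanner, a `ScanResultConsumer` running on the user-supplied pool should observe the `SCAN` span as current. A hedged usage sketch; the table name, limit, and pool sizing are assumptions:

```java
import io.opentelemetry.api.trace.Span;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical caller: the callbacks execute on `pool`, yet Span.current()
// inside them should refer to the SCAN table-operation span thanks to the
// Context/Span propagation added in this commit.
final class PooledScanExample {
  static void run(AsyncConnection conn) {
    final ExecutorService pool = Executors.newFixedThreadPool(1);
    conn.getTable(TableName.valueOf("example_table"), pool)
        .scan(new Scan().setLimit(5), new ScanResultConsumer() {
          @Override
          public boolean onNext(Result result) {
            Span.current().addEvent("row " + Bytes.toStringBinary(result.getRow()));
            return true; // keep scanning
          }

          @Override
          public void onError(Throwable error) {
            Span.current().recordException(error);
            pool.shutdown();
          }

          @Override
          public void onComplete() {
            Span.current().addEvent("scan complete");
            pool.shutdown();
          }
        });
  }
}
```
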
@@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;

import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
@@ -58,6 +58,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum

private ScanResumer resumer;

// Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`.
private Span span = null;

public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) {
this.tableName = tableName;
this.maxCacheSize = maxCacheSize;
@@ -79,6 +82,14 @@ private void stopPrefetch(ScanController controller) {
resumer = controller.suspend();
}

Span getSpan() {
return span;
}

void setSpan(final Span span) {
this.span = span;
}

@Override
public synchronized void onNext(Result[] results, ScanController controller) {
assert results.length > 0;
@@ -640,30 +640,26 @@ public AsyncTableResultScanner getScanner(Scan scan) {

@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(scan);
return tracedFuture(() -> {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {
CompletableFuture<List<Result>> future = new CompletableFuture<>();
List<Result> scanResults = new ArrayList<>();
scan(scan, new AdvancedScanResultConsumer() {

@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}
@Override
public void onNext(Result[] results, ScanController controller) {
scanResults.addAll(Arrays.asList(results));
}

@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}
@Override
public void onError(Throwable error) {
future.completeExceptionally(error);
}

@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}, supplier);
@Override
public void onComplete() {
future.complete(scanResults);
}
});
return future;
}

@Override
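
The `scanAll` hunk above removes the `tracedFuture` wrapper; per the earlier hunks in this commit, the `SCAN` span is now created, propagated, and ended by `AsyncClientScanner`. A hedged usage sketch; the table name and row limit are assumptions:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

// Hypothetical caller: with this commit, the call below should still yield a
// single SCAN table-operation span whose children are the RPC spans issued
// while servicing the scan, even though scanAll no longer wraps the future.
final class ScanAllExample {
  static CompletableFuture<List<Result>> firstTenRows(AsyncConnection conn) {
    return conn.getTable(TableName.valueOf("example_table"))
        .scanAll(new Scan().setLimit(10));
  }
}
```
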