Skip to content

Commit

Permalink
HBASE-26531 Trace coprocessor exec endpoints
Browse files Browse the repository at this point in the history
Trace table ExecService invocations as table operations. Ensure span relationships for both table
and master invocations.

Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
ndimiduk committed Mar 21, 2022
1 parent 321c35a commit 0d968af
Show file tree
Hide file tree
Showing 11 changed files with 1,103 additions and 167 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,8 +18,8 @@
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;

import com.google.protobuf.RpcChannel;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -279,26 +279,27 @@ public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> st
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) {
final Context context = Context.current();
CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {

@Override
public void onRegionComplete(RegionInfo region, R resp) {
pool.execute(() -> callback.onRegionComplete(region, resp));
pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp)));
}

@Override
public void onRegionError(RegionInfo region, Throwable error) {
pool.execute(() -> callback.onRegionError(region, error));
pool.execute(context.wrap(() -> callback.onRegionError(region, error)));
}

@Override
public void onComplete() {
pool.execute(() -> callback.onComplete());
pool.execute(context.wrap(callback::onComplete));
}

@Override
public void onError(Throwable error) {
pool.execute(() -> callback.onError(error));
pool.execute(context.wrap(() -> callback.onError(error)));
}
};
CoprocessorServiceBuilder<S, R> builder =
Expand Down
230 changes: 116 additions & 114 deletions hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/**
*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -31,15 +30,16 @@
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -1168,13 +1168,10 @@ public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> se
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
throws ServiceException, Throwable {
final Map<byte[],R> results = Collections.synchronizedMap(
new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
coprocessorService(service, startKey, endKey, callable, new Batch.Callback<R>() {
@Override
public void update(byte[] region, byte[] row, R value) {
if (region != null) {
results.put(region, value);
}
new TreeMap<>(Bytes.BYTES_COMPARATOR));
coprocessorService(service, startKey, endKey, callable, (region, row, value) -> {
if (region != null) {
results.put(region, value);
}
});
return results;
Expand All @@ -1184,39 +1181,43 @@ public void update(byte[] region, byte[] row, R value) {
public <T extends Service, R> void coprocessorService(final Class<T> service,
byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
final Batch.Callback<R> callback) throws ServiceException, Throwable {
// get regions covered by the row range
List<byte[]> keys = getStartKeysInRange(startKey, endKey);
Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (final byte[] r : keys) {
final RegionCoprocessorRpcChannel channel =
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
TraceUtil.trace(() -> {
final Context context = Context.current();
final ExecutorService wrappedPool = context.wrap(pool);
// get regions covered by the row range
List<byte[]> keys = getStartKeysInRange(startKey, endKey);
Map<byte[],Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (final byte[] r : keys) {
final RegionCoprocessorRpcChannel channel =
new RegionCoprocessorRpcChannel(connection, tableName, r);
Future<R> future = pool.submit(new Callable<R>() {
@Override
public R call() throws Exception {
Future<R> future = wrappedPool.submit(() -> {
T instance =
org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
org.apache.hadoop.hbase.protobuf.ProtobufUtil.newServiceStub(service, channel);
R result = callable.call(instance);
byte[] region = channel.getLastRegion();
if (callback != null) {
callback.update(region, r, result);
}
return result;
}
});
futures.put(r, future);
}
for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
try {
e.getValue().get();
} catch (ExecutionException ee) {
LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
+ Bytes.toStringBinary(e.getKey()), ee);
throw ee.getCause();
} catch (InterruptedException ie) {
throw new InterruptedIOException("Interrupted calling coprocessor service "
});
futures.put(r, future);
}
for (Map.Entry<byte[],Future<R>> e : futures.entrySet()) {
try {
e.getValue().get();
} catch (ExecutionException ee) {
LOG.warn("Error calling coprocessor service {} for row {}", service.getName(),
Bytes.toStringBinary(e.getKey()), ee);
throw ee.getCause();
} catch (InterruptedException ie) {
throw new InterruptedIOException("Interrupted calling coprocessor service "
+ service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
}
}
}
}, supplier);
}

private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
Expand Down Expand Up @@ -1308,17 +1309,14 @@ public String toString() {
public <R extends Message> Map<byte[], R> batchCoprocessorService(
Descriptors.MethodDescriptor methodDescriptor, Message request,
byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<byte[], R>(
Bytes.BYTES_COMPARATOR));
final Map<byte[], R> results = Collections.synchronizedMap(new TreeMap<>(
Bytes.BYTES_COMPARATOR));
batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
new Callback<R>() {
@Override
public void update(byte[] region, byte[] row, R result) {
(region, row, result) -> {
if (region != null) {
results.put(region, result);
}
}
});
});
return results;
}

Expand All @@ -1327,88 +1325,92 @@ public <R extends Message> void batchCoprocessorService(
final Descriptors.MethodDescriptor methodDescriptor, final Message request,
byte[] startKey, byte[] endKey, final R responsePrototype, final Callback<R> callback)
throws ServiceException, Throwable {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC);
TraceUtil.trace(() -> {
final Context context = Context.current();
final byte[] sanitizedStartKey = Optional.ofNullable(startKey)
.orElse(HConstants.EMPTY_START_ROW);
final byte[] sanitizedEndKey = Optional.ofNullable(endKey)
.orElse(HConstants.EMPTY_END_ROW);

// get regions covered by the row range
Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
getKeysAndRegionsInRange(sanitizedStartKey, sanitizedEndKey, true);
List<byte[]> keys = keysAndRegions.getFirst();
List<HRegionLocation> regions = keysAndRegions.getSecond();

// check if we have any calls to make
if (keys.isEmpty()) {
LOG.info("No regions were selected by key range start={}, end={}",
Bytes.toStringBinary(sanitizedStartKey), Bytes.toStringBinary(sanitizedEndKey));
return;
}

if (startKey == null) {
startKey = HConstants.EMPTY_START_ROW;
}
if (endKey == null) {
endKey = HConstants.EMPTY_END_ROW;
}
// get regions covered by the row range
Pair<List<byte[]>, List<HRegionLocation>> keysAndRegions =
getKeysAndRegionsInRange(startKey, endKey, true);
List<byte[]> keys = keysAndRegions.getFirst();
List<HRegionLocation> regions = keysAndRegions.getSecond();

// check if we have any calls to make
if (keys.isEmpty()) {
LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
", end=" + Bytes.toStringBinary(endKey));
return;
}

List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
final Map<byte[], RegionCoprocessorServiceExec> execsByRow = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (int i = 0; i < keys.size(); i++) {
final byte[] rowKey = keys.get(i);
final byte[] region = regions.get(i).getRegionInfo().getRegionName();
RegionCoprocessorServiceExec exec =
List<RegionCoprocessorServiceExec> execs = new ArrayList<>(keys.size());
final Map<byte[], RegionCoprocessorServiceExec> execsByRow =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (int i = 0; i < keys.size(); i++) {
final byte[] rowKey = keys.get(i);
final byte[] region = regions.get(i).getRegionInfo().getRegionName();
RegionCoprocessorServiceExec exec =
new RegionCoprocessorServiceExec(region, rowKey, methodDescriptor, request);
execs.add(exec);
execsByRow.put(rowKey, exec);
}
execs.add(exec);
execsByRow.put(rowKey, exec);
}

// tracking for any possible deserialization errors on success callback
// TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
final List<Throwable> callbackErrorExceptions = new ArrayList<>();
final List<Row> callbackErrorActions = new ArrayList<>();
final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()];
// tracking for any possible deserialization errors on success callback
// TODO: it would be better to be able to reuse AsyncProcess.BatchErrors here
final List<Throwable> callbackErrorExceptions = new ArrayList<>();
final List<Row> callbackErrorActions = new ArrayList<>();
final List<String> callbackErrorServers = new ArrayList<>();
Object[] results = new Object[execs.size()];

AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
AsyncProcess asyncProcess = new AsyncProcess(connection, configuration,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
RpcControllerFactory.instantiate(configuration));

Callback<ClientProtos.CoprocessorServiceResult> resultsCallback
= (byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
if (LOG.isTraceEnabled()) {
LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
": region=" + Bytes.toStringBinary(region) +
", row=" + Bytes.toStringBinary(row) +
", value=" + serviceResult.getValue().getValue());
}
try {
Message.Builder builder = responsePrototype.newBuilderForType();
org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
serviceResult.getValue().getValue().toByteArray());
callback.update(region, row, (R) builder.build());
} catch (IOException e) {
LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
e);
callbackErrorExceptions.add(e);
callbackErrorActions.add(execsByRow.get(row));
callbackErrorServers.add("null");
}
};
AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
Callback<ClientProtos.CoprocessorServiceResult> resultsCallback =
(byte[] region, byte[] row, ClientProtos.CoprocessorServiceResult serviceResult) -> {
if (LOG.isTraceEnabled()) {
LOG.trace("Received result for endpoint {}: region={}, row={}, value={}",
methodDescriptor.getFullName(), Bytes.toStringBinary(region),
Bytes.toStringBinary(row), serviceResult.getValue().getValue());
}
try {
Message.Builder builder = responsePrototype.newBuilderForType();
org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builder,
serviceResult.getValue().getValue().toByteArray());
callback.update(region, row, (R) builder.build());
} catch (IOException e) {
LOG.error("Unexpected response type from endpoint {}", methodDescriptor.getFullName(),
e);
callbackErrorExceptions.add(e);
callbackErrorActions.add(execsByRow.get(row));
callbackErrorServers.add("null");
}
};
AsyncProcessTask<ClientProtos.CoprocessorServiceResult> task =
AsyncProcessTask.newBuilder(resultsCallback)
.setPool(pool)
.setTableName(tableName)
.setRowAccess(execs)
.setResults(results)
.setRpcTimeout(readRpcTimeoutMs)
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
AsyncRequestFuture future = asyncProcess.submit(task);
future.waitUntilDone();

if (future.hasError()) {
throw future.getErrors();
} else if (!callbackErrorExceptions.isEmpty()) {
throw new RetriesExhaustedWithDetailsException(callbackErrorExceptions, callbackErrorActions,
callbackErrorServers);
}
.setPool(context.wrap(pool))
.setTableName(tableName)
.setRowAccess(execs)
.setResults(results)
.setRpcTimeout(readRpcTimeoutMs)
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
AsyncRequestFuture future = asyncProcess.submit(task);
future.waitUntilDone();

if (future.hasError()) {
throw future.getErrors();
} else if (!callbackErrorExceptions.isEmpty()) {
throw new RetriesExhaustedWithDetailsException(
callbackErrorExceptions, callbackErrorActions, callbackErrorServers);
}
}, supplier);
}

@Override
Expand Down
Loading

0 comments on commit 0d968af

Please sign in to comment.