Skip to content

Commit

Permalink
HBASE-26531 Trace coprocessor exec endpoints
Browse files Browse the repository at this point in the history
Trace table ExecService invocations as table operations. Ensure span relationships for both table
and master invocations.

Signed-off-by: Andrew Purtell <[email protected]>
  • Loading branch information
ndimiduk committed Mar 14, 2022
1 parent a49d147 commit 36a5f86
Show file tree
Hide file tree
Showing 10 changed files with 1,018 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,7 +18,7 @@
package org.apache.hadoop.hbase.client;

import static java.util.stream.Collectors.toList;

import io.opentelemetry.context.Context;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,7 +32,6 @@
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
Expand Down Expand Up @@ -280,26 +279,27 @@ public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> st
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) {
final Context context = Context.current();
CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {

@Override
public void onRegionComplete(RegionInfo region, R resp) {
pool.execute(() -> callback.onRegionComplete(region, resp));
pool.execute(context.wrap(() -> callback.onRegionComplete(region, resp)));
}

@Override
public void onRegionError(RegionInfo region, Throwable error) {
pool.execute(() -> callback.onRegionError(region, error));
pool.execute(context.wrap(() -> callback.onRegionError(region, error)));
}

@Override
public void onComplete() {
pool.execute(() -> callback.onComplete());
pool.execute(context.wrap(callback::onComplete));
}

@Override
public void onError(Throwable error) {
pool.execute(() -> callback.onError(error));
pool.execute(context.wrap(() -> callback.onError(error)));
}
};
CoprocessorServiceBuilder<S, R> builder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFutures;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -51,17 +52,16 @@
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
Expand Down Expand Up @@ -755,14 +755,22 @@ private <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> s
ServiceCaller<S, R> callable, RegionInfo region, byte[] row) {
RegionCoprocessorRpcChannelImpl channel = new RegionCoprocessorRpcChannelImpl(conn, tableName,
region, row, rpcTimeoutNs, operationTimeoutNs);
final Span span = Span.current();
S stub = stubMaker.apply(channel);
CompletableFuture<R> future = new CompletableFuture<>();
ClientCoprocessorRpcController controller = new ClientCoprocessorRpcController();
callable.call(stub, controller, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
try (Scope ignored = span.makeCurrent()) {
if (controller.failed()) {
final Throwable failure = controller.getFailed();
future.completeExceptionally(failure);
TraceUtil.setError(span, failure);
} else {
future.complete(resp);
span.setStatus(StatusCode.OK);
}
} finally {
span.end();
}
});
return future;
Expand Down Expand Up @@ -795,8 +803,11 @@ private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, List<HRegionLocation> locs,
byte[] endKey, boolean endKeyInclusive, AtomicBoolean locateFinished,
AtomicInteger unfinishedRequest, HRegionLocation loc, Throwable error) {
final Span span = Span.current();
if (error != null) {
callback.onError(error);
TraceUtil.setError(span, error);
span.end();
return;
}
unfinishedRequest.incrementAndGet();
Expand All @@ -807,17 +818,23 @@ private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
addListener(
conn.getLocator().getRegionLocation(tableName, region.getEndKey(), RegionLocateType.CURRENT,
operationTimeoutNs),
(l, e) -> onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e));
(l, e) -> {
try (Scope ignored = span.makeCurrent()) {
onLocateComplete(stubMaker, callable, callback, locs, endKey, endKeyInclusive,
locateFinished, unfinishedRequest, l, e);
}
});
}
addListener(coprocessorService(stubMaker, callable, region, region.getStartKey()), (r, e) -> {
if (e != null) {
callback.onRegionError(region, e);
} else {
callback.onRegionComplete(region, r);
}
if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
callback.onComplete();
try (Scope ignored = span.makeCurrent()) {
if (e != null) {
callback.onRegionError(region, e);
} else {
callback.onRegionComplete(region, r);
}
if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
callback.onComplete();
}
}
});
}
Expand Down Expand Up @@ -868,10 +885,22 @@ public CoprocessorServiceBuilderImpl<S, R> toRow(byte[] endKey, boolean inclusiv

@Override
public void execute() {
addListener(conn.getLocator().getRegionLocation(tableName, startKey,
startKeyInclusive ? RegionLocateType.CURRENT : RegionLocateType.AFTER, operationTimeoutNs),
(loc, error) -> onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error));
final Span span = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.COPROC_EXEC)
.build();
try (Scope ignored = span.makeCurrent()) {
final RegionLocateType regionLocateType = startKeyInclusive
? RegionLocateType.CURRENT
: RegionLocateType.AFTER;
final CompletableFuture<HRegionLocation> future = conn.getLocator()
.getRegionLocation(tableName, startKey, regionLocateType, operationTimeoutNs);
addListener(future, (loc, error) -> {
try (Scope ignored1 = span.makeCurrent()) {
onLocateComplete(stubMaker, callable, callback, new ArrayList<>(), endKey,
endKeyInclusive, new AtomicBoolean(false), new AtomicInteger(0), loc, error);
}
});
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -19,7 +19,8 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.setCoprocessorError;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,13 +31,11 @@
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
Expand Down Expand Up @@ -74,6 +73,7 @@ class RegionCoprocessorRpcChannelImpl implements RpcChannel {
private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message request,
Message responsePrototype, HBaseRpcController controller, HRegionLocation loc,
ClientService.Interface stub) {
final Context context = Context.current();
CompletableFuture<Message> future = new CompletableFuture<>();
if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), region.getRegionName())) {
future.completeExceptionally(new DoNotRetryIOException("Region name is changed, expected " +
Expand All @@ -82,39 +82,43 @@ private CompletableFuture<Message> rpcCall(MethodDescriptor method, Message requ
}
CoprocessorServiceRequest csr = CoprocessorRpcUtils.getCoprocessorServiceRequest(method,
request, row, loc.getRegion().getRegionName());
stub.execService(controller, csr,
new org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback<CoprocessorServiceResponse>() {

@Override
public void run(CoprocessorServiceResponse resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
lastRegion = resp.getRegion().getValue().toByteArray();
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
stub.execService(controller, csr, resp -> {
try (Scope ignored = context.makeCurrent()) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
lastRegion = resp.getRegion().getValue().toByteArray();
try {
future.complete(CoprocessorRpcUtils.getResponse(resp, responsePrototype));
} catch (IOException e) {
future.completeExceptionally(e);
}
}
});
}
});
return future;
}

@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
final Context context = Context.current();
addListener(
conn.callerFactory.<Message> single().table(tableName).row(row)
.locateType(RegionLocateType.CURRENT).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.action((c, l, s) -> rpcCall(method, request, responsePrototype, c, l, s)).call(),
.action((c, l, s) -> {
try (Scope ignored = context.makeCurrent()) {
return rpcCall(method, request, responsePrototype, c, l, s);
}
}).call(),
(r, e) -> {
if (e != null) {
setCoprocessorError(controller, e);
try (Scope ignored = context.makeCurrent()) {
if (e != null) {
setCoprocessorError(controller, e);
}
done.run(r);
}
done.run(r);
});
}

Expand Down
Loading

0 comments on commit 36a5f86

Please sign in to comment.