Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: integrate OpenCensus tracing into the bigtable data client #4493

Merged
merged 12 commits into from
Feb 26, 2019
60 changes: 60 additions & 0 deletions google-cloud-clients/google-cloud-bigtable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,66 @@ try {
}
```

## Opencensus Tracing

Cloud Bigtable client supports [Opencensus Tracing](https://opencensus.io/tracing/),
which gives insight into the client internals and aids in debugging production issues.
By default, the functionality is disabled. To enable, you need to add a couple of
dependencies and configure an exporter. For example to enable tracing using
[Google Stackdriver](https://cloud.google.com/trace/docs/):

[//]: # (TODO: figure out how to keep opencensus version in sync with pom.xml)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to ensure that the opencensus versions in this readme stay up to date with google-cloud-clients/pom.xml properties. I thought about using version markers, but those seem to only be used for internal artifacts. It would be awesome if version.txt could be generalized to manage static external versions as well.


If you are using Maven, add this to your pom.xml file
```xml
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<version>0.18.0</version>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-exporter-trace-stackdriver</artifactId>
<version>0.18.0</version>
</dependency>
```
If you are using Gradle, add this to your dependencies
```Groovy
compile 'io.opencensus:opencensus-impl:0.18.0'
compile 'io.opencensus:opencensus-exporter-trace-stackdriver:0.18.0'
```
If you are using SBT, add this to your dependencies
```Scala
libraryDependencies += "io.opencensus" % "opencensus-impl" % "0.18.0"
libraryDependencies += "io.opencensus" % "opencensus-exporter-trace-stackdriver" % "0.18.0"
```

Then at the start of your application configure the exporter:

```java
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration;
import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter;

StackdriverTraceExporter.createAndRegister(
StackdriverTraceConfiguration.builder()
.setProjectId("YOUR-PROJECT_ID")
.build());
```

By default traces are [sampled](https://opencensus.io/tracing/sampling) at a rate of about 1/10,000.
You can configure a higher rate by updating the active tracing params:

```java
import io.opencensus.trace.Tracing;
import io.opencensus.trace.samplers.Samplers;

Tracing.getTraceConfig().updateActiveTraceParams(
Tracing.getTraceConfig().getActiveTraceParams().toBuilder()
.setSampler(Samplers.probabilitySampler(0.01))
.build()
);
```

## Troubleshooting

To get help, follow the instructions in the [shared Troubleshooting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.rpc.BatchingCallSettings;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.ServerStreamingCallSettings;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.api.gax.tracing.SpanName;
import com.google.api.gax.tracing.TracedBatchingCallable;
import com.google.api.gax.tracing.TracedServerStreamingCallable;
import com.google.api.gax.tracing.TracedUnaryCallable;
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.SampleRowKeysRequest;
Expand All @@ -50,6 +54,7 @@
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.RowMergingCallable;
import com.google.cloud.bigtable.gaxx.retrying.ApiResultRetryAlgorithm;
import com.google.cloud.bigtable.gaxx.tracing.WrappedTracerFactory;
import java.io.IOException;
import java.util.List;
import org.threeten.bp.Duration;
Expand All @@ -68,6 +73,9 @@
*/
@InternalApi
public class EnhancedBigtableStub implements AutoCloseable {
private static final String TRACING_OUTER_CLIENT_NAME = "Bigtable";
private static final String TRACING_INNER_CLIENT_NAME = "BaseBigtable";

private final EnhancedBigtableStubSettings settings;
private final GrpcBigtableStub stub;
private final ClientContext clientContext;
Expand All @@ -92,7 +100,10 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
.setCredentialsProvider(settings.getCredentialsProvider())
.setHeaderProvider(settings.getHeaderProvider())
.setStreamWatchdogProvider(settings.getStreamWatchdogProvider())
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval());
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval())
// Force the base stub to use a different TracerFactory
.setTracerFactory(
new WrappedTracerFactory(settings.getTracerFactory(), TRACING_INNER_CLIENT_NAME));

// ReadRow retries are handled in the overlay: disable retries in the base layer (but make
// sure to preserve the exception callable settings).
Expand Down Expand Up @@ -140,6 +151,9 @@ public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
ClientContext clientContext = ClientContext.create(baseSettings);
GrpcBigtableStub stub = new GrpcBigtableStub(baseSettings, clientContext);

// Make sure to keep the original tracer factory for the outer client.
clientContext = clientContext.toBuilder().setTracerFactory(settings.getTracerFactory()).build();

return new EnhancedBigtableStub(settings, clientContext, stub);
}

Expand Down Expand Up @@ -247,15 +261,8 @@ private <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
FilterMarkerRowsCallable<RowT> filtering =
new FilterMarkerRowsCallable<>(retrying2, rowAdapter);

ServerStreamingCallable<ReadRowsRequest, RowT> withContext =
filtering.withDefaultCallContext(clientContext.getDefaultCallContext());

// NOTE: Ideally `withDefaultCallContext` should be the outer-most callable, however the
// ReadRowsUserCallable overrides the first() method. This override would be lost if
// ReadRowsUserCallable is wrapped by another callable. At some point in the future,
// gax-java should allow preserving these kind of overrides through callable chains, at which
// point this should be re-ordered.
return new ReadRowsUserCallable<>(withContext, requestContext);
return createUserFacingServerStreamingCallable(
"ReadRows", new ReadRowsUserCallable<>(filtering, requestContext));
}

/**
Expand All @@ -276,10 +283,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
UnaryCallable<SampleRowKeysRequest, List<SampleRowKeysResponse>> retryable =
Callables.retrying(spoolable, settings.sampleRowKeysSettings(), clientContext);

UnaryCallable<String, List<KeyOffset>> userFacing =
new SampleRowKeysCallable(retryable, requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
return createUserFacingUnaryCallable(
"SampleRowKeys", new SampleRowKeysCallable(retryable, requestContext));
}

/**
Expand All @@ -290,9 +295,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
* </ul>
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
MutateRowCallable userFacing = new MutateRowCallable(stub.mutateRowCallable(), requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
return createUserFacingUnaryCallable(
"MutateRow", new MutateRowCallable(stub.mutateRowCallable(), requestContext));
}

/**
Expand All @@ -311,7 +315,9 @@ private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
*/
private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
UnaryCallable<MutateRowsRequest, Void> baseCallable = createMutateRowsBaseCallable();
return new BulkMutateRowsUserFacingCallable(baseCallable, requestContext);

return createUserFacingUnaryCallable(
"BulkMutateRows", new BulkMutateRowsUserFacingCallable(baseCallable, requestContext));
}

/**
Expand All @@ -338,8 +344,17 @@ private UnaryCallable<RowMutation, Void> createBulkMutateRowsBatchingCallable()
BatchingCallSettings.newBuilder(new MutateRowsBatchingDescriptor())
.setBatchingSettings(settings.bulkMutateRowsSettings().getBatchingSettings());

// This is a special case, the tracing starts after the batching, so we can't use
// createUserFacingUnaryCallable
TracedBatchingCallable<MutateRowsRequest, Void> traced =
new TracedBatchingCallable<>(
baseCallable,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, "BulkMutateRows"),
batchingCallSettings.getBatchingDescriptor());

UnaryCallable<MutateRowsRequest, Void> batching =
Callables.batching(baseCallable, batchingCallSettings.build(), clientContext);
Callables.batching(traced, batchingCallSettings.build(), clientContext);

MutateRowsUserFacingCallable userFacing =
new MutateRowsUserFacingCallable(batching, requestContext);
Expand All @@ -359,7 +374,7 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
new ApiResultRetryAlgorithm<Void>(),
new ExponentialRetryAlgorithm(
settings.bulkMutateRowsSettings().getRetrySettings(), clientContext.getClock()));
RetryingExecutor<Void> retryingExecutor =
RetryingExecutorWithContext<Void> retryingExecutor =
new ScheduledRetryingExecutor<>(retryAlgorithm, clientContext.getExecutor());

return new MutateRowsRetryingCallable(
Expand All @@ -378,10 +393,9 @@ private UnaryCallable<MutateRowsRequest, Void> createMutateRowsBaseCallable() {
* </ul>
*/
private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCallable() {
CheckAndMutateRowCallable userFacing =
new CheckAndMutateRowCallable(stub.checkAndMutateRowCallable(), requestContext);

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
return createUserFacingUnaryCallable(
"CheckAndMutateRow",
new CheckAndMutateRowCallable(stub.checkAndMutateRowCallable(), requestContext));
}

/**
Expand All @@ -394,10 +408,42 @@ private UnaryCallable<ConditionalRowMutation, Boolean> createCheckAndMutateRowCa
* </ul>
*/
private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable() {
ReadModifyWriteRowCallable userFacing =
new ReadModifyWriteRowCallable(stub.readModifyWriteRowCallable(), requestContext);
return createUserFacingUnaryCallable(
"ReadModifyWriteRow",
new ReadModifyWriteRowCallable(stub.readModifyWriteRowCallable(), requestContext));
}

return userFacing.withDefaultCallContext(clientContext.getDefaultCallContext());
/**
* Wraps a callable chain in a user presentable callable that will inject the default call context
* and trace the call.
*/
private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacingUnaryCallable(
String methodName, UnaryCallable<RequestT, ResponseT> inner) {

UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(
inner,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
* Wraps a callable chain in a user presentable callable that will inject the default call context
* and trace the call.
*/
private <RequestT, ResponseT>
ServerStreamingCallable<RequestT, ResponseT> createUserFacingServerStreamingCallable(
String methodName, ServerStreamingCallable<RequestT, ResponseT> inner) {

ServerStreamingCallable<RequestT, ResponseT> traced =
new TracedServerStreamingCallable<>(
inner,
clientContext.getTracerFactory(),
SpanName.of(TRACING_OUTER_CLIENT_NAME, methodName));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
}
// </editor-fold>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.core.GoogleCredentialsProvider;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.BatchingCallSettings;
Expand All @@ -28,6 +30,7 @@
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.api.gax.tracing.OpencensusTracerFactory;
import com.google.cloud.bigtable.data.v2.internal.DummyBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation;
import com.google.cloud.bigtable.data.v2.models.KeyOffset;
Expand All @@ -37,6 +40,7 @@
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -268,6 +272,13 @@ private Builder() {
setStreamWatchdogCheckInterval(baseDefaults.getStreamWatchdogCheckInterval());
setStreamWatchdogProvider(baseDefaults.getStreamWatchdogProvider());

setTracerFactory(
new OpencensusTracerFactory(
ImmutableMap.of(
"gax", GaxGrpcProperties.getGaxGrpcVersion(),
"grpc", GaxGrpcProperties.getGrpcVersion(),
"gapic", GaxProperties.getLibraryVersion(EnhancedBigtableStubSettings.class))));

// Per-method settings using baseSettings for defaults.
readRowsSettings = ServerStreamingCallSettings.newBuilder();
readRowsSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public Void call() {
return null;
}

callContext
.getTracer()
.attemptStarted(externalFuture.getAttemptSettings().getOverallAttemptCount());

// Make the actual call
ApiFuture<List<MutateRowsResponse>> innerFuture =
innerCallable.futureCall(currentRequest, currentCallContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.mutaterows;

import com.google.api.core.InternalApi;
import com.google.api.gax.retrying.RetryingExecutor;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.RetryingFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStreamingCallable;
Expand All @@ -42,13 +42,13 @@
public class MutateRowsRetryingCallable extends UnaryCallable<MutateRowsRequest, Void> {
private final ApiCallContext callContextPrototype;
private final ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable;
private final RetryingExecutor<Void> executor;
private final RetryingExecutorWithContext<Void> executor;
private final ImmutableSet<Code> retryCodes;

public MutateRowsRetryingCallable(
@Nonnull ApiCallContext callContextPrototype,
@Nonnull ServerStreamingCallable<MutateRowsRequest, MutateRowsResponse> callable,
@Nonnull RetryingExecutor<Void> executor,
@Nonnull RetryingExecutorWithContext<Void> executor,
@Nonnull Set<StatusCode.Code> retryCodes) {
this.callContextPrototype = Preconditions.checkNotNull(callContextPrototype);
this.callable = Preconditions.checkNotNull(callable);
Expand All @@ -62,7 +62,7 @@ public RetryingFuture<Void> futureCall(MutateRowsRequest request, ApiCallContext
MutateRowsAttemptCallable retryCallable =
new MutateRowsAttemptCallable(callable.all(), request, context, retryCodes);

RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable);
RetryingFuture<Void> retryingFuture = executor.createFuture(retryCallable, context);
retryCallable.setExternalFuture(retryingFuture);
retryCallable.call();

Expand Down
Loading