Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: Adds first E2E Client-side Tracing Test Using OpenTelemetry SDK with Cloud Trace Exporter and Cloud Trace v1 reader. #1622

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
8163bb4
feat: Add FirestoreOpenTelemetryOptions to FirestoreOptions.
ehsannas Jan 16, 2024
27d4bc1
feat: Add com.google.cloud.firestore.telemetry package.
ehsannas Jan 18, 2024
77107e2
fix: Remove OpenCensus tracing code.
ehsannas Jan 10, 2024
ee18489
feat: tracing for aggregate queries, bulkwriter, partition queries, a…
ehsannas Feb 27, 2024
2982f05
feat: trace instrumentation for DocumentReference methods.
ehsannas Feb 27, 2024
1f4cb5d
feat: trace instrumentation for queries and transactions.
ehsannas Feb 27, 2024
800fb79
Adding first e2e client-tracing test w/ Custom Root Span
jimit-j-shah Mar 9, 2024
160b0e0
test: Adding first e2e client-tracing test w/ Custom Root Span
jimit-j-shah Mar 9, 2024
159026b
Merge remote-tracking branch 'origin/jimit/tracing-test1' into jimit/…
jimit-j-shah Mar 18, 2024
1d4ac77
Fixing test dependencies and use default GCP testing project.
jimit-j-shah Mar 18, 2024
1b7174b
Fixing test dependencies and use default GCP testing project.
jimit-j-shah Mar 18, 2024
0b86e43
Merge remote-tracking branch 'origin/jimit/tracing-test1' into jimit/…
jimit-j-shah Mar 18, 2024
9ee3663
Fixing formatting
jimit-j-shah Mar 18, 2024
791f2af
Add aggregationQueryGet Test
jimit-j-shah Mar 18, 2024
02cecdf
Add bulkWriterCommitTrace Test
jimit-j-shah Mar 18, 2024
24f311b
Fixing running multiple-tests
jimit-j-shah Mar 18, 2024
469853f
Add partitionQuery Test
jimit-j-shah Mar 18, 2024
5812c0e
Add collectionListDocumentsTrace Test
jimit-j-shah Mar 18, 2024
52d7b4d
Add docRef*Trace Tests
jimit-j-shah Mar 19, 2024
b25bd3d
Add docRefUpdate*Trace and docRefDelete*Trace Tests
jimit-j-shah Mar 19, 2024
e84783f
Fixing Trace fetching using retries for missing or incomplete traces …
jimit-j-shah Mar 19, 2024
4485c77
Add get/query Trace Tests
jimit-j-shah Mar 19, 2024
f43f8ba
Add Transaction test
jimit-j-shah Mar 20, 2024
2520997
Added TraceContainer to be able to test transaction test-cases
jimit-j-shah Mar 26, 2024
d551032
test: Adding Transaction tests
jimit-j-shah Mar 27, 2024
ab9ef66
test: Adding Transaction tests
jimit-j-shah Mar 20, 2024
5b6172b
Merge remote-tracking branch 'origin/jimit/tracing-test1' into jimit/…
jimit-j-shah Mar 27, 2024
33dd0c6
test: Adding TestParameterInjector to run the test for global and non…
jimit-j-shah Mar 28, 2024
29d8f15
test: formatting and cleanup
jimit-j-shah Mar 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 75 additions & 8 deletions google-cloud-firestore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@
<artifactId>grpc-google-cloud-firestore-v1</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-contrib-grpc-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down Expand Up @@ -91,10 +87,6 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
Expand All @@ -113,6 +105,23 @@
<artifactId>protobuf-java-util</artifactId>
</dependency>

<!-- OpenTelemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.29.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>1.29.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-grpc-1.6</artifactId>
<version>1.29.0-alpha</version>
</dependency>
<!-- END OpenTelemetry -->

<!-- Test dependencies -->
<dependency>
Expand Down Expand Up @@ -173,6 +182,64 @@
<version>3.14.0</version>
<scope>test</scope>
</dependency>
<!-- OpenTelemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<version>1.32.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.29.0-alpha</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-trace</artifactId>
<version>1.29.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.29.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-common</artifactId>
<version>1.29.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud.opentelemetry</groupId>
<artifactId>exporter-trace</artifactId>
<version>0.15.0</version>
<scope>test</scope>
</dependency>
<!-- END OpenTelemetry -->
<!-- Cloud Ops -->
<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-trace-v1</artifactId>
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-trace</artifactId>
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.testparameterinjector</groupId>
<artifactId>test-parameter-injector</artifactId>
<version>1.15</version>
<scope>test</scope>
</dependency>
<!-- END Cloud Ops -->
</dependencies>

<reporting>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.firestore;

import static com.google.cloud.firestore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY;

import com.google.api.core.ApiFuture;
import com.google.api.core.InternalExtensionOnly;
import com.google.api.core.SettableApiFuture;
Expand All @@ -24,7 +26,10 @@
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1.RunAggregationQueryRequest;
import com.google.firestore.v1.RunAggregationQueryResponse;
import com.google.firestore.v1.RunQueryRequest;
Expand All @@ -34,6 +39,7 @@
import com.google.firestore.v1.Value;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -59,6 +65,11 @@ public class AggregateQuery {
this.aliasMap = new HashMap<>();
}

@Nonnull
private TraceUtil getTraceUtil() {
return query.getFirestore().getOptions().getTraceUtil();
}

/** Returns the query whose aggregations will be calculated by this object. */
@Nonnull
public Query getQuery() {
Expand All @@ -77,17 +88,30 @@ public ApiFuture<AggregateQuerySnapshot> get() {

@Nonnull
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer);
return responseDeliverer.getFuture();
TraceUtil.Span span =
getTraceUtil()
.startSpan(
transactionId == null
? TraceUtil.SPAN_NAME_AGGREGATION_QUERY_GET
: TraceUtil.SPAN_NAME_TRANSACTION_GET_AGGREGATION_QUERY);
try (Scope ignored = span.makeCurrent()) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer, /* attempt= */ 0);
ApiFuture<AggregateQuerySnapshot> result = responseDeliverer.getFuture();
span.endAtFuture(result);
return result;
} catch (Exception error) {
span.end(error);
throw error;
}
}

private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
private void runQuery(AggregateQueryResponseDeliverer responseDeliverer, int attempt) {
RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId());
AggregateQueryResponseObserver responseObserver =
new AggregateQueryResponseObserver(responseDeliverer);
new AggregateQueryResponseObserver(responseDeliverer, attempt);
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
query.rpcContext.getClient().runAggregationQueryCallable();
query.rpcContext.streamRequest(request, responseObserver, callable);
Expand Down Expand Up @@ -138,18 +162,36 @@ private final class AggregateQueryResponseObserver

private final AggregateQueryResponseDeliverer responseDeliverer;
private StreamController streamController;
private int attempt;

AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer) {
AggregateQueryResponseObserver(AggregateQueryResponseDeliverer responseDeliverer, int attempt) {
this.responseDeliverer = responseDeliverer;
this.attempt = attempt;
}

Map<String, Object> getAttemptAttributes() {
ImmutableMap.Builder<String, Object> builder =
new ImmutableMap.Builder<String, Object>().put("isRetryAttempt", attempt > 0);
if (attempt > 0) {
builder.put("attemptNumber", attempt);
}
return builder.build();
}

@Override
public void onStart(StreamController streamController) {
getTraceUtil()
.currentSpan()
.addEvent(SPAN_NAME_RUN_AGGREGATION_QUERY + " Stream started.", getAttemptAttributes());
this.streamController = streamController;
}

@Override
public void onResponse(RunAggregationQueryResponse response) {
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + " Response Received.", getAttemptAttributes());
// Close the stream to avoid it dangling, since we're not expecting any more responses.
streamController.cancel();

Expand All @@ -165,8 +207,19 @@ public void onResponse(RunAggregationQueryResponse response) {
@Override
public void onError(Throwable throwable) {
if (shouldRetry(throwable)) {
runQuery(responseDeliverer);
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + ": Retryable Error",
Collections.singletonMap("error.message", throwable.getMessage()));

runQuery(responseDeliverer, attempt + 1);
} else {
getTraceUtil()
.currentSpan()
.addEvent(
SPAN_NAME_RUN_AGGREGATION_QUERY + ": Error",
Collections.singletonMap("error.message", throwable.getMessage()));
responseDeliverer.deliverError(throwable);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.Timestamp;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.firestore.v1.BatchWriteRequest;
import com.google.firestore.v1.BatchWriteResponse;
import io.grpc.Status;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -69,12 +66,6 @@ ApiFuture<WriteResult> wrapResult(int writeIndex) {
* <p>The writes in the batch are not applied atomically and can be applied out of order.
*/
ApiFuture<Void> bulkCommit() {
Tracing.getTracer()
.getCurrentSpan()
.addAnnotation(
TraceUtil.SPAN_NAME_BATCHWRITE,
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(getWrites().size())));

final BatchWriteRequest.Builder request = BatchWriteRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.firestore.telemetry.TraceUtil;
import com.google.cloud.firestore.telemetry.TraceUtil.Context;
import com.google.cloud.firestore.telemetry.TraceUtil.Scope;
import com.google.cloud.firestore.v1.FirestoreSettings;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -221,6 +224,8 @@ enum OperationType {
@GuardedBy("lock")
private Executor errorExecutor;

Context traceContext;

/**
* Used to track when writes are enqueued. The user handler executors cannot be changed after a
* write has been enqueued.
Expand All @@ -237,6 +242,7 @@ enum OperationType {
this.successExecutor = MoreExecutors.directExecutor();
this.errorExecutor = MoreExecutors.directExecutor();
this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);
this.traceContext = firestore.getOptions().getTraceUtil().currentContext();

if (!options.getThrottlingEnabled()) {
this.rateLimiter =
Expand Down Expand Up @@ -899,21 +905,32 @@ private void scheduleCurrentBatchLocked(final boolean flush) {

/** Sends the provided batch once the rate limiter does not require any delay. */
private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
// Send the batch if it is does not require any delay, or schedule another attempt after the
// Send the batch if it does not require any delay, or schedule another attempt after the
// appropriate timeout.
boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize());
if (underRateLimit) {
batch
.bulkCommit()
.addListener(
() -> {
if (flush) {
synchronized (lock) {
scheduleCurrentBatchLocked(/* flush= */ true);
}
TraceUtil.Span span =
firestore
.getOptions()
.getTraceUtil()
.startSpan(TraceUtil.SPAN_NAME_BULK_WRITER_COMMIT, traceContext)
.setAttribute("numDocuments", batch.getWrites().size());
try (Scope ignored = span.makeCurrent()) {
ApiFuture<Void> result = batch.bulkCommit();
result.addListener(
() -> {
if (flush) {
synchronized (lock) {
scheduleCurrentBatchLocked(/* flush= */ true);
}
},
bulkWriterExecutor);
}
},
bulkWriterExecutor);
span.endAtFuture(result);
} catch (Exception error) {
span.end(error);
throw error;
}
} else {
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
logger.log(Level.FINE, () -> String.format("Backing off for %d seconds", delayMs / 1000));
Expand Down
Loading