diff --git a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 579da6797ced..b997103dbaf2 100644 --- a/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-clients/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -1063,7 +1063,7 @@ ResultSet executeQueryInternalWithOptions( final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = - new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) { + new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY, span) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); @@ -1176,7 +1176,7 @@ ResultSet readInternalWithOptions( final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = - new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) { + new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ, span) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); @@ -1426,7 +1426,7 @@ void commit() { mutations = null; } final CommitRequest commitRequest = builder.build(); - Span opSpan = tracer.spanBuilder(COMMIT).startSpan(); + Span opSpan = tracer.spanBuilderWithExplicitParent(COMMIT, span).startSpan(); try (Scope s = tracer.withSpan(opSpan)) { CommitResponse commitResponse = runWithRetries( @@ -2452,20 +2452,20 @@ abstract static class ResumableStreamIterator extends AbstractIterator= 0); this.maxBufferSize = maxBufferSize; - this.span = tracer.spanBuilder(streamName).startSpan(); + this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan(); } abstract CloseableIterator startStream(@Nullable ByteString resumeToken); @Override public void close(@Nullable String message) { - span.end(); if (stream != null) { stream.close(message); } + span.end(); } @Override @@ -2478,7 +2478,11 @@ protected PartialResultSet computeNext() { ImmutableMap.of("ResumeToken", AttributeValue.stringAttributeValue( resumeToken == null ? "null" : resumeToken.toStringUtf8()))); - stream = checkNotNull(startStream(resumeToken)); + try (Scope s = tracer.withSpan(span)) { + // When start a new stream set the Span as current to make the gRPC Span a child of + // this Span. + stream = checkNotNull(startStream(resumeToken)); + } } // Buffer contains items up to a resume token or has reached capacity: flush. if (!buffer.isEmpty() diff --git a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index 79ecf761650f..112ed1103cf9 100644 --- a/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-clients/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -98,7 +98,7 @@ public void setUp() { private void initWithLimit(int maxBufferSize) { iterator = - new SpannerImpl.ResumableStreamIterator(maxBufferSize, "") { + new SpannerImpl.ResumableStreamIterator(maxBufferSize, "", null) { @Override SpannerImpl.CloseableIterator startStream( @Nullable ByteString resumeToken) {