Skip to content

Commit

Permalink
Fix parent/child spans relationship in Spanner. (googleapis#3690)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bogdan Drutu authored and pongad committed Sep 27, 2018
1 parent 3c9bbf2 commit 9187488
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ ResultSet executeQueryInternalWithOptions(
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) {
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY, span) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
Expand Down Expand Up @@ -1176,7 +1176,7 @@ ResultSet readInternalWithOptions(
final int prefetchChunks =
readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks;
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) {
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ, span) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
Expand Down Expand Up @@ -1426,7 +1426,7 @@ void commit() {
mutations = null;
}
final CommitRequest commitRequest = builder.build();
Span opSpan = tracer.spanBuilder(COMMIT).startSpan();
Span opSpan = tracer.spanBuilderWithExplicitParent(COMMIT, span).startSpan();
try (Scope s = tracer.withSpan(opSpan)) {
CommitResponse commitResponse =
runWithRetries(
Expand Down Expand Up @@ -2452,20 +2452,20 @@ abstract static class ResumableStreamIterator extends AbstractIterator<PartialRe
*/
private boolean safeToRetry = true;

protected ResumableStreamIterator(int maxBufferSize, String streamName) {
protected ResumableStreamIterator(int maxBufferSize, String streamName, Span parent) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.span = tracer.spanBuilder(streamName).startSpan();
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent).startSpan();
}

abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken);

@Override
public void close(@Nullable String message) {
span.end();
if (stream != null) {
stream.close(message);
}
span.end();
}

@Override
Expand All @@ -2478,7 +2478,11 @@ protected PartialResultSet computeNext() {
ImmutableMap.of("ResumeToken",
AttributeValue.stringAttributeValue(
resumeToken == null ? "null" : resumeToken.toStringUtf8())));
stream = checkNotNull(startStream(resumeToken));
try (Scope s = tracer.withSpan(span)) {
// When start a new stream set the Span as current to make the gRPC Span a child of
// this Span.
stream = checkNotNull(startStream(resumeToken));
}
}
// Buffer contains items up to a resume token or has reached capacity: flush.
if (!buffer.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void setUp() {

private void initWithLimit(int maxBufferSize) {
iterator =
new SpannerImpl.ResumableStreamIterator(maxBufferSize, "") {
new SpannerImpl.ResumableStreamIterator(maxBufferSize, "", null) {
@Override
SpannerImpl.CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken) {
Expand Down

0 comments on commit 9187488

Please sign in to comment.