Skip to content

Commit

Permalink
Merge pull request #27100 from radcortez/fix-24395
Browse files Browse the repository at this point in the history
Propagate OpenTelemetry context to Blocking gRPC services
  • Loading branch information
radcortez authored Aug 11, 2022
2 parents 32d1c1a + b331569 commit 5ba07b2
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.opentelemetry.deployment;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_IP;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_PORT;
Expand Down Expand Up @@ -27,7 +28,6 @@
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
Expand All @@ -38,6 +38,7 @@
import io.quarkus.opentelemetry.deployment.MutinyGreeterGrpc.MutinyGreeterStub;
import io.quarkus.opentelemetry.deployment.MutinyStreamingGrpc.MutinyStreamingStub;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;

public class GrpcOpenTelemetryTest {
Expand All @@ -50,7 +51,7 @@ public class GrpcOpenTelemetryTest {
Greeter.class, GreeterBean.class, GreeterClient.class,
HelloProto.class, HelloRequest.class, HelloRequestOrBuilder.class,
HelloReply.class, HelloReplyOrBuilder.class)
.addClasses(StreamService.class)
.addClasses(StreamingService.class)
.addClasses(StreamingGrpc.class, MutinyStreamingGrpc.class,
Streaming.class, StreamingBean.class, StreamingClient.class,
StreamingProto.class, Item.class, ItemOrBuilder.class))
Expand Down Expand Up @@ -85,7 +86,7 @@ void grpc() {
SpanData internal = spans.get(0);
assertEquals("span.internal", internal.getName());
assertEquals(INTERNAL, internal.getKind());
assertEquals("value", internal.getAttributes().get(AttributeKey.stringKey("grpc.internal")));
assertEquals("value", internal.getAttributes().get(stringKey("grpc.internal")));

SpanData server = spans.get(1);
assertEquals("helloworld.Greeter/SayHello", server.getName());
Expand Down Expand Up @@ -200,7 +201,7 @@ public String hello(String name) {
MutinyStreamingStub streamingStub;

@Test
void stream() {
void streaming() {
Multi<Item> request = Multi.createFrom().items(item("Goku"), item("Vegeta"), item("Piccolo"), item("Beerus"),
item("Whis"));
Multi<Item> response = streamingStub.pipe(request);
Expand All @@ -225,6 +226,7 @@ void stream() {
assertNotNull(server.getAttributes().get(NET_PEER_IP));
assertNotNull(server.getAttributes().get(NET_PEER_PORT));
assertEquals("ip_tcp", server.getAttributes().get(NET_TRANSPORT));
assertEquals("true", server.getAttributes().get(stringKey("grpc.service.propagated")));

SpanData client = spans.get(1);
assertEquals("streaming.Streaming/Pipe", client.getName());
Expand All @@ -237,10 +239,57 @@ void stream() {
assertEquals(server.getTraceId(), client.getTraceId());
}

@Test
void streamingBlocking() {
Multi<Item> request = Multi.createFrom().items(item("Goku"), item("Vegeta"), item("Piccolo"), item("Beerus"),
item("Whis"));
Multi<Item> response = streamingStub.pipeBlocking(request);

List<String> items = response.map(Item::getMessage).collect().asList().await().atMost(Duration.ofSeconds(5));
assertTrue(items.contains("Hello Goku"));
assertTrue(items.contains("Hello Vegeta"));
assertTrue(items.contains("Hello Piccolo"));
assertTrue(items.contains("Hello Beerus"));
assertTrue(items.contains("Hello Whis"));

List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertEquals(2, spans.size());

SpanData server = spans.get(0);
assertEquals("streaming.Streaming/PipeBlocking", server.getName());
assertEquals(SpanKind.SERVER, server.getKind());
assertEquals("grpc", server.getAttributes().get(RPC_SYSTEM));
assertEquals("streaming.Streaming", server.getAttributes().get(RPC_SERVICE));
assertEquals("PipeBlocking", server.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.OK.value(), server.getAttributes().get(RPC_GRPC_STATUS_CODE));
assertNotNull(server.getAttributes().get(NET_PEER_IP));
assertNotNull(server.getAttributes().get(NET_PEER_PORT));
assertEquals("ip_tcp", server.getAttributes().get(NET_TRANSPORT));
assertEquals("true", server.getAttributes().get(stringKey("grpc.service.propagated.blocking")));

SpanData client = spans.get(1);
assertEquals("streaming.Streaming/PipeBlocking", client.getName());
assertEquals(SpanKind.CLIENT, client.getKind());
assertEquals("grpc", client.getAttributes().get(RPC_SYSTEM));
assertEquals("streaming.Streaming", client.getAttributes().get(RPC_SERVICE));
assertEquals("PipeBlocking", client.getAttributes().get(RPC_METHOD));
assertEquals(Status.Code.OK.value(), client.getAttributes().get(RPC_GRPC_STATUS_CODE));

assertEquals(server.getTraceId(), client.getTraceId());
}

@GrpcService
public static class StreamService implements Streaming {
public static class StreamingService implements Streaming {
@Override
public Multi<Item> pipe(final Multi<Item> request) {
Span.current().setAttribute("grpc.service.propagated", "true");
return request.onItem().transform(item -> item("Hello " + item.getMessage()));
}

@Blocking
@Override
public Multi<Item> pipeBlocking(final Multi<Item> request) {
Span.current().setAttribute("grpc.service.propagated.blocking", "true");
return request.onItem().transform(item -> item("Hello " + item.getMessage()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package streaming;

service Streaming {
rpc Pipe(stream Item) returns (stream Item) {}
rpc PipeBlocking(stream Item) returns (stream Item) {}
}

message Item {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
boolean shouldStart = instrumenter.shouldStart(parentContext, grpcRequest);
if (shouldStart) {
Context spanContext = instrumenter.start(parentContext, grpcRequest);
try (Scope ignored = spanContext.makeCurrent()) {
TracingServerCall<ReqT, RespT> tracingServerCall = new TracingServerCall<>(call, spanContext, grpcRequest);
return new TracingServerCallListener<>(next.startCall(tracingServerCall, headers), spanContext, grpcRequest);
}
Scope scope = spanContext.makeCurrent();
TracingServerCall<ReqT, RespT> tracingServerCall = new TracingServerCall<>(call, spanContext, scope, grpcRequest);
return new TracingServerCallListener<>(next.startCall(tracingServerCall, headers), spanContext, scope, grpcRequest);
}

return next.startCall(call, headers);
Expand Down Expand Up @@ -99,65 +98,87 @@ public String get(final GrpcRequest carrier, final String key) {

private class TracingServerCallListener<ReqT> extends SimpleForwardingServerCallListener<ReqT> {
private final Context spanContext;
private final Scope scope;
private final GrpcRequest grpcRequest;

protected TracingServerCallListener(final ServerCall.Listener<ReqT> delegate, final Context spanContext,
protected TracingServerCallListener(
final ServerCall.Listener<ReqT> delegate,
final Context spanContext,
final Scope scope,
final GrpcRequest grpcRequest) {

super(delegate);
this.scope = scope;
this.spanContext = spanContext;
this.grpcRequest = grpcRequest;
}

@Override
public void onHalfClose() {
try (Scope ignored = spanContext.makeCurrent()) {
try {
super.onHalfClose();
} catch (Exception e) {
instrumenter.end(spanContext, grpcRequest, null, e);
try (scope) {
instrumenter.end(spanContext, grpcRequest, null, e);
}
throw e;
}
}

@Override
public void onCancel() {
try (Scope ignored = spanContext.makeCurrent()) {
try {
super.onCancel();
instrumenter.end(spanContext, grpcRequest, Status.CANCELLED, null);
} catch (Exception e) {
instrumenter.end(spanContext, grpcRequest, null, e);
try (scope) {
instrumenter.end(spanContext, grpcRequest, null, e);
}
throw e;
}
try (scope) {
instrumenter.end(spanContext, grpcRequest, Status.CANCELLED, null);
}
}

@Override
public void onComplete() {
try (Scope ignored = spanContext.makeCurrent()) {
try {
super.onComplete();
} catch (Exception e) {
instrumenter.end(spanContext, grpcRequest, null, e);
try (scope) {
instrumenter.end(spanContext, grpcRequest, null, e);
}
throw e;
}
}

@Override
public void onReady() {
try (Scope ignored = spanContext.makeCurrent()) {
try {
super.onReady();
} catch (Exception e) {
instrumenter.end(spanContext, grpcRequest, null, e);
try (scope) {
instrumenter.end(spanContext, grpcRequest, null, e);
}
throw e;
}
}
}

private class TracingServerCall<ReqT, RespT> extends SimpleForwardingServerCall<ReqT, RespT> {
private final Context spanContext;
private final Scope scope;
private final GrpcRequest grpcRequest;

public TracingServerCall(final ServerCall<ReqT, RespT> delegate, final Context spanContext,
public TracingServerCall(
final ServerCall<ReqT, RespT> delegate,
final Context spanContext,
final Scope scope,
final GrpcRequest grpcRequest) {

super(delegate);
this.spanContext = spanContext;
this.scope = scope;
this.grpcRequest = grpcRequest;
}

Expand All @@ -166,10 +187,14 @@ public void close(final Status status, final Metadata trailers) {
try {
super.close(status, trailers);
} catch (Exception e) {
instrumenter.end(spanContext, grpcRequest, null, e);
try (scope) {
instrumenter.end(spanContext, grpcRequest, null, e);
}
throw e;
}
instrumenter.end(spanContext, grpcRequest, status, status.getCause());
try (scope) {
instrumenter.end(spanContext, grpcRequest, status, status.getCause());
}
}
}
}

0 comments on commit 5ba07b2

Please sign in to comment.