diff --git a/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/GrpcOpenTelemetryTest.java b/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/GrpcOpenTelemetryTest.java index 7c092890be0b2..146a31f3d49eb 100644 --- a/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/GrpcOpenTelemetryTest.java +++ b/extensions/opentelemetry/opentelemetry/deployment/src/test/java/io/quarkus/opentelemetry/deployment/GrpcOpenTelemetryTest.java @@ -1,5 +1,6 @@ package io.quarkus.opentelemetry.deployment; +import static io.opentelemetry.api.common.AttributeKey.stringKey; import static io.opentelemetry.api.trace.SpanKind.INTERNAL; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_IP; import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_PORT; @@ -27,7 +28,6 @@ import io.grpc.Status; import io.grpc.stub.StreamObserver; import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; @@ -38,6 +38,7 @@ import io.quarkus.opentelemetry.deployment.MutinyGreeterGrpc.MutinyGreeterStub; import io.quarkus.opentelemetry.deployment.MutinyStreamingGrpc.MutinyStreamingStub; import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Multi; public class GrpcOpenTelemetryTest { @@ -50,7 +51,7 @@ public class GrpcOpenTelemetryTest { Greeter.class, GreeterBean.class, GreeterClient.class, HelloProto.class, HelloRequest.class, HelloRequestOrBuilder.class, HelloReply.class, HelloReplyOrBuilder.class) - .addClasses(StreamService.class) + .addClasses(StreamingService.class) .addClasses(StreamingGrpc.class, MutinyStreamingGrpc.class, Streaming.class, StreamingBean.class, StreamingClient.class, StreamingProto.class, Item.class, ItemOrBuilder.class)) @@ -85,7 +86,7 @@ void grpc() { SpanData internal = spans.get(0); assertEquals("span.internal", internal.getName()); assertEquals(INTERNAL, internal.getKind()); - assertEquals("value", internal.getAttributes().get(AttributeKey.stringKey("grpc.internal"))); + assertEquals("value", internal.getAttributes().get(stringKey("grpc.internal"))); SpanData server = spans.get(1); assertEquals("helloworld.Greeter/SayHello", server.getName()); @@ -200,7 +201,7 @@ public String hello(String name) { MutinyStreamingStub streamingStub; @Test - void stream() { + void streaming() { Multi request = Multi.createFrom().items(item("Goku"), item("Vegeta"), item("Piccolo"), item("Beerus"), item("Whis")); Multi response = streamingStub.pipe(request); @@ -225,6 +226,7 @@ void stream() { assertNotNull(server.getAttributes().get(NET_PEER_IP)); assertNotNull(server.getAttributes().get(NET_PEER_PORT)); assertEquals("ip_tcp", server.getAttributes().get(NET_TRANSPORT)); + assertEquals("true", server.getAttributes().get(stringKey("grpc.service.propagated"))); SpanData client = spans.get(1); assertEquals("streaming.Streaming/Pipe", client.getName()); @@ -237,10 +239,57 @@ void stream() { assertEquals(server.getTraceId(), client.getTraceId()); } + @Test + void streamingBlocking() { + Multi request = Multi.createFrom().items(item("Goku"), item("Vegeta"), item("Piccolo"), item("Beerus"), + item("Whis")); + Multi response = streamingStub.pipeBlocking(request); + + List items = response.map(Item::getMessage).collect().asList().await().atMost(Duration.ofSeconds(5)); + assertTrue(items.contains("Hello Goku")); + assertTrue(items.contains("Hello Vegeta")); + assertTrue(items.contains("Hello Piccolo")); + assertTrue(items.contains("Hello Beerus")); + assertTrue(items.contains("Hello Whis")); + + List spans = spanExporter.getFinishedSpanItems(2); + assertEquals(2, spans.size()); + + SpanData server = spans.get(0); + assertEquals("streaming.Streaming/PipeBlocking", server.getName()); + assertEquals(SpanKind.SERVER, server.getKind()); + assertEquals("grpc", server.getAttributes().get(RPC_SYSTEM)); + assertEquals("streaming.Streaming", server.getAttributes().get(RPC_SERVICE)); + assertEquals("PipeBlocking", server.getAttributes().get(RPC_METHOD)); + assertEquals(Status.Code.OK.value(), server.getAttributes().get(RPC_GRPC_STATUS_CODE)); + assertNotNull(server.getAttributes().get(NET_PEER_IP)); + assertNotNull(server.getAttributes().get(NET_PEER_PORT)); + assertEquals("ip_tcp", server.getAttributes().get(NET_TRANSPORT)); + assertEquals("true", server.getAttributes().get(stringKey("grpc.service.propagated.blocking"))); + + SpanData client = spans.get(1); + assertEquals("streaming.Streaming/PipeBlocking", client.getName()); + assertEquals(SpanKind.CLIENT, client.getKind()); + assertEquals("grpc", client.getAttributes().get(RPC_SYSTEM)); + assertEquals("streaming.Streaming", client.getAttributes().get(RPC_SERVICE)); + assertEquals("PipeBlocking", client.getAttributes().get(RPC_METHOD)); + assertEquals(Status.Code.OK.value(), client.getAttributes().get(RPC_GRPC_STATUS_CODE)); + + assertEquals(server.getTraceId(), client.getTraceId()); + } + @GrpcService - public static class StreamService implements Streaming { + public static class StreamingService implements Streaming { @Override public Multi pipe(final Multi request) { + Span.current().setAttribute("grpc.service.propagated", "true"); + return request.onItem().transform(item -> item("Hello " + item.getMessage())); + } + + @Blocking + @Override + public Multi pipeBlocking(final Multi request) { + Span.current().setAttribute("grpc.service.propagated.blocking", "true"); return request.onItem().transform(item -> item("Hello " + item.getMessage())); } } diff --git a/extensions/opentelemetry/opentelemetry/deployment/src/test/proto/streaming.proto b/extensions/opentelemetry/opentelemetry/deployment/src/test/proto/streaming.proto index c3637bc7a9115..2a143ef92e6e5 100644 --- a/extensions/opentelemetry/opentelemetry/deployment/src/test/proto/streaming.proto +++ b/extensions/opentelemetry/opentelemetry/deployment/src/test/proto/streaming.proto @@ -8,6 +8,7 @@ package streaming; service Streaming { rpc Pipe(stream Item) returns (stream Item) {} + rpc PipeBlocking(stream Item) returns (stream Item) {} } message Item { diff --git a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/grpc/GrpcTracingServerInterceptor.java b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/grpc/GrpcTracingServerInterceptor.java index 9166bd10329e4..d1cd7fd79c018 100644 --- a/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/grpc/GrpcTracingServerInterceptor.java +++ b/extensions/opentelemetry/opentelemetry/runtime/src/main/java/io/quarkus/opentelemetry/runtime/tracing/grpc/GrpcTracingServerInterceptor.java @@ -56,10 +56,9 @@ public ServerCall.Listener interceptCall( boolean shouldStart = instrumenter.shouldStart(parentContext, grpcRequest); if (shouldStart) { Context spanContext = instrumenter.start(parentContext, grpcRequest); - try (Scope ignored = spanContext.makeCurrent()) { - TracingServerCall tracingServerCall = new TracingServerCall<>(call, spanContext, grpcRequest); - return new TracingServerCallListener<>(next.startCall(tracingServerCall, headers), spanContext, grpcRequest); - } + Scope scope = spanContext.makeCurrent(); + TracingServerCall tracingServerCall = new TracingServerCall<>(call, spanContext, scope, grpcRequest); + return new TracingServerCallListener<>(next.startCall(tracingServerCall, headers), spanContext, scope, grpcRequest); } return next.startCall(call, headers); @@ -99,52 +98,68 @@ public String get(final GrpcRequest carrier, final String key) { private class TracingServerCallListener extends SimpleForwardingServerCallListener { private final Context spanContext; + private final Scope scope; private final GrpcRequest grpcRequest; - protected TracingServerCallListener(final ServerCall.Listener delegate, final Context spanContext, + protected TracingServerCallListener( + final ServerCall.Listener delegate, + final Context spanContext, + final Scope scope, final GrpcRequest grpcRequest) { + super(delegate); + this.scope = scope; this.spanContext = spanContext; this.grpcRequest = grpcRequest; } @Override public void onHalfClose() { - try (Scope ignored = spanContext.makeCurrent()) { + try { super.onHalfClose(); } catch (Exception e) { - instrumenter.end(spanContext, grpcRequest, null, e); + try (scope) { + instrumenter.end(spanContext, grpcRequest, null, e); + } throw e; } } @Override public void onCancel() { - try (Scope ignored = spanContext.makeCurrent()) { + try { super.onCancel(); - instrumenter.end(spanContext, grpcRequest, Status.CANCELLED, null); } catch (Exception e) { - instrumenter.end(spanContext, grpcRequest, null, e); + try (scope) { + instrumenter.end(spanContext, grpcRequest, null, e); + } throw e; } + try (scope) { + instrumenter.end(spanContext, grpcRequest, Status.CANCELLED, null); + } } @Override public void onComplete() { - try (Scope ignored = spanContext.makeCurrent()) { + try { super.onComplete(); } catch (Exception e) { - instrumenter.end(spanContext, grpcRequest, null, e); + try (scope) { + instrumenter.end(spanContext, grpcRequest, null, e); + } throw e; } } @Override public void onReady() { - try (Scope ignored = spanContext.makeCurrent()) { + try { super.onReady(); } catch (Exception e) { - instrumenter.end(spanContext, grpcRequest, null, e); + try (scope) { + instrumenter.end(spanContext, grpcRequest, null, e); + } throw e; } } @@ -152,12 +167,18 @@ public void onReady() { private class TracingServerCall extends SimpleForwardingServerCall { private final Context spanContext; + private final Scope scope; private final GrpcRequest grpcRequest; - public TracingServerCall(final ServerCall delegate, final Context spanContext, + public TracingServerCall( + final ServerCall delegate, + final Context spanContext, + final Scope scope, final GrpcRequest grpcRequest) { + super(delegate); this.spanContext = spanContext; + this.scope = scope; this.grpcRequest = grpcRequest; } @@ -166,10 +187,14 @@ public void close(final Status status, final Metadata trailers) { try { super.close(status, trailers); } catch (Exception e) { - instrumenter.end(spanContext, grpcRequest, null, e); + try (scope) { + instrumenter.end(spanContext, grpcRequest, null, e); + } throw e; } - instrumenter.end(spanContext, grpcRequest, status, status.getCause()); + try (scope) { + instrumenter.end(spanContext, grpcRequest, status, status.getCause()); + } } } }