From aca80bee4085ed6bdf00f65702f49950c42ad4dd Mon Sep 17 00:00:00 2001 From: ET Date: Fri, 3 Jun 2022 02:51:12 -0700 Subject: [PATCH] Make gRPC metadata available to AttributeExtractors (#6125) * Allow GrpcTelemetryBuilder to be customized to add Extractors, etc * fixup! Allow GrpcTelemetryBuilder to be customized to add Extractors, etc * Call GrpcRequest.setMetadata() in TracingClientInterceptor.start() * stop spotless from complaining * Checkstyle satisfaction * Spotless * Suggestions (#6) * Suggestions * Apply suggestions from code review Co-authored-by: Trask Stalnaker Co-authored-by: Trask Stalnaker --- .../grpc/v1_6/GrpcRequest.java | 7 +- .../grpc/v1_6/TracingClientInterceptor.java | 2 + .../instrumentation/grpc/v1_6/GrpcTest.java | 117 ++++++++++++++++++ .../grpc/v1_6/AbstractGrpcTest.java | 2 +- 4 files changed, 126 insertions(+), 2 deletions(-) diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java index 63112168f4a4..dd1a49936a73 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcRequest.java @@ -13,7 +13,8 @@ public final class GrpcRequest { private final MethodDescriptor method; - @Nullable private final Metadata metadata; + + @Nullable private volatile Metadata metadata; @Nullable private volatile SocketAddress remoteAddress; @@ -35,6 +36,10 @@ public Metadata getMetadata() { return metadata; } + void setMetadata(Metadata metadata) { + this.metadata = metadata; + } + @Nullable public SocketAddress getRemoteAddress() { return remoteAddress; diff --git a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java index 45ce8c788ca9..c2ee83b329b3 100644 --- a/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java +++ b/instrumentation/grpc-1.6/library/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/TracingClientInterceptor.java @@ -93,6 +93,8 @@ final class TracingClientCall @Override public void start(Listener responseListener, Metadata headers) { propagators.getTextMapPropagator().inject(context, headers, MetadataSetter.INSTANCE); + // store metadata so that it can be used by custom AttributesExtractors + request.setMetadata(headers); try (Scope ignored = context.makeCurrent()) { super.start( new TracingClientCallListener(responseListener, parentContext, context, request), diff --git a/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java b/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java index 2dcd78608c86..cada74856619 100644 --- a/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java +++ b/instrumentation/grpc-1.6/library/src/test/java/io/opentelemetry/instrumentation/grpc/v1_6/GrpcTest.java @@ -5,10 +5,28 @@ package io.opentelemetry.instrumentation.grpc.v1_6; +import example.GreeterGrpc; +import example.Helloworld; +import io.grpc.BindableService; +import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.Metadata; +import io.grpc.Server; import io.grpc.ServerBuilder; +import io.grpc.Status; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; class GrpcTest extends AbstractGrpcTest { @@ -16,6 +34,11 @@ class GrpcTest extends AbstractGrpcTest { @RegisterExtension static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + private static final AttributeKey CUSTOM_KEY = AttributeKey.stringKey("customKey"); + + private static final Metadata.Key CUSTOM_METADATA_KEY = + Metadata.Key.of("customMetadataKey", Metadata.ASCII_STRING_MARSHALLER); + @Override protected ServerBuilder configureServer(ServerBuilder server) { return server.intercept( @@ -32,4 +55,98 @@ protected ManagedChannelBuilder configureClient(ManagedChannelBuilder clie protected InstrumentationExtension testing() { return testing; } + + @Test + void metadataProvided() throws Exception { + BindableService greeter = + new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello( + Helloworld.Request req, StreamObserver responseObserver) { + Helloworld.Response reply = + Helloworld.Response.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + }; + + Server server = + ServerBuilder.forPort(0) + .addService(greeter) + .intercept( + GrpcTelemetry.builder(testing.getOpenTelemetry()) + .addAttributeExtractor(new CustomAttributesExtractor()) + .build() + .newServerInterceptor()) + .build() + .start(); + + ManagedChannel channel = + createChannel( + ManagedChannelBuilder.forAddress("localhost", server.getPort()) + .intercept( + GrpcTelemetry.builder(testing.getOpenTelemetry()) + .addAttributeExtractor(new CustomAttributesExtractor()) + .build() + .newClientInterceptor())); + + closer.add(() -> channel.shutdownNow().awaitTermination(10, TimeUnit.SECONDS)); + closer.add(() -> server.shutdownNow().awaitTermination()); + + Metadata extraMetadata = new Metadata(); + extraMetadata.put(CUSTOM_METADATA_KEY, "customValue"); + + GreeterGrpc.GreeterBlockingStub client = + GreeterGrpc.newBlockingStub(channel) + .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(extraMetadata)); + + Helloworld.Response response = + testing() + .runWithSpan( + "parent", + () -> client.sayHello(Helloworld.Request.newBuilder().setName("test").build())); + + OpenTelemetryAssertions.assertThat(response.getMessage()).isEqualTo("Hello test"); + + testing() + .waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.CLIENT) + .hasParent(trace.getSpan(0)) + .hasAttribute(CUSTOM_KEY, "customValue"), + span -> + span.hasName("example.Greeter/SayHello") + .hasKind(SpanKind.SERVER) + .hasParent(trace.getSpan(1)) + .hasAttribute(CUSTOM_KEY, "customValue"))); + } + + private static class CustomAttributesExtractor + implements AttributesExtractor { + + @Override + public void onStart( + AttributesBuilder attributes, Context parentContext, GrpcRequest grpcRequest) {} + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + GrpcRequest grpcRequest, + @Nullable Status status, + @Nullable Throwable error) { + + Metadata metadata = grpcRequest.getMetadata(); + if (metadata != null && metadata.containsKey(CUSTOM_METADATA_KEY)) { + String value = metadata.get(CUSTOM_METADATA_KEY); + if (value != null) { + attributes.put(CUSTOM_KEY, value); + } + } + } + } } diff --git a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java index a6144c701598..dd87b7db9ecc 100644 --- a/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java +++ b/instrumentation/grpc-1.6/testing/src/main/java/io/opentelemetry/instrumentation/grpc/v1_6/AbstractGrpcTest.java @@ -73,7 +73,7 @@ public abstract class AbstractGrpcTest { protected abstract InstrumentationExtension testing(); - private final Queue> closer = new ConcurrentLinkedQueue<>(); + protected final Queue> closer = new ConcurrentLinkedQueue<>(); @AfterEach void tearDown() throws Throwable {