From 641eb5510046e2cb84f9c105c4dd4577fb0e5767 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Wed, 17 Nov 2021 16:37:30 +0100 Subject: [PATCH 1/5] Implement a dedicated reactor-netty 1.0 instrumentation --- .../reactor/ContextPropagationOperator.java | 2 +- .../javaagent/build.gradle.kts | 2 + .../reactornetty/v1_0/DecoratorFunctions.java | 47 ++-- .../v1_0/HttpClientInstrumentation.java | 67 +++-- .../v1_0/HttpClientRequestHeadersSetter.java | 19 ++ .../HttpResponseReceiverInstrumenter.java | 182 ++++++++++++++ .../reactornetty/v1_0/MapConnect.java | 22 -- .../reactornetty/v1_0/OnRequest.java | 20 -- .../reactornetty/v1_0/ReactorContextKeys.java | 16 ++ ...torNettyHttpClientAttributesExtractor.java | 114 +++++++++ .../ReactorNettyInstrumentationModule.java | 5 +- ...ctorNettyNetClientAttributesExtractor.java | 41 +++ .../v1_0/ReactorNettySingletons.java | 37 ++- .../v1_0/ResponseReceiverInstrumentation.java | 237 ++++++++++++++++++ .../AbstractReactorNettyHttpClientTest.groovy | 11 - .../v1_0/ReactorNettyClientSslTest.groovy | 42 ++-- .../ReactorNettyConnectionSpanTest.groovy | 33 ++- .../v1_0/ReactorNettyHttpClientTest.groovy | 11 +- ...ReactorNettyHttpClientUsingFromTest.groovy | 10 +- 19 files changed, 774 insertions(+), 144 deletions(-) create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientRequestHeadersSetter.java create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java delete mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/MapConnect.java delete mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/OnRequest.java create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientAttributesExtractor.java create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyNetClientAttributesExtractor.java create mode 100644 instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java index 4e3e03a0a3a4..bfd6d63566e3 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java @@ -117,7 +117,7 @@ public void resetOnEachOperator() { } /** Forces Mono to run in traceContext scope. */ - static Mono runWithContext(Mono publisher, Context tracingContext) { + public static Mono runWithContext(Mono publisher, Context tracingContext) { if (!enabled) { return publisher; } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts index 122b0b58c709..2a12f952cca8 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts @@ -21,6 +21,8 @@ dependencies { annotationProcessor("com.google.auto.value:auto-value") implementation(project(":instrumentation:netty:netty-4.1-common:javaagent")) + implementation(project(":instrumentation:reactor-3.1:library")) + library("io.projectreactor.netty:reactor-netty-http:1.0.0") testInstrumentation(project(":instrumentation:reactor-netty:reactor-netty-0.9:javaagent")) diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java index d53a719f70da..ab6fbf554a3c 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java @@ -5,10 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; -import io.netty.channel.Channel; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys; import java.util.function.BiConsumer; import javax.annotation.Nullable; import reactor.netty.Connection; @@ -24,15 +22,19 @@ public static boolean shouldDecorate(Class callbackClass) { public static final class OnMessageDecorator implements BiConsumer { + private final BiConsumer delegate; + private final PropagatedContext propagatedContext; - public OnMessageDecorator(BiConsumer delegate) { + public OnMessageDecorator( + BiConsumer delegate, PropagatedContext propagatedContext) { this.delegate = delegate; + this.propagatedContext = propagatedContext; } @Override public void accept(M message, Connection connection) { - Context context = getChannelContext(message.currentContextView(), connection.channel()); + Context context = getChannelContext(message.currentContextView(), propagatedContext); if (context == null) { delegate.accept(message, connection); } else { @@ -45,15 +47,19 @@ public void accept(M message, Connection connection) { public static final class OnMessageErrorDecorator implements BiConsumer { + private final BiConsumer delegate; + private final PropagatedContext propagatedContext; - public OnMessageErrorDecorator(BiConsumer delegate) { + public OnMessageErrorDecorator( + BiConsumer delegate, PropagatedContext propagatedContext) { this.delegate = delegate; + this.propagatedContext = propagatedContext; } @Override public void accept(M message, Throwable throwable) { - Context context = getChannelContext(message.currentContextView(), null); + Context context = getChannelContext(message.currentContextView(), propagatedContext); if (context == null) { delegate.accept(message, throwable); } else { @@ -65,16 +71,27 @@ public void accept(M message, Throwable throwable) { } @Nullable - private static Context getChannelContext(ContextView contextView, @Nullable Channel channel) { - // try to get the client span context from the channel if it's available - if (channel != null) { - Context context = channel.attr(AttributeKeys.CLIENT_CONTEXT).get(); - if (context != null) { - return context; - } + private static Context getChannelContext( + ContextView contextView, PropagatedContext propagatedContext) { + Context context = null; + if (propagatedContext.useClientContext) { + context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null); + } + if (context == null) { + context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null); + } + return context; + } + + public enum PropagatedContext { + PARENT(false), + CLIENT(true); + + final boolean useClientContext; + + PropagatedContext(boolean useClientContext) { + this.useClientContext = useClientContext; } - // otherwise use the parent span context - return contextView.getOrDefault(MapConnect.CONTEXT_ATTRIBUTE, null); } private DecoratorFunctions() {} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java index 68a8429bb5f2..69c6c62b8fc4 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; @@ -14,13 +13,12 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.api.CallDepth; +import io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.DecoratorFunctions.PropagatedContext; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.netty.Connection; -import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; @@ -32,10 +30,6 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isStatic().and(namedOneOf("create", "newConnection", "from")), - this.getClass().getName() + "$CreateAdvice"); - // advice classes below expose current context in doOn*/doAfter* callbacks transformer.applyAdviceToMethod( isPublic() @@ -70,36 +64,22 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$OnErrorAdvice"); } - @SuppressWarnings("unused") - public static class CreateAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) { - callDepth = CallDepth.forClass(HttpClient.class); - callDepth.getAndIncrement(); - } - - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( - @Advice.Thrown Throwable throwable, - @Advice.Return(readOnly = false) HttpClient client, - @Advice.Local("otelCallDepth") CallDepth callDepth) { - - if (callDepth.decrementAndGet() == 0 && throwable == null) { - client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect()); - } - } - } - @SuppressWarnings("unused") public static class OnRequestAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + BiConsumer callback, + @Advice.Origin("#m") String methodName) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageDecorator<>(callback); + // use client context after request is sent, parent context before that + PropagatedContext propagatedContext = + "doAfterRequest".equals(methodName) + ? PropagatedContext.CLIENT + : PropagatedContext.PARENT; + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, propagatedContext); } } } @@ -111,8 +91,10 @@ public static class OnRequestErrorAdvice { public static void onEnter( @Advice.Argument(value = 0, readOnly = false) BiConsumer callback) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback); + callback = + new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } } } @@ -123,9 +105,15 @@ public static class OnResponseAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback) { + BiConsumer callback, + @Advice.Origin("#m") String methodName) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageDecorator<>(callback); + // use client context just when response status & headers are received, the parent context + // after the response is completed + PropagatedContext propagatedContext = + "doOnResponse".equals(methodName) ? PropagatedContext.CLIENT : PropagatedContext.PARENT; + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, propagatedContext); } } } @@ -137,8 +125,10 @@ public static class OnResponseErrorAdvice { public static void onEnter( @Advice.Argument(value = 0, readOnly = false) BiConsumer callback) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback); + callback = + new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } } } @@ -152,11 +142,16 @@ public static void onEnter( BiConsumer requestCallback, @Advice.Argument(value = 1, readOnly = false) BiConsumer responseCallback) { + if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { - requestCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(requestCallback); + requestCallback = + new DecoratorFunctions.OnMessageErrorDecorator<>( + requestCallback, PropagatedContext.PARENT); } if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) { - responseCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(responseCallback); + responseCallback = + new DecoratorFunctions.OnMessageErrorDecorator<>( + responseCallback, PropagatedContext.PARENT); } } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientRequestHeadersSetter.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientRequestHeadersSetter.java new file mode 100644 index 000000000000..b7fecafa80d8 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientRequestHeadersSetter.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.opentelemetry.context.propagation.TextMapSetter; +import javax.annotation.Nullable; +import reactor.netty.http.client.HttpClientRequest; + +enum HttpClientRequestHeadersSetter implements TextMapSetter { + INSTANCE; + + @Override + public void set(@Nullable HttpClientRequest request, String key, String value) { + request.header(key, value); + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java new file mode 100644 index 000000000000..014d98c3e746 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java @@ -0,0 +1,182 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_CONTEXT_KEY; +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY; +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator; +import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys; +import java.util.function.BiConsumer; +import java.util.function.Function; +import javax.annotation.Nullable; +import reactor.core.publisher.Mono; +import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientRequest; +import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.http.client.HttpClientState; + +public final class HttpResponseReceiverInstrumenter { + + // this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP + // request processing + // it should be used just before one of the response*() methods is called - at this point the HTTP + // request is no longer modifiable by the user + @Nullable + public static HttpClient.ResponseReceiver instrument(HttpClient.ResponseReceiver receiver) { + // receiver should always be an HttpClientFinalizer, which both extends HttpClient and + // implements ResponseReceiver + if (receiver instanceof HttpClient) { + HttpClient client = (HttpClient) receiver; + HttpClientConfig config = client.configuration(); + + ContextHolder contextHolder = new ContextHolder(); + + HttpClient modified = + client + .mapConnect(new StartOperation(contextHolder, config)) + .doOnRequest(new PropagateContext(contextHolder)) + .doOnRequestError(new EndOperationWithError(contextHolder, config)) + .observe(new EndOperation(contextHolder, config)); + + // modified should always be an HttpClientFinalizer too + if (modified instanceof HttpClient.ResponseReceiver) { + return (HttpClient.ResponseReceiver) modified; + } + } + + return null; + } + + static final class ContextHolder { + volatile Context parentContext; + volatile Context context; + } + + static final class StartOperation + implements Function, Mono> { + + private final ContextHolder contextHolder; + private final HttpClientConfig config; + + StartOperation(ContextHolder contextHolder, HttpClientConfig config) { + this.contextHolder = contextHolder; + this.config = config; + } + + @Override + public Mono apply(Mono mono) { + return Mono.defer( + () -> { + Context parentContext = Context.current(); + contextHolder.parentContext = parentContext; + if (!instrumenter().shouldStart(parentContext, config)) { + return mono.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)); + } + + Context context = instrumenter().start(parentContext, config); + contextHolder.context = context; + return ContextPropagationOperator.runWithContext(mono, context) + // make contexts accessible via the reactor ContextView - the doOn* callbacks + // instrumentation uses these to set the proper context + .contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)) + .contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context)); + }); + } + } + + static final class PropagateContext implements BiConsumer { + + private final ContextHolder contextHolder; + + PropagateContext(ContextHolder contextHolder) { + this.contextHolder = contextHolder; + } + + @Override + public void accept(HttpClientRequest httpClientRequest, Connection connection) { + Context context = contextHolder.context; + if (context != null) { + GlobalOpenTelemetry.getPropagators() + .getTextMapPropagator() + .inject(context, httpClientRequest, HttpClientRequestHeadersSetter.INSTANCE); + } + + // also propagate the context to the underlying netty instrumentation + // if this span was suppressed and context is null, propagate parentContext - this will allow + // netty spans to be suppressed too + Context nettyParentContext = context == null ? contextHolder.parentContext : context; + connection.channel().attr(AttributeKeys.WRITE_CONTEXT).set(nettyParentContext); + } + } + + static final class EndOperationWithError implements BiConsumer { + + private final ContextHolder contextHolder; + private final HttpClientConfig config; + + EndOperationWithError(ContextHolder contextHolder, HttpClientConfig config) { + this.contextHolder = contextHolder; + this.config = config; + } + + @Override + public void accept(HttpClientRequest httpClientRequest, Throwable error) { + Context context = contextHolder.context; + if (context == null) { + return; + } + instrumenter().end(context, config, null, error); + } + } + + static final class EndOperation implements ConnectionObserver { + + private final ContextHolder contextHolder; + private final HttpClientConfig config; + + EndOperation(ContextHolder contextHolder, HttpClientConfig config) { + this.contextHolder = contextHolder; + this.config = config; + } + + @Override + public void onStateChange(Connection connection, State newState) { + if (newState != HttpClientState.RESPONSE_COMPLETED) { + return; + } + + Context context = contextHolder.context; + if (context == null) { + return; + } + + // connection is actually an instance of HttpClientOperations - a package private class that + // implements both Connection and HttpClientResponse + if (connection instanceof HttpClientResponse) { + HttpClientResponse response = (HttpClientResponse) connection; + instrumenter().end(context, config, response, null); + } + } + + @Override + public void onUncaughtException(Connection connection, Throwable error) { + Context context = contextHolder.context; + if (context == null) { + return; + } + instrumenter().end(context, config, null, error); + } + } + + private HttpResponseReceiverInstrumenter() {} +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/MapConnect.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/MapConnect.java deleted file mode 100644 index 10031575a3f2..000000000000 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/MapConnect.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; - -import io.opentelemetry.context.Context; -import java.util.function.Function; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; - -public class MapConnect - implements Function, Mono> { - - static final String CONTEXT_ATTRIBUTE = MapConnect.class.getName() + ".Context"; - - @Override - public Mono apply(Mono m) { - return m.contextWrite(s -> s.put(CONTEXT_ATTRIBUTE, Context.current())); - } -} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/OnRequest.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/OnRequest.java deleted file mode 100644 index 709d4d445d44..000000000000 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/OnRequest.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; - -import io.opentelemetry.context.Context; -import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys; -import java.util.function.BiConsumer; -import reactor.netty.Connection; -import reactor.netty.http.client.HttpClientRequest; - -public class OnRequest implements BiConsumer { - @Override - public void accept(HttpClientRequest r, Connection c) { - Context context = r.currentContextView().get(MapConnect.CONTEXT_ATTRIBUTE); - c.channel().attr(AttributeKeys.WRITE_CONTEXT).set(context); - } -} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java new file mode 100644 index 000000000000..d76ceb43c779 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java @@ -0,0 +1,16 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +public final class ReactorContextKeys { + + public static final String CLIENT_PARENT_CONTEXT_KEY = + ReactorContextKeys.class.getName() + ".client-parent-context"; + public static final String CLIENT_CONTEXT_KEY = + ReactorContextKeys.class.getName() + ".client-context"; + + private ReactorContextKeys() {} +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientAttributesExtractor.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientAttributesExtractor.java new file mode 100644 index 000000000000..5bf9824a81e2 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientAttributesExtractor.java @@ -0,0 +1,114 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import javax.annotation.Nullable; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientResponse; + +final class ReactorNettyHttpClientAttributesExtractor + extends HttpClientAttributesExtractor { + + @Override + protected String url(HttpClientConfig request) { + String uri = request.uri(); + if (isAbsolute(uri)) { + return uri; + } + + // use the baseUrl if it was configured + String baseUrl = request.baseUrl(); + if (baseUrl != null) { + if (baseUrl.endsWith("/") && uri.startsWith("/")) { + baseUrl = baseUrl.substring(0, baseUrl.length() - 1); + } + return baseUrl + uri; + } + + // otherwise, use the host+port config to construct the full url + SocketAddress hostAddress = request.remoteAddress().get(); + if (hostAddress instanceof InetSocketAddress) { + InetSocketAddress inetHostAddress = (InetSocketAddress) hostAddress; + return (request.isSecure() ? "https://" : "http://") + + inetHostAddress.getHostName() + + ":" + + inetHostAddress.getPort() + + (uri.startsWith("/") ? "" : "/") + + uri; + } + + return uri; + } + + private static boolean isAbsolute(String uri) { + return uri.startsWith("http://") || uri.startsWith("https://"); + } + + @Nullable + @Override + protected String flavor(HttpClientConfig request, @Nullable HttpClientResponse response) { + if (response != null) { + String flavor = response.version().text(); + if (flavor.startsWith("HTTP/")) { + flavor = flavor.substring("HTTP/".length()); + } + return flavor; + } + return null; + } + + @Override + protected String method(HttpClientConfig request) { + return request.method().name(); + } + + @Override + protected List requestHeader(HttpClientConfig request, String name) { + return request.headers().getAll(name); + } + + @Nullable + @Override + protected Long requestContentLength( + HttpClientConfig request, @Nullable HttpClientResponse response) { + return null; + } + + @Nullable + @Override + protected Long requestContentLengthUncompressed( + HttpClientConfig request, @Nullable HttpClientResponse response) { + return null; + } + + @Override + protected Integer statusCode(HttpClientConfig request, HttpClientResponse response) { + return response.status().code(); + } + + @Nullable + @Override + protected Long responseContentLength(HttpClientConfig request, HttpClientResponse response) { + return null; + } + + @Nullable + @Override + protected Long responseContentLengthUncompressed( + HttpClientConfig request, HttpClientResponse response) { + return null; + } + + @Override + protected List responseHeader( + HttpClientConfig request, HttpClientResponse response, String name) { + return response.responseHeaders().getAll(name); + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java index 8ede89713c1c..68f14dc4972f 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java @@ -38,6 +38,9 @@ public ElementMatcher.Junction classLoaderMatcher() { @Override public List typeInstrumentations() { - return asList(new HttpClientInstrumentation(), new TransportConnectorInstrumentation()); + return asList( + new HttpClientInstrumentation(), + new ResponseReceiverInstrumentation(), + new TransportConnectorInstrumentation()); } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyNetClientAttributesExtractor.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyNetClientAttributesExtractor.java new file mode 100644 index 000000000000..49526c6f7382 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyNetClientAttributesExtractor.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetClientAttributesExtractor; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import javax.annotation.Nullable; +import reactor.netty.Connection; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientResponse; + +final class ReactorNettyNetClientAttributesExtractor + extends InetSocketAddressNetClientAttributesExtractor { + + @Nullable + @Override + public String transport(HttpClientConfig request, @Nullable HttpClientResponse response) { + return null; + } + + @Nullable + @Override + public InetSocketAddress getAddress( + HttpClientConfig request, @Nullable HttpClientResponse response) { + + // we're making use of the fact that HttpClientOperations is both a Connection and an + // HttpClientResponse + if (response instanceof Connection) { + Connection connection = (Connection) response; + SocketAddress address = connection.channel().remoteAddress(); + if (address instanceof InetSocketAddress) { + return (InetSocketAddress) address; + } + } + return null; + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java index 0544449c21dd..d396c50c5e63 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java @@ -5,25 +5,58 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; +import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.PeerServiceAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientMetrics; +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor; import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyClientInstrumenterFactory; import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyConnectionInstrumenter; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientResponse; public final class ReactorNettySingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.reactor-netty-1.0"; + private static final boolean alwaysCreateConnectSpan = Config.get() .getBoolean("otel.instrumentation.reactor-netty.always-create-connect-span", false); + private static final Instrumenter INSTRUMENTER; private static final NettyConnectionInstrumenter CONNECTION_INSTRUMENTER; static { + ReactorNettyHttpClientAttributesExtractor httpAttributesExtractor = + new ReactorNettyHttpClientAttributesExtractor(); + ReactorNettyNetClientAttributesExtractor netAttributesExtractor = + new ReactorNettyNetClientAttributesExtractor(); + + INSTRUMENTER = + Instrumenter.builder( + GlobalOpenTelemetry.get(), + INSTRUMENTATION_NAME, + HttpSpanNameExtractor.create(httpAttributesExtractor)) + .setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor)) + .addAttributesExtractor(httpAttributesExtractor) + .addAttributesExtractor(netAttributesExtractor) + .addAttributesExtractor(PeerServiceAttributesExtractor.create(netAttributesExtractor)) + .addRequestMetrics(HttpClientMetrics.get()) + // headers are injected in ResponseReceiverInstrumenter + .newInstrumenter(SpanKindExtractor.alwaysClient()); + NettyClientInstrumenterFactory instrumenterFactory = - new NettyClientInstrumenterFactory( - "io.opentelemetry.reactor-netty-1.0", alwaysCreateConnectSpan, false); + new NettyClientInstrumenterFactory(INSTRUMENTATION_NAME, alwaysCreateConnectSpan, false); CONNECTION_INSTRUMENTER = instrumenterFactory.createConnectionInstrumenter(); } + public static Instrumenter instrumenter() { + return INSTRUMENTER; + } + public static NettyConnectionInstrumenter connectionInstrumenter() { return CONNECTION_INSTRUMENTER; } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java new file mode 100644 index 000000000000..37576d9891bb --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java @@ -0,0 +1,237 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.CallDepth; +import java.util.function.BiFunction; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.ByteBufFlux; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientResponse; + +public class ResponseReceiverInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("reactor.netty.http.client.HttpClient$ResponseReceiver"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("reactor.netty.http.client.HttpClient$ResponseReceiver")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("response").and(takesArguments(0)).and(returns(named("reactor.core.publisher.Mono"))), + this.getClass().getName() + "$ResponseMonoAdvice"); + transformer.applyAdviceToMethod( + named("response") + .and(takesArguments(1)) + .and(takesArgument(0, BiFunction.class)) + .and(returns(named("reactor.core.publisher.Flux"))), + this.getClass().getName() + "$ResponseFluxAdvice"); + transformer.applyAdviceToMethod( + named("responseConnection") + .and(takesArguments(1)) + .and(takesArgument(0, BiFunction.class)) + .and(returns(named("reactor.core.publisher.Flux"))), + this.getClass().getName() + "$ResponseConnectionAdvice"); + transformer.applyAdviceToMethod( + named("responseContent") + .and(takesArguments(0)) + .and(returns(named("reactor.netty.ByteBufFlux"))), + this.getClass().getName() + "$ResponseContentAdvice"); + transformer.applyAdviceToMethod( + named("responseSingle") + .and(takesArguments(1)) + .and(takesArgument(0, BiFunction.class)) + .and(returns(named("reactor.core.publisher.Mono"))), + this.getClass().getName() + "$ResponseSingleAdvice"); + } + + @SuppressWarnings("unused") + public static class ResponseMonoAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(ClassLoader.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Return(readOnly = false) Mono returnValue) { + + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.response(); + } + + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + + @SuppressWarnings("unused") + public static class ResponseFluxAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(ClassLoader.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Argument(0) BiFunction receiveFunction, + @Advice.Return(readOnly = false) Flux returnValue) { + + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.response(receiveFunction); + } + + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + + @SuppressWarnings("unused") + public static class ResponseConnectionAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(ClassLoader.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Argument(0) BiFunction receiveFunction, + @Advice.Return(readOnly = false) Flux returnValue) { + + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseConnection(receiveFunction); + } + + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + + @SuppressWarnings("unused") + public static class ResponseContentAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(ClassLoader.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Return(readOnly = false) ByteBufFlux returnValue) { + + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseContent(); + } + + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + + @SuppressWarnings("unused") + public static class ResponseSingleAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(ClassLoader.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Argument(0) BiFunction receiveFunction, + @Advice.Return(readOnly = false) Mono returnValue) { + + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseSingle(receiveFunction); + } + + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy index 24f0f91ac8eb..e958bfedbf5d 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy @@ -69,17 +69,6 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }).resolver(getAddressResolverGroup()) + return HttpClient.create() + .tcpConfiguration({ tcpClient -> + tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) + }) + .resolver(getAddressResolverGroup()) + .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) } @Override @@ -26,6 +30,7 @@ class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest { .newConnection() .host(host) .port(port) + .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) return new SingleConnection() { diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy index 098c963125cb..db1a9879be4a 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy @@ -6,14 +6,18 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 import io.netty.channel.ChannelOption +import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames import reactor.netty.http.client.HttpClient import reactor.netty.tcp.TcpClient class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest { HttpClient createHttpClient() { - return HttpClient.from(TcpClient.create()).tcpConfiguration({ tcpClient -> - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }).resolver(getAddressResolverGroup()) + return HttpClient.from(TcpClient.create()) + .tcpConfiguration({ tcpClient -> + tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) + }) + .resolver(getAddressResolverGroup()) + .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) } } From 1ac013790a46d7cac5abf82d5ddb847ed10c7167 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 18 Nov 2021 11:22:59 +0100 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Trask Stalnaker --- .../reactornetty/v1_0/HttpResponseReceiverInstrumenter.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java index 014d98c3e746..98e264b04ed2 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java @@ -29,7 +29,7 @@ public final class HttpResponseReceiverInstrumenter { // this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP // request processing - // it should be used just before one of the response*() methods is called - at this point the HTTP + // it should be used just before one of the response*() methods is called - after this point the HTTP // request is no longer modifiable by the user @Nullable public static HttpClient.ResponseReceiver instrument(HttpClient.ResponseReceiver receiver) { @@ -80,6 +80,8 @@ public Mono apply(Mono mono) { Context parentContext = Context.current(); contextHolder.parentContext = parentContext; if (!instrumenter().shouldStart(parentContext, config)) { + // make context accessible via the reactor ContextView - the doOn* callbacks + // instrumentation uses this to set the proper context for callbacks return mono.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)); } @@ -87,7 +89,7 @@ public Mono apply(Mono mono) { contextHolder.context = context; return ContextPropagationOperator.runWithContext(mono, context) // make contexts accessible via the reactor ContextView - the doOn* callbacks - // instrumentation uses these to set the proper context + // instrumentation uses the parent context to set the proper context for callbacks .contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)) .contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context)); }); From 856480639a41410f60b928a44cc40c6d3cd1df07 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 18 Nov 2021 11:58:36 +0100 Subject: [PATCH 3/5] code review comments --- .../v1_0/HttpClientInstrumentation.java | 65 ++++++++++++++----- .../HttpResponseReceiverInstrumenter.java | 3 +- .../v1_0/ReactorNettyClientSslTest.groovy | 28 ++++++-- .../ReactorNettyConnectionSpanTest.groovy | 30 +++++++-- 4 files changed, 97 insertions(+), 29 deletions(-) diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java index 69c6c62b8fc4..d3f9fd5ff517 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java @@ -33,10 +33,16 @@ public void transform(TypeTransformer transformer) { // advice classes below expose current context in doOn*/doAfter* callbacks transformer.applyAdviceToMethod( isPublic() - .and(namedOneOf("doOnRequest", "doAfterRequest")) + .and(named("doOnRequest")) .and(takesArguments(1)) .and(takesArgument(0, BiConsumer.class)), this.getClass().getName() + "$OnRequestAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("doAfterRequest")) + .and(takesArguments(1)) + .and(takesArgument(0, BiConsumer.class)), + this.getClass().getName() + "$AfterRequestAdvice"); transformer.applyAdviceToMethod( isPublic() .and(named("doOnRequestError")) @@ -45,10 +51,16 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$OnRequestErrorAdvice"); transformer.applyAdviceToMethod( isPublic() - .and(namedOneOf("doOnResponse", "doAfterResponseSuccess", "doOnRedirect")) + .and(named("doOnResponse")) .and(takesArguments(1)) .and(takesArgument(0, BiConsumer.class)), this.getClass().getName() + "$OnResponseAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(namedOneOf("doAfterResponseSuccess", "doOnRedirect")) + .and(takesArguments(1)) + .and(takesArgument(0, BiConsumer.class)), + this.getClass().getName() + "$AfterResponseAdvice"); transformer.applyAdviceToMethod( isPublic() .and(named("doOnResponseError")) @@ -70,16 +82,25 @@ public static class OnRequestAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback, - @Advice.Origin("#m") String methodName) { + BiConsumer callback) { + + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT); + } + } + } + + @SuppressWarnings("unused") + public static class AfterRequestAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) + BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - // use client context after request is sent, parent context before that - PropagatedContext propagatedContext = - "doAfterRequest".equals(methodName) - ? PropagatedContext.CLIENT - : PropagatedContext.PARENT; - callback = new DecoratorFunctions.OnMessageDecorator<>(callback, propagatedContext); + // use client context after request is sent + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } } } @@ -105,15 +126,25 @@ public static class OnResponseAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) - BiConsumer callback, - @Advice.Origin("#m") String methodName) { + BiConsumer callback) { + + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { + // use client context just when response status & headers are received + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); + } + } + } + + @SuppressWarnings("unused") + public static class AfterResponseAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) + BiConsumer callback) { if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - // use client context just when response status & headers are received, the parent context - // after the response is completed - PropagatedContext propagatedContext = - "doOnResponse".equals(methodName) ? PropagatedContext.CLIENT : PropagatedContext.PARENT; - callback = new DecoratorFunctions.OnMessageDecorator<>(callback, propagatedContext); + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT); } } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java index 98e264b04ed2..4362a24e05da 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java @@ -29,7 +29,8 @@ public final class HttpResponseReceiverInstrumenter { // this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP // request processing - // it should be used just before one of the response*() methods is called - after this point the HTTP + // it should be used just before one of the response*() methods is called - after this point the + // HTTP // request is no longer modifiable by the user @Nullable public static HttpClient.ResponseReceiver instrument(HttpClient.ResponseReceiver receiver) { diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy index f5b1d9c4b6d2..b1d877dd34ef 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyClientSslTest.groovy @@ -19,6 +19,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.SERVER import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { @@ -38,9 +39,10 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { def "should fail SSL handshake"() { given: def httpClient = createHttpClient(["SSLv3"]) + def uri = "https://localhost:${server.httpsPort()}/success" when: - def responseMono = httpClient.get().uri("https://localhost:${server.httpsPort()}/success") + def responseMono = httpClient.get().uri(uri) .responseSingle { resp, content -> // Make sure to consume content since that's when we close the span. content.map { resp } @@ -67,6 +69,10 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { status ERROR // netty swallows the exception, it doesn't make any sense to hard-code the message errorEventWithAnyMessage(SSLHandshakeException) + attributes { + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + } } span(2) { name "RESOLVE" @@ -86,7 +92,7 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } span(4) { @@ -100,7 +106,7 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } } @@ -110,9 +116,10 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { def "should successfully establish SSL handshake"() { given: def httpClient = createHttpClient() + def uri = "https://localhost:${server.httpsPort()}/success" when: - def responseMono = httpClient.get().uri("https://localhost:${server.httpsPort()}/success") + def responseMono = httpClient.get().uri(uri) .responseSingle { resp, content -> // Make sure to consume content since that's when we close the span. content.map { resp } @@ -132,6 +139,15 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { name "HTTP GET" kind CLIENT childOf span(0) + attributes { + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + "${SemanticAttributes.HTTP_FLAVOR}" HTTP_1_1 + "${SemanticAttributes.HTTP_STATUS_CODE}" 200 + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } } span(2) { name "RESOLVE" @@ -151,7 +167,7 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } span(4) { @@ -162,7 +178,7 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } span(5) { diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy index 32582e5b6009..19a0c6fb608e 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy @@ -17,6 +17,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.SERVER import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { @@ -34,11 +35,14 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem } def "test successful request"() { - when: + given: def httpClient = HttpClient.create() + def uri = "http://localhost:${server.httpPort()}/success" + + when: def responseCode = runWithSpan("parent") { - httpClient.get().uri("http://localhost:${server.httpPort()}/success") + httpClient.get().uri(uri) .responseSingle { resp, content -> // Make sure to consume content since that's when we close the span. content.map { resp } @@ -60,6 +64,15 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem name "HTTP GET" kind CLIENT childOf span(0) + attributes { + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + "${SemanticAttributes.HTTP_FLAVOR}" HTTP_1_1 + "${SemanticAttributes.HTTP_STATUS_CODE}" 200 + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort() + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } } span(2) { name "RESOLVE" @@ -92,10 +105,13 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem } def "test failing request"() { - when: + given: def httpClient = HttpClient.create() + def uri = "http://localhost:${PortUtils.UNUSABLE_PORT}" + + when: runWithSpan("parent") { - httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}") + httpClient.get().uri(uri) .responseSingle { resp, content -> // Make sure to consume content since that's when we close the span. content.map { resp } @@ -124,6 +140,10 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem childOf span(0) status ERROR errorEvent(connectException.class, connectException.message) + attributes { + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + } } span(2) { name "RESOLVE" @@ -145,7 +165,7 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } } From 19d581aba6afa5759999c0903aa27de1c1b39774 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 18 Nov 2021 12:01:42 +0100 Subject: [PATCH 4/5] code review comments --- .../v1_0/ResponseReceiverInstrumentation.java | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java index 37576d9891bb..06c05a62323f 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java @@ -91,12 +91,14 @@ public static void onExit( @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, @Advice.Return(readOnly = false) Mono returnValue) { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.response(); + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.response(); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); } - - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); } } @@ -125,12 +127,14 @@ public static void onExit( @Advice.Argument(0) BiFunction receiveFunction, @Advice.Return(readOnly = false) Flux returnValue) { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.response(receiveFunction); + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.response(receiveFunction); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); } - - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); } } @@ -159,12 +163,14 @@ public static void onExit( @Advice.Argument(0) BiFunction receiveFunction, @Advice.Return(readOnly = false) Flux returnValue) { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseConnection(receiveFunction); + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseConnection(receiveFunction); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); } - - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); } } @@ -192,12 +198,14 @@ public static void onExit( @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, @Advice.Return(readOnly = false) ByteBufFlux returnValue) { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseContent(); + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseContent(); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); } - - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); } } @@ -226,12 +234,14 @@ public static void onExit( @Advice.Argument(0) BiFunction receiveFunction, @Advice.Return(readOnly = false) Mono returnValue) { - if (modifiedReceiver != null) { - returnValue = modifiedReceiver.responseSingle(receiveFunction); + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseSingle(receiveFunction); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); } - - // needs to be called after original method to prevent StackOverflowError - callDepth.decrementAndGet(); } } } From 47edd165479d3c8357d5d1bd3a6fbbcd40069d74 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Thu, 18 Nov 2021 12:03:56 +0100 Subject: [PATCH 5/5] code review comments --- .../v1_0/ResponseReceiverInstrumentation.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java index 06c05a62323f..ca13b6520c30 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java @@ -75,7 +75,7 @@ public static HttpClient.ResponseReceiver onEnter( @Advice.Local("otelCallDepth") CallDepth callDepth, @Advice.This HttpClient.ResponseReceiver receiver) { - callDepth = CallDepth.forClass(ClassLoader.class); + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); if (callDepth.getAndIncrement() > 0) { // execute the original method on nested calls return null; @@ -110,7 +110,7 @@ public static HttpClient.ResponseReceiver onEnter( @Advice.Local("otelCallDepth") CallDepth callDepth, @Advice.This HttpClient.ResponseReceiver receiver) { - callDepth = CallDepth.forClass(ClassLoader.class); + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); if (callDepth.getAndIncrement() > 0) { // execute the original method on nested calls return null; @@ -146,7 +146,7 @@ public static HttpClient.ResponseReceiver onEnter( @Advice.Local("otelCallDepth") CallDepth callDepth, @Advice.This HttpClient.ResponseReceiver receiver) { - callDepth = CallDepth.forClass(ClassLoader.class); + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); if (callDepth.getAndIncrement() > 0) { // execute the original method on nested calls return null; @@ -182,7 +182,7 @@ public static HttpClient.ResponseReceiver onEnter( @Advice.Local("otelCallDepth") CallDepth callDepth, @Advice.This HttpClient.ResponseReceiver receiver) { - callDepth = CallDepth.forClass(ClassLoader.class); + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); if (callDepth.getAndIncrement() > 0) { // execute the original method on nested calls return null; @@ -217,7 +217,7 @@ public static HttpClient.ResponseReceiver onEnter( @Advice.Local("otelCallDepth") CallDepth callDepth, @Advice.This HttpClient.ResponseReceiver receiver) { - callDepth = CallDepth.forClass(ClassLoader.class); + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); if (callDepth.getAndIncrement() > 0) { // execute the original method on nested calls return null;