diff --git a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java index 4e3e03a0a3a4..bfd6d63566e3 100644 --- a/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java +++ b/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java @@ -117,7 +117,7 @@ public void resetOnEachOperator() { } /** Forces Mono to run in traceContext scope. */ - static Mono runWithContext(Mono publisher, Context tracingContext) { + public static Mono runWithContext(Mono publisher, Context tracingContext) { if (!enabled) { return publisher; } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts index 122b0b58c709..2a12f952cca8 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/build.gradle.kts @@ -21,6 +21,8 @@ dependencies { annotationProcessor("com.google.auto.value:auto-value") implementation(project(":instrumentation:netty:netty-4.1-common:javaagent")) + implementation(project(":instrumentation:reactor-3.1:library")) + library("io.projectreactor.netty:reactor-netty-http:1.0.0") testInstrumentation(project(":instrumentation:reactor-netty:reactor-netty-0.9:javaagent")) diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java index d53a719f70da..ab6fbf554a3c 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/DecoratorFunctions.java @@ -5,10 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; -import io.netty.channel.Channel; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys; import java.util.function.BiConsumer; import javax.annotation.Nullable; import reactor.netty.Connection; @@ -24,15 +22,19 @@ public static boolean shouldDecorate(Class callbackClass) { public static final class OnMessageDecorator implements BiConsumer { + private final BiConsumer delegate; + private final PropagatedContext propagatedContext; - public OnMessageDecorator(BiConsumer delegate) { + public OnMessageDecorator( + BiConsumer delegate, PropagatedContext propagatedContext) { this.delegate = delegate; + this.propagatedContext = propagatedContext; } @Override public void accept(M message, Connection connection) { - Context context = getChannelContext(message.currentContextView(), connection.channel()); + Context context = getChannelContext(message.currentContextView(), propagatedContext); if (context == null) { delegate.accept(message, connection); } else { @@ -45,15 +47,19 @@ public void accept(M message, Connection connection) { public static final class OnMessageErrorDecorator implements BiConsumer { + private final BiConsumer delegate; + private final PropagatedContext propagatedContext; - public OnMessageErrorDecorator(BiConsumer delegate) { + public OnMessageErrorDecorator( + BiConsumer delegate, PropagatedContext propagatedContext) { this.delegate = delegate; + this.propagatedContext = propagatedContext; } @Override public void accept(M message, Throwable throwable) { - Context context = getChannelContext(message.currentContextView(), null); + Context context = getChannelContext(message.currentContextView(), propagatedContext); if (context == null) { delegate.accept(message, throwable); } else { @@ -65,16 +71,27 @@ public void accept(M message, Throwable throwable) { } @Nullable - private static Context getChannelContext(ContextView contextView, @Nullable Channel channel) { - // try to get the client span context from the channel if it's available - if (channel != null) { - Context context = channel.attr(AttributeKeys.CLIENT_CONTEXT).get(); - if (context != null) { - return context; - } + private static Context getChannelContext( + ContextView contextView, PropagatedContext propagatedContext) { + Context context = null; + if (propagatedContext.useClientContext) { + context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null); + } + if (context == null) { + context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null); + } + return context; + } + + public enum PropagatedContext { + PARENT(false), + CLIENT(true); + + final boolean useClientContext; + + PropagatedContext(boolean useClientContext) { + this.useClientContext = useClientContext; } - // otherwise use the parent span context - return contextView.getOrDefault(MapConnect.CONTEXT_ATTRIBUTE, null); } private DecoratorFunctions() {} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java index 68a8429bb5f2..d3f9fd5ff517 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientInstrumentation.java @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.isStatic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; @@ -14,13 +13,12 @@ import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.api.CallDepth; +import io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.DecoratorFunctions.PropagatedContext; import java.util.function.BiConsumer; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import reactor.netty.Connection; -import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; @@ -32,17 +30,19 @@ public ElementMatcher typeMatcher() { @Override public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isStatic().and(namedOneOf("create", "newConnection", "from")), - this.getClass().getName() + "$CreateAdvice"); - // advice classes below expose current context in doOn*/doAfter* callbacks transformer.applyAdviceToMethod( isPublic() - .and(namedOneOf("doOnRequest", "doAfterRequest")) + .and(named("doOnRequest")) .and(takesArguments(1)) .and(takesArgument(0, BiConsumer.class)), this.getClass().getName() + "$OnRequestAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(named("doAfterRequest")) + .and(takesArguments(1)) + .and(takesArgument(0, BiConsumer.class)), + this.getClass().getName() + "$AfterRequestAdvice"); transformer.applyAdviceToMethod( isPublic() .and(named("doOnRequestError")) @@ -51,10 +51,16 @@ public void transform(TypeTransformer transformer) { this.getClass().getName() + "$OnRequestErrorAdvice"); transformer.applyAdviceToMethod( isPublic() - .and(namedOneOf("doOnResponse", "doAfterResponseSuccess", "doOnRedirect")) + .and(named("doOnResponse")) .and(takesArguments(1)) .and(takesArgument(0, BiConsumer.class)), this.getClass().getName() + "$OnResponseAdvice"); + transformer.applyAdviceToMethod( + isPublic() + .and(namedOneOf("doAfterResponseSuccess", "doOnRedirect")) + .and(takesArguments(1)) + .and(takesArgument(0, BiConsumer.class)), + this.getClass().getName() + "$AfterResponseAdvice"); transformer.applyAdviceToMethod( isPublic() .and(named("doOnResponseError")) @@ -71,35 +77,30 @@ public void transform(TypeTransformer transformer) { } @SuppressWarnings("unused") - public static class CreateAdvice { + public static class OnRequestAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) { - callDepth = CallDepth.forClass(HttpClient.class); - callDepth.getAndIncrement(); - } - - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( - @Advice.Thrown Throwable throwable, - @Advice.Return(readOnly = false) HttpClient client, - @Advice.Local("otelCallDepth") CallDepth callDepth) { + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) + BiConsumer callback) { - if (callDepth.decrementAndGet() == 0 && throwable == null) { - client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect()); + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT); } } } @SuppressWarnings("unused") - public static class OnRequestAdvice { + public static class AfterRequestAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static void onEnter( @Advice.Argument(value = 0, readOnly = false) BiConsumer callback) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageDecorator<>(callback); + // use client context after request is sent + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); } } } @@ -111,8 +112,10 @@ public static class OnRequestErrorAdvice { public static void onEnter( @Advice.Argument(value = 0, readOnly = false) BiConsumer callback) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback); + callback = + new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } } } @@ -124,8 +127,24 @@ public static class OnResponseAdvice { public static void onEnter( @Advice.Argument(value = 0, readOnly = false) BiConsumer callback) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageDecorator<>(callback); + // use client context just when response status & headers are received + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.CLIENT); + } + } + } + + @SuppressWarnings("unused") + public static class AfterResponseAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) + BiConsumer callback) { + + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { + callback = new DecoratorFunctions.OnMessageDecorator<>(callback, PropagatedContext.PARENT); } } } @@ -137,8 +156,10 @@ public static class OnResponseErrorAdvice { public static void onEnter( @Advice.Argument(value = 0, readOnly = false) BiConsumer callback) { + if (DecoratorFunctions.shouldDecorate(callback.getClass())) { - callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback); + callback = + new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT); } } } @@ -152,11 +173,16 @@ public static void onEnter( BiConsumer requestCallback, @Advice.Argument(value = 1, readOnly = false) BiConsumer responseCallback) { + if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) { - requestCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(requestCallback); + requestCallback = + new DecoratorFunctions.OnMessageErrorDecorator<>( + requestCallback, PropagatedContext.PARENT); } if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) { - responseCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(responseCallback); + responseCallback = + new DecoratorFunctions.OnMessageErrorDecorator<>( + responseCallback, PropagatedContext.PARENT); } } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientRequestHeadersSetter.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientRequestHeadersSetter.java new file mode 100644 index 000000000000..b7fecafa80d8 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpClientRequestHeadersSetter.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.opentelemetry.context.propagation.TextMapSetter; +import javax.annotation.Nullable; +import reactor.netty.http.client.HttpClientRequest; + +enum HttpClientRequestHeadersSetter implements TextMapSetter { + INSTANCE; + + @Override + public void set(@Nullable HttpClientRequest request, String key, String value) { + request.header(key, value); + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java new file mode 100644 index 000000000000..4362a24e05da --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java @@ -0,0 +1,185 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_CONTEXT_KEY; +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY; +import static io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.ReactorNettySingletons.instrumenter; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator; +import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys; +import java.util.function.BiConsumer; +import java.util.function.Function; +import javax.annotation.Nullable; +import reactor.core.publisher.Mono; +import reactor.netty.Connection; +import reactor.netty.ConnectionObserver; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientRequest; +import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.http.client.HttpClientState; + +public final class HttpResponseReceiverInstrumenter { + + // this method adds several stateful listeners that execute the instrumenter lifecycle during HTTP + // request processing + // it should be used just before one of the response*() methods is called - after this point the + // HTTP + // request is no longer modifiable by the user + @Nullable + public static HttpClient.ResponseReceiver instrument(HttpClient.ResponseReceiver receiver) { + // receiver should always be an HttpClientFinalizer, which both extends HttpClient and + // implements ResponseReceiver + if (receiver instanceof HttpClient) { + HttpClient client = (HttpClient) receiver; + HttpClientConfig config = client.configuration(); + + ContextHolder contextHolder = new ContextHolder(); + + HttpClient modified = + client + .mapConnect(new StartOperation(contextHolder, config)) + .doOnRequest(new PropagateContext(contextHolder)) + .doOnRequestError(new EndOperationWithError(contextHolder, config)) + .observe(new EndOperation(contextHolder, config)); + + // modified should always be an HttpClientFinalizer too + if (modified instanceof HttpClient.ResponseReceiver) { + return (HttpClient.ResponseReceiver) modified; + } + } + + return null; + } + + static final class ContextHolder { + volatile Context parentContext; + volatile Context context; + } + + static final class StartOperation + implements Function, Mono> { + + private final ContextHolder contextHolder; + private final HttpClientConfig config; + + StartOperation(ContextHolder contextHolder, HttpClientConfig config) { + this.contextHolder = contextHolder; + this.config = config; + } + + @Override + public Mono apply(Mono mono) { + return Mono.defer( + () -> { + Context parentContext = Context.current(); + contextHolder.parentContext = parentContext; + if (!instrumenter().shouldStart(parentContext, config)) { + // make context accessible via the reactor ContextView - the doOn* callbacks + // instrumentation uses this to set the proper context for callbacks + return mono.contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)); + } + + Context context = instrumenter().start(parentContext, config); + contextHolder.context = context; + return ContextPropagationOperator.runWithContext(mono, context) + // make contexts accessible via the reactor ContextView - the doOn* callbacks + // instrumentation uses the parent context to set the proper context for callbacks + .contextWrite(ctx -> ctx.put(CLIENT_PARENT_CONTEXT_KEY, parentContext)) + .contextWrite(ctx -> ctx.put(CLIENT_CONTEXT_KEY, context)); + }); + } + } + + static final class PropagateContext implements BiConsumer { + + private final ContextHolder contextHolder; + + PropagateContext(ContextHolder contextHolder) { + this.contextHolder = contextHolder; + } + + @Override + public void accept(HttpClientRequest httpClientRequest, Connection connection) { + Context context = contextHolder.context; + if (context != null) { + GlobalOpenTelemetry.getPropagators() + .getTextMapPropagator() + .inject(context, httpClientRequest, HttpClientRequestHeadersSetter.INSTANCE); + } + + // also propagate the context to the underlying netty instrumentation + // if this span was suppressed and context is null, propagate parentContext - this will allow + // netty spans to be suppressed too + Context nettyParentContext = context == null ? contextHolder.parentContext : context; + connection.channel().attr(AttributeKeys.WRITE_CONTEXT).set(nettyParentContext); + } + } + + static final class EndOperationWithError implements BiConsumer { + + private final ContextHolder contextHolder; + private final HttpClientConfig config; + + EndOperationWithError(ContextHolder contextHolder, HttpClientConfig config) { + this.contextHolder = contextHolder; + this.config = config; + } + + @Override + public void accept(HttpClientRequest httpClientRequest, Throwable error) { + Context context = contextHolder.context; + if (context == null) { + return; + } + instrumenter().end(context, config, null, error); + } + } + + static final class EndOperation implements ConnectionObserver { + + private final ContextHolder contextHolder; + private final HttpClientConfig config; + + EndOperation(ContextHolder contextHolder, HttpClientConfig config) { + this.contextHolder = contextHolder; + this.config = config; + } + + @Override + public void onStateChange(Connection connection, State newState) { + if (newState != HttpClientState.RESPONSE_COMPLETED) { + return; + } + + Context context = contextHolder.context; + if (context == null) { + return; + } + + // connection is actually an instance of HttpClientOperations - a package private class that + // implements both Connection and HttpClientResponse + if (connection instanceof HttpClientResponse) { + HttpClientResponse response = (HttpClientResponse) connection; + instrumenter().end(context, config, response, null); + } + } + + @Override + public void onUncaughtException(Connection connection, Throwable error) { + Context context = contextHolder.context; + if (context == null) { + return; + } + instrumenter().end(context, config, null, error); + } + } + + private HttpResponseReceiverInstrumenter() {} +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/MapConnect.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/MapConnect.java deleted file mode 100644 index 10031575a3f2..000000000000 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/MapConnect.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; - -import io.opentelemetry.context.Context; -import java.util.function.Function; -import reactor.core.publisher.Mono; -import reactor.netty.Connection; - -public class MapConnect - implements Function, Mono> { - - static final String CONTEXT_ATTRIBUTE = MapConnect.class.getName() + ".Context"; - - @Override - public Mono apply(Mono m) { - return m.contextWrite(s -> s.put(CONTEXT_ATTRIBUTE, Context.current())); - } -} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/OnRequest.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/OnRequest.java deleted file mode 100644 index 709d4d445d44..000000000000 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/OnRequest.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; - -import io.opentelemetry.context.Context; -import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys; -import java.util.function.BiConsumer; -import reactor.netty.Connection; -import reactor.netty.http.client.HttpClientRequest; - -public class OnRequest implements BiConsumer { - @Override - public void accept(HttpClientRequest r, Connection c) { - Context context = r.currentContextView().get(MapConnect.CONTEXT_ATTRIBUTE); - c.channel().attr(AttributeKeys.WRITE_CONTEXT).set(context); - } -} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java new file mode 100644 index 000000000000..d76ceb43c779 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorContextKeys.java @@ -0,0 +1,16 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +public final class ReactorContextKeys { + + public static final String CLIENT_PARENT_CONTEXT_KEY = + ReactorContextKeys.class.getName() + ".client-parent-context"; + public static final String CLIENT_CONTEXT_KEY = + ReactorContextKeys.class.getName() + ".client-context"; + + private ReactorContextKeys() {} +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientAttributesExtractor.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientAttributesExtractor.java new file mode 100644 index 000000000000..5bf9824a81e2 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientAttributesExtractor.java @@ -0,0 +1,114 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.List; +import javax.annotation.Nullable; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientResponse; + +final class ReactorNettyHttpClientAttributesExtractor + extends HttpClientAttributesExtractor { + + @Override + protected String url(HttpClientConfig request) { + String uri = request.uri(); + if (isAbsolute(uri)) { + return uri; + } + + // use the baseUrl if it was configured + String baseUrl = request.baseUrl(); + if (baseUrl != null) { + if (baseUrl.endsWith("/") && uri.startsWith("/")) { + baseUrl = baseUrl.substring(0, baseUrl.length() - 1); + } + return baseUrl + uri; + } + + // otherwise, use the host+port config to construct the full url + SocketAddress hostAddress = request.remoteAddress().get(); + if (hostAddress instanceof InetSocketAddress) { + InetSocketAddress inetHostAddress = (InetSocketAddress) hostAddress; + return (request.isSecure() ? "https://" : "http://") + + inetHostAddress.getHostName() + + ":" + + inetHostAddress.getPort() + + (uri.startsWith("/") ? "" : "/") + + uri; + } + + return uri; + } + + private static boolean isAbsolute(String uri) { + return uri.startsWith("http://") || uri.startsWith("https://"); + } + + @Nullable + @Override + protected String flavor(HttpClientConfig request, @Nullable HttpClientResponse response) { + if (response != null) { + String flavor = response.version().text(); + if (flavor.startsWith("HTTP/")) { + flavor = flavor.substring("HTTP/".length()); + } + return flavor; + } + return null; + } + + @Override + protected String method(HttpClientConfig request) { + return request.method().name(); + } + + @Override + protected List requestHeader(HttpClientConfig request, String name) { + return request.headers().getAll(name); + } + + @Nullable + @Override + protected Long requestContentLength( + HttpClientConfig request, @Nullable HttpClientResponse response) { + return null; + } + + @Nullable + @Override + protected Long requestContentLengthUncompressed( + HttpClientConfig request, @Nullable HttpClientResponse response) { + return null; + } + + @Override + protected Integer statusCode(HttpClientConfig request, HttpClientResponse response) { + return response.status().code(); + } + + @Nullable + @Override + protected Long responseContentLength(HttpClientConfig request, HttpClientResponse response) { + return null; + } + + @Nullable + @Override + protected Long responseContentLengthUncompressed( + HttpClientConfig request, HttpClientResponse response) { + return null; + } + + @Override + protected List responseHeader( + HttpClientConfig request, HttpClientResponse response, String name) { + return response.responseHeaders().getAll(name); + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java index 8ede89713c1c..68f14dc4972f 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyInstrumentationModule.java @@ -38,6 +38,9 @@ public ElementMatcher.Junction classLoaderMatcher() { @Override public List typeInstrumentations() { - return asList(new HttpClientInstrumentation(), new TransportConnectorInstrumentation()); + return asList( + new HttpClientInstrumentation(), + new ResponseReceiverInstrumentation(), + new TransportConnectorInstrumentation()); } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyNetClientAttributesExtractor.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyNetClientAttributesExtractor.java new file mode 100644 index 000000000000..49526c6f7382 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyNetClientAttributesExtractor.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import io.opentelemetry.instrumentation.api.instrumenter.net.InetSocketAddressNetClientAttributesExtractor; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import javax.annotation.Nullable; +import reactor.netty.Connection; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientResponse; + +final class ReactorNettyNetClientAttributesExtractor + extends InetSocketAddressNetClientAttributesExtractor { + + @Nullable + @Override + public String transport(HttpClientConfig request, @Nullable HttpClientResponse response) { + return null; + } + + @Nullable + @Override + public InetSocketAddress getAddress( + HttpClientConfig request, @Nullable HttpClientResponse response) { + + // we're making use of the fact that HttpClientOperations is both a Connection and an + // HttpClientResponse + if (response instanceof Connection) { + Connection connection = (Connection) response; + SocketAddress address = connection.channel().remoteAddress(); + if (address instanceof InetSocketAddress) { + return (InetSocketAddress) address; + } + } + return null; + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java index 0544449c21dd..d396c50c5e63 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettySingletons.java @@ -5,25 +5,58 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; +import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.config.Config; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.PeerServiceAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientMetrics; +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.http.HttpSpanStatusExtractor; import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyClientInstrumenterFactory; import io.opentelemetry.javaagent.instrumentation.netty.common.client.NettyConnectionInstrumenter; +import reactor.netty.http.client.HttpClientConfig; +import reactor.netty.http.client.HttpClientResponse; public final class ReactorNettySingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.reactor-netty-1.0"; + private static final boolean alwaysCreateConnectSpan = Config.get() .getBoolean("otel.instrumentation.reactor-netty.always-create-connect-span", false); + private static final Instrumenter INSTRUMENTER; private static final NettyConnectionInstrumenter CONNECTION_INSTRUMENTER; static { + ReactorNettyHttpClientAttributesExtractor httpAttributesExtractor = + new ReactorNettyHttpClientAttributesExtractor(); + ReactorNettyNetClientAttributesExtractor netAttributesExtractor = + new ReactorNettyNetClientAttributesExtractor(); + + INSTRUMENTER = + Instrumenter.builder( + GlobalOpenTelemetry.get(), + INSTRUMENTATION_NAME, + HttpSpanNameExtractor.create(httpAttributesExtractor)) + .setSpanStatusExtractor(HttpSpanStatusExtractor.create(httpAttributesExtractor)) + .addAttributesExtractor(httpAttributesExtractor) + .addAttributesExtractor(netAttributesExtractor) + .addAttributesExtractor(PeerServiceAttributesExtractor.create(netAttributesExtractor)) + .addRequestMetrics(HttpClientMetrics.get()) + // headers are injected in ResponseReceiverInstrumenter + .newInstrumenter(SpanKindExtractor.alwaysClient()); + NettyClientInstrumenterFactory instrumenterFactory = - new NettyClientInstrumenterFactory( - "io.opentelemetry.reactor-netty-1.0", alwaysCreateConnectSpan, false); + new NettyClientInstrumenterFactory(INSTRUMENTATION_NAME, alwaysCreateConnectSpan, false); CONNECTION_INSTRUMENTER = instrumenterFactory.createConnectionInstrumenter(); } + public static Instrumenter instrumenter() { + return INSTRUMENTER; + } + public static NettyConnectionInstrumenter connectionInstrumenter() { return CONNECTION_INSTRUMENTER; } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java new file mode 100644 index 000000000000..ca13b6520c30 --- /dev/null +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ResponseReceiverInstrumentation.java @@ -0,0 +1,247 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import io.opentelemetry.javaagent.instrumentation.api.CallDepth; +import java.util.function.BiFunction; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.ByteBufFlux; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClientResponse; + +public class ResponseReceiverInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("reactor.netty.http.client.HttpClient$ResponseReceiver"); + } + + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("reactor.netty.http.client.HttpClient$ResponseReceiver")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("response").and(takesArguments(0)).and(returns(named("reactor.core.publisher.Mono"))), + this.getClass().getName() + "$ResponseMonoAdvice"); + transformer.applyAdviceToMethod( + named("response") + .and(takesArguments(1)) + .and(takesArgument(0, BiFunction.class)) + .and(returns(named("reactor.core.publisher.Flux"))), + this.getClass().getName() + "$ResponseFluxAdvice"); + transformer.applyAdviceToMethod( + named("responseConnection") + .and(takesArguments(1)) + .and(takesArgument(0, BiFunction.class)) + .and(returns(named("reactor.core.publisher.Flux"))), + this.getClass().getName() + "$ResponseConnectionAdvice"); + transformer.applyAdviceToMethod( + named("responseContent") + .and(takesArguments(0)) + .and(returns(named("reactor.netty.ByteBufFlux"))), + this.getClass().getName() + "$ResponseContentAdvice"); + transformer.applyAdviceToMethod( + named("responseSingle") + .and(takesArguments(1)) + .and(takesArgument(0, BiFunction.class)) + .and(returns(named("reactor.core.publisher.Mono"))), + this.getClass().getName() + "$ResponseSingleAdvice"); + } + + @SuppressWarnings("unused") + public static class ResponseMonoAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Return(readOnly = false) Mono returnValue) { + + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.response(); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + } + + @SuppressWarnings("unused") + public static class ResponseFluxAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Argument(0) BiFunction receiveFunction, + @Advice.Return(readOnly = false) Flux returnValue) { + + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.response(receiveFunction); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + } + + @SuppressWarnings("unused") + public static class ResponseConnectionAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Argument(0) BiFunction receiveFunction, + @Advice.Return(readOnly = false) Flux returnValue) { + + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseConnection(receiveFunction); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + } + + @SuppressWarnings("unused") + public static class ResponseContentAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Return(readOnly = false) ByteBufFlux returnValue) { + + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseContent(); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + } + + @SuppressWarnings("unused") + public static class ResponseSingleAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) + public static HttpClient.ResponseReceiver onEnter( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.This HttpClient.ResponseReceiver receiver) { + + callDepth = CallDepth.forClass(HttpClient.ResponseReceiver.class); + if (callDepth.getAndIncrement() > 0) { + // execute the original method on nested calls + return null; + } + + // non-null value will skip the original method invocation + return HttpResponseReceiverInstrumenter.instrument(receiver); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.Local("otelCallDepth") CallDepth callDepth, + @Advice.Enter HttpClient.ResponseReceiver modifiedReceiver, + @Advice.Argument(0) BiFunction receiveFunction, + @Advice.Return(readOnly = false) Mono returnValue) { + + try { + if (modifiedReceiver != null) { + returnValue = modifiedReceiver.responseSingle(receiveFunction); + } + } finally { + // needs to be called after original method to prevent StackOverflowError + callDepth.decrementAndGet(); + } + } + } +} diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy index 24f0f91ac8eb..e958bfedbf5d 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy @@ -69,17 +69,6 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest // Make sure to consume content since that's when we close the span. content.map { resp } @@ -54,37 +56,49 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { Throwable thrownException = thrown() assertTraces(1) { - trace(0, 4) { + trace(0, 5) { span(0) { name "parent" status ERROR errorEvent(thrownException.class, thrownException.message) } span(1) { + name "HTTP GET" + kind CLIENT + childOf span(0) + status ERROR + // netty swallows the exception, it doesn't make any sense to hard-code the message + errorEventWithAnyMessage(SSLHandshakeException) + attributes { + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + } + } + span(2) { name "RESOLVE" kind INTERNAL - childOf span(0) + childOf span(1) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() } } - span(2) { + span(3) { name "CONNECT" kind INTERNAL - childOf span(0) + childOf span(1) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } - span(3) { + span(4) { name "SSL handshake" kind INTERNAL - childOf span(0) + childOf span(1) status ERROR // netty swallows the exception, it doesn't make any sense to hard-code the message errorEventWithAnyMessage(SSLHandshakeException) @@ -92,7 +106,7 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } } @@ -102,9 +116,10 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { def "should successfully establish SSL handshake"() { given: def httpClient = createHttpClient() + def uri = "https://localhost:${server.httpsPort()}/success" when: - def responseMono = httpClient.get().uri("https://localhost:${server.httpsPort()}/success") + def responseMono = httpClient.get().uri(uri) .responseSingle { resp, content -> // Make sure to consume content since that's when we close the span. content.map { resp } @@ -121,46 +136,55 @@ class ReactorNettyClientSslTest extends AgentInstrumentationSpecification { name "parent" } span(1) { - name "RESOLVE" - kind INTERNAL + name "HTTP GET" + kind CLIENT childOf span(0) attributes { - "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + "${SemanticAttributes.HTTP_FLAVOR}" HTTP_1_1 + "${SemanticAttributes.HTTP_STATUS_CODE}" 200 "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } span(2) { - name "CONNECT" + name "RESOLVE" kind INTERNAL - childOf span(0) + childOf span(1) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } } } span(3) { - name "SSL handshake" + name "CONNECT" kind INTERNAL - childOf span(0) + childOf span(1) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } span(4) { - name "HTTP GET" - kind CLIENT - childOf(span(0)) + name "SSL handshake" + kind INTERNAL + childOf span(1) + attributes { + "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" server.httpsPort() + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } } span(5) { name "test-http-server" kind SERVER - childOf(span(4)) + childOf span(1) } } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy index 4484351b6105..19a0c6fb608e 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyConnectionSpanTest.groovy @@ -17,6 +17,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.SERVER import static io.opentelemetry.api.trace.StatusCode.ERROR +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HttpFlavorValues.HTTP_1_1 import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NetTransportValues.IP_TCP class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implements AgentTestTrait { @@ -34,11 +35,14 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem } def "test successful request"() { - when: + given: def httpClient = HttpClient.create() + def uri = "http://localhost:${server.httpPort()}/success" + + when: def responseCode = runWithSpan("parent") { - httpClient.get().uri("http://localhost:${server.httpPort()}/success") + httpClient.get().uri(uri) .responseSingle { resp, content -> // Make sure to consume content since that's when we close the span. content.map { resp } @@ -57,19 +61,33 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem hasNoParent() } span(1) { + name "HTTP GET" + kind CLIENT + childOf span(0) + attributes { + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + "${SemanticAttributes.HTTP_FLAVOR}" HTTP_1_1 + "${SemanticAttributes.HTTP_STATUS_CODE}" 200 + "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" + "${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort() + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" + } + } + span(2) { name "RESOLVE" kind INTERNAL - childOf span(0) + childOf span(1) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" server.httpPort() } } - span(2) { + span(3) { name "CONNECT" kind INTERNAL - childOf(span(0)) + childOf span(1) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" @@ -77,25 +95,23 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } - span(3) { - name "HTTP GET" - kind CLIENT - childOf(span(0)) - } span(4) { name "test-http-server" kind SERVER - childOf(span(3)) + childOf span(1) } } } } def "test failing request"() { - when: + given: def httpClient = HttpClient.create() + def uri = "http://localhost:${PortUtils.UNUSABLE_PORT}" + + when: runWithSpan("parent") { - httpClient.get().uri("http://localhost:${PortUtils.UNUSABLE_PORT}") + httpClient.get().uri(uri) .responseSingle { resp, content -> // Make sure to consume content since that's when we close the span. content.map { resp } @@ -110,7 +126,7 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem and: assertTraces(1) { - trace(0, 3) { + trace(0, 4) { span(0) { name "parent" kind INTERNAL @@ -119,26 +135,37 @@ class ReactorNettyConnectionSpanTest extends InstrumentationSpecification implem errorEvent(thrownException.class, thrownException.message) } span(1) { + name "HTTP GET" + kind CLIENT + childOf span(0) + status ERROR + errorEvent(connectException.class, connectException.message) + attributes { + "${SemanticAttributes.HTTP_METHOD}" "GET" + "${SemanticAttributes.HTTP_URL}" uri + } + } + span(2) { name "RESOLVE" kind INTERNAL - childOf span(0) + childOf span(1) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT } } - span(2) { + span(3) { name "CONNECT" kind INTERNAL - childOf(span(0)) + childOf span(1) status ERROR errorEvent(connectException.class, connectException.message) attributes { "${SemanticAttributes.NET_TRANSPORT.key}" IP_TCP "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_PORT.key}" PortUtils.UNUSABLE_PORT - "${SemanticAttributes.NET_PEER_IP.key}" { it == null || it == "127.0.0.1" } + "${SemanticAttributes.NET_PEER_IP.key}" "127.0.0.1" } } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy index c06c7bcd61ee..c844c2878336 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientTest.groovy @@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 import io.netty.channel.ChannelOption import io.opentelemetry.instrumentation.testing.junit.http.SingleConnection +import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames import reactor.netty.http.client.HttpClient import java.util.concurrent.ExecutionException @@ -15,9 +16,12 @@ import java.util.concurrent.TimeoutException class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest { HttpClient createHttpClient() { - return HttpClient.create().tcpConfiguration({ tcpClient -> - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }).resolver(getAddressResolverGroup()) + return HttpClient.create() + .tcpConfiguration({ tcpClient -> + tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) + }) + .resolver(getAddressResolverGroup()) + .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) } @Override @@ -26,6 +30,7 @@ class ReactorNettyHttpClientTest extends AbstractReactorNettyHttpClientTest { .newConnection() .host(host) .port(port) + .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) return new SingleConnection() { diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy index 098c963125cb..db1a9879be4a 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/ReactorNettyHttpClientUsingFromTest.groovy @@ -6,14 +6,18 @@ package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0 import io.netty.channel.ChannelOption +import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames import reactor.netty.http.client.HttpClient import reactor.netty.tcp.TcpClient class ReactorNettyHttpClientUsingFromTest extends AbstractReactorNettyHttpClientTest { HttpClient createHttpClient() { - return HttpClient.from(TcpClient.create()).tcpConfiguration({ tcpClient -> - tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) - }).resolver(getAddressResolverGroup()) + return HttpClient.from(TcpClient.create()) + .tcpConfiguration({ tcpClient -> + tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MS) + }) + .resolver(getAddressResolverGroup()) + .headers({ headers -> headers.set(HttpHeaderNames.USER_AGENT, userAgent()) }) } }