Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a dedicated reactor-netty 1.0 instrumentation #4662

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void resetOnEachOperator() {
}

/** Forces Mono to run in traceContext scope. */
static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingContext) {
if (!enabled) {
return publisher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ dependencies {
annotationProcessor("com.google.auto.value:auto-value")

implementation(project(":instrumentation:netty:netty-4.1-common:javaagent"))
implementation(project(":instrumentation:reactor-3.1:library"))

library("io.projectreactor.netty:reactor-netty-http:1.0.0")

testInstrumentation(project(":instrumentation:reactor-netty:reactor-netty-0.9:javaagent"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import io.netty.channel.Channel;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.netty.v4_1.AttributeKeys;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import reactor.netty.Connection;
Expand All @@ -24,15 +22,19 @@ public static boolean shouldDecorate(Class<?> callbackClass) {

public static final class OnMessageDecorator<M extends HttpClientInfos>
implements BiConsumer<M, Connection> {

private final BiConsumer<? super M, ? super Connection> delegate;
private final PropagatedContext propagatedContext;

public OnMessageDecorator(BiConsumer<? super M, ? super Connection> delegate) {
public OnMessageDecorator(
BiConsumer<? super M, ? super Connection> delegate, PropagatedContext propagatedContext) {
this.delegate = delegate;
this.propagatedContext = propagatedContext;
}

@Override
public void accept(M message, Connection connection) {
Context context = getChannelContext(message.currentContextView(), connection.channel());
Context context = getChannelContext(message.currentContextView(), propagatedContext);
if (context == null) {
delegate.accept(message, connection);
} else {
Expand All @@ -45,15 +47,19 @@ public void accept(M message, Connection connection) {

public static final class OnMessageErrorDecorator<M extends HttpClientInfos>
implements BiConsumer<M, Throwable> {

private final BiConsumer<? super M, ? super Throwable> delegate;
private final PropagatedContext propagatedContext;

public OnMessageErrorDecorator(BiConsumer<? super M, ? super Throwable> delegate) {
public OnMessageErrorDecorator(
BiConsumer<? super M, ? super Throwable> delegate, PropagatedContext propagatedContext) {
this.delegate = delegate;
this.propagatedContext = propagatedContext;
}

@Override
public void accept(M message, Throwable throwable) {
Context context = getChannelContext(message.currentContextView(), null);
Context context = getChannelContext(message.currentContextView(), propagatedContext);
if (context == null) {
delegate.accept(message, throwable);
} else {
Expand All @@ -65,16 +71,27 @@ public void accept(M message, Throwable throwable) {
}

@Nullable
private static Context getChannelContext(ContextView contextView, @Nullable Channel channel) {
// try to get the client span context from the channel if it's available
if (channel != null) {
Context context = channel.attr(AttributeKeys.CLIENT_CONTEXT).get();
if (context != null) {
return context;
}
private static Context getChannelContext(
ContextView contextView, PropagatedContext propagatedContext) {
Context context = null;
if (propagatedContext.useClientContext) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_CONTEXT_KEY, null);
}
if (context == null) {
context = contextView.getOrDefault(ReactorContextKeys.CLIENT_PARENT_CONTEXT_KEY, null);
}
Comment on lines +77 to +82
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the fallback in case of useClientContext important? (vs straight if/else here)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is -- when reactor-netty span is suppressed and client context is not accessible from the reactor's ContextView we still want to execute the doOn*() callbacks with the parent context in scope.

return context;
}

public enum PropagatedContext {
PARENT(false),
CLIENT(true);

final boolean useClientContext;

PropagatedContext(boolean useClientContext) {
this.useClientContext = useClientContext;
}
// otherwise use the parent span context
return contextView.getOrDefault(MapConnect.CONTEXT_ATTRIBUTE, null);
}

private DecoratorFunctions() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,19 @@
package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
import io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0.DecoratorFunctions.PropagatedContext;
import java.util.function.BiConsumer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import reactor.netty.Connection;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;

Expand All @@ -32,10 +30,6 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isStatic().and(namedOneOf("create", "newConnection", "from")),
this.getClass().getName() + "$CreateAdvice");

// advice classes below expose current context in doOn*/doAfter* callbacks
transformer.applyAdviceToMethod(
isPublic()
Expand Down Expand Up @@ -70,36 +64,22 @@ public void transform(TypeTransformer transformer) {
this.getClass().getName() + "$OnErrorAdvice");
}

@SuppressWarnings("unused")
public static class CreateAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Local("otelCallDepth") CallDepth callDepth) {
callDepth = CallDepth.forClass(HttpClient.class);
callDepth.getAndIncrement();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Return(readOnly = false) HttpClient client,
@Advice.Local("otelCallDepth") CallDepth callDepth) {

if (callDepth.decrementAndGet() == 0 && throwable == null) {
client = client.doOnRequest(new OnRequest()).mapConnect(new MapConnect());
}
}
}

@SuppressWarnings("unused")
public static class OnRequestAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientRequest, ? super Connection> callback) {
BiConsumer<? super HttpClientRequest, ? super Connection> callback,
@Advice.Origin("#m") String methodName) {

if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnMessageDecorator<>(callback);
// use client context after request is sent, parent context before that
PropagatedContext propagatedContext =
"doAfterRequest".equals(methodName)
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved
? PropagatedContext.CLIENT
: PropagatedContext.PARENT;
callback = new DecoratorFunctions.OnMessageDecorator<>(callback, propagatedContext);
}
}
}
Expand All @@ -111,8 +91,10 @@ public static class OnRequestErrorAdvice {
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientRequest, ? super Throwable> callback) {

if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback);
callback =
new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT);
}
}
}
Expand All @@ -123,9 +105,15 @@ public static class OnResponseAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientResponse, ? super Connection> callback) {
BiConsumer<? super HttpClientResponse, ? super Connection> callback,
@Advice.Origin("#m") String methodName) {

if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnMessageDecorator<>(callback);
// use client context just when response status & headers are received, the parent context
// after the response is completed
PropagatedContext propagatedContext =
"doOnResponse".equals(methodName) ? PropagatedContext.CLIENT : PropagatedContext.PARENT;
mateuszrzeszutek marked this conversation as resolved.
Show resolved Hide resolved
callback = new DecoratorFunctions.OnMessageDecorator<>(callback, propagatedContext);
}
}
}
Expand All @@ -137,8 +125,10 @@ public static class OnResponseErrorAdvice {
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false)
BiConsumer<? super HttpClientResponse, ? super Throwable> callback) {

if (DecoratorFunctions.shouldDecorate(callback.getClass())) {
callback = new DecoratorFunctions.OnMessageErrorDecorator<>(callback);
callback =
new DecoratorFunctions.OnMessageErrorDecorator<>(callback, PropagatedContext.PARENT);
}
}
}
Expand All @@ -152,11 +142,16 @@ public static void onEnter(
BiConsumer<? super HttpClientRequest, ? super Throwable> requestCallback,
@Advice.Argument(value = 1, readOnly = false)
BiConsumer<? super HttpClientResponse, ? super Throwable> responseCallback) {

if (DecoratorFunctions.shouldDecorate(requestCallback.getClass())) {
requestCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(requestCallback);
requestCallback =
new DecoratorFunctions.OnMessageErrorDecorator<>(
requestCallback, PropagatedContext.PARENT);
}
if (DecoratorFunctions.shouldDecorate(responseCallback.getClass())) {
responseCallback = new DecoratorFunctions.OnMessageErrorDecorator<>(responseCallback);
responseCallback =
new DecoratorFunctions.OnMessageErrorDecorator<>(
responseCallback, PropagatedContext.PARENT);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactornetty.v1_0;

import io.opentelemetry.context.propagation.TextMapSetter;
import javax.annotation.Nullable;
import reactor.netty.http.client.HttpClientRequest;

enum HttpClientRequestHeadersSetter implements TextMapSetter<HttpClientRequest> {
INSTANCE;

@Override
public void set(@Nullable HttpClientRequest request, String key, String value) {
request.header(key, value);
}
}
Loading