Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix netty strict context checks #4002

Merged
merged 3 commits into from
Aug 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,24 @@ public final class FutureListenerWrappers {
GenericFutureListener<? extends Future<?>>, GenericFutureListener<? extends Future<?>>>
wrappers = Cache.newBuilder().setWeakKeys().setWeakValues().build();

private static final ClassValue<Boolean> shouldWrap =
new ClassValue<Boolean>() {
@Override
protected Boolean computeValue(Class<?> type) {
// we only want to wrap user callbacks
String className = type.getName();
return !className.startsWith("io.opentelemetry.javaagent.")
&& !className.startsWith("io.netty.");
}
};

public static boolean shouldWrap(GenericFutureListener<? extends Future<?>> listener) {
return shouldWrap.get(listener.getClass());
}

@SuppressWarnings("unchecked")
public static GenericFutureListener<?> wrap(
Context context, GenericFutureListener<? extends Future<?>> delegate) {
if (delegate instanceof WrappedFutureListener
|| delegate instanceof WrappedProgressiveFutureListener) {
return delegate;
}
return wrappers.computeIfAbsent(
delegate,
key -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public static class AddListenerAdvice {
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<?>> listener) {
listener = FutureListenerWrappers.wrap(Java8BytecodeBridge.currentContext(), listener);
if (FutureListenerWrappers.shouldWrap(listener)) {
listener = FutureListenerWrappers.wrap(Java8BytecodeBridge.currentContext(), listener);
}
}
}

Expand All @@ -78,7 +80,9 @@ public static void wrapListener(
GenericFutureListener<? extends Future<?>>[] wrappedListeners =
new GenericFutureListener[listeners.length];
for (int i = 0; i < listeners.length; ++i) {
wrappedListeners[i] = FutureListenerWrappers.wrap(context, listeners[i]);
if (FutureListenerWrappers.shouldWrap(listeners[i])) {
wrappedListeners[i] = FutureListenerWrappers.wrap(context, listeners[i]);
}
}
listeners = wrappedListeners;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,63 @@

package io.opentelemetry.javaagent.instrumentation.netty.v4_0.client;

import static io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys.attributeKey;
import static io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.NettyHttpClientTracer.tracer;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys;

public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {

private static final AttributeKey<HttpResponse> HTTP_RESPONSE =
attributeKey(HttpClientResponseTracingHandler.class + ".http-response");

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Context context = ctx.channel().attr(AttributeKeys.CLIENT_CONTEXT).get();
Attribute<Context> clientContextAttr = ctx.channel().attr(AttributeKeys.CLIENT_CONTEXT);
Context context = clientContextAttr.get();
if (context == null) {
ctx.fireChannelRead(msg);
return;
}

if (msg instanceof HttpResponse) {
tracer().end(context, (HttpResponse) msg);
Attribute<Context> parentContextAttr = ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT);
Context parentContext = parentContextAttr.get();

if (msg instanceof FullHttpResponse) {
clientContextAttr.remove();
parentContextAttr.remove();
} else if (msg instanceof HttpResponse) {
// Headers before body have been received, store them to use when finishing the span.
ctx.channel().attr(HTTP_RESPONSE).set((HttpResponse) msg);
} else if (msg instanceof LastHttpContent) {
// Not a FullHttpResponse so this is content that has been received after headers. Finish the
// span using what we stored in attrs.
clientContextAttr.remove();
parentContextAttr.remove();
}

// We want the callback in the scope of the parent, not the client span
Attribute<Context> parentAttr = ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT);
Context parentContext = parentAttr.get();
if (parentContext != null) {
try (Scope ignored = parentContext.makeCurrent()) {
ctx.fireChannelRead(msg);
}
} else {
ctx.fireChannelRead(msg);
}

if (msg instanceof FullHttpResponse) {
tracer().end(context, (HttpResponse) msg);
} else if (msg instanceof LastHttpContent) {
tracer().end(context, ctx.channel().attr(HTTP_RESPONSE).getAndRemove());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
Context parentContext = parentContextAttr.get();

if (msg instanceof FullHttpResponse) {
tracer().end(context, (HttpResponse) msg);
clientContextAttr.remove();
parentContextAttr.remove();
} else if (msg instanceof HttpResponse) {
Expand All @@ -45,7 +44,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
} else if (msg instanceof LastHttpContent) {
// Not a FullHttpResponse so this is content that has been received after headers. Finish the
// span using what we stored in attrs.
tracer().end(context, ctx.channel().attr(HTTP_RESPONSE).getAndRemove());
clientContextAttr.remove();
parentContextAttr.remove();
}
Expand All @@ -58,5 +56,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
} else {
ctx.fireChannelRead(msg);
}

if (msg instanceof FullHttpResponse) {
tracer().end(context, (HttpResponse) msg);
} else if (msg instanceof LastHttpContent) {
tracer().end(context, ctx.channel().attr(HTTP_RESPONSE).getAndRemove());
}
}
}