Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish Netty 4.1 spans after response has completed not when it started. #2641

Merged
merged 5 commits into from
Mar 27, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,20 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys;

public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {

private static final AttributeKey<HttpResponse> HTTP_RESPONSE =
AttributeKey.valueOf(HttpClientResponseTracingHandler.class, "HTTP_RESPONSE");
anuraaga marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Context context = ctx.channel().attr(AttributeKeys.CLIENT_CONTEXT).get();
Expand All @@ -25,8 +31,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
return;
}

if (msg instanceof HttpResponse) {
if (msg instanceof FullHttpResponse) {
tracer().end(context, (HttpResponse) msg);
} else if (msg instanceof HttpResponse) {
// Headers before body have been received, store them to use when finishing the span.
ctx.channel().attr(HTTP_RESPONSE).set((HttpResponse) msg);
} else if (msg instanceof LastHttpContent) {
// Not a FullHttpResponse so this is content that has been received after headers. Finish the
// span using what we stored in attrs.
tracer().end(context, ctx.channel().attr(HTTP_RESPONSE).get());
}

// We want the callback in the scope of the parent, not the client span
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,73 @@

import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.server.NettyHttpServerTracer.tracer;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {

private static final AttributeKey<HttpResponse> HTTP_RESPONSE =
AttributeKey.valueOf(HttpServerResponseTracingHandler.class, "HTTP_RESPONSE");
anuraaga marked this conversation as resolved.
Show resolved Hide resolved

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
Context context = tracer().getServerContext(ctx.channel());
if (context == null || !(msg instanceof HttpResponse)) {
if (context == null) {
ctx.write(msg, prm);
return;
}

final ChannelPromise writePromise;

if (msg instanceof LastHttpContent) {
if (prm == ctx.voidPromise()) {
anuraaga marked this conversation as resolved.
Show resolved Hide resolved
// Some frameworks don't actually listen for response completion and optimize for
// allocations by using a singleton, unnotifiable promise. Hopefully these frameworks don't
// have observability features or they'd be way off...
writePromise = ctx.newPromise();
} else {
writePromise = prm;
}
// Going to finish the span after the write of the last content finishes.
if (msg instanceof FullHttpResponse) {
// Headers and body all sent together, we have the response information in the msg.
writePromise.addListener(future -> finish(context, writePromise, (FullHttpResponse) msg));
} else {
// Body sent after headers. We stored the response information in the context when
// encountering HttpResponse (which was not FullHttpResponse since it's not
// LastHttpContent).
writePromise.addListener(
future -> finish(context, writePromise, ctx.channel().attr(HTTP_RESPONSE).get()));
}
} else {
writePromise = prm;
if (msg instanceof HttpResponse) {
// Headers before body has been sent, store them to use when finishing the span.
ctx.channel().attr(HTTP_RESPONSE).set((HttpResponse) msg);
}
}

try (Scope ignored = context.makeCurrent()) {
ctx.write(msg, prm);
ctx.write(msg, writePromise);
} catch (Throwable throwable) {
tracer().endExceptionally(context, throwable);
throw throwable;
}
tracer().end(context, (HttpResponse) msg);
}

private static void finish(Context context, ChannelFuture future, HttpResponse response) {
if (future.isSuccess()) {
tracer().end(context, response);
} else {
tracer().endExceptionally(context, future.cause());
}
}
}