diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/HttpClientResponseTracingHandler.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/HttpClientResponseTracingHandler.java index 3087576aae50..29f06d4b0bd2 100644 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/HttpClientResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/client/HttpClientResponseTracingHandler.java @@ -9,14 +9,20 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.Attribute; +import io.netty.util.AttributeKey; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.netty.v4_1.AttributeKeys; public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter { + private static final AttributeKey HTTP_RESPONSE = + AttributeKey.valueOf(HttpClientResponseTracingHandler.class, "http-response"); + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Context context = ctx.channel().attr(AttributeKeys.CLIENT_CONTEXT).get(); @@ -25,8 +31,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { return; } - if (msg instanceof HttpResponse) { + if (msg instanceof FullHttpResponse) { tracer().end(context, (HttpResponse) msg); + } else if (msg instanceof HttpResponse) { + // Headers before body have been received, store them to use when finishing the span. + ctx.channel().attr(HTTP_RESPONSE).set((HttpResponse) msg); + } else if (msg instanceof LastHttpContent) { + // Not a FullHttpResponse so this is content that has been received after headers. Finish the + // span using what we stored in attrs. + tracer().end(context, ctx.channel().attr(HTTP_RESPONSE).get()); } // We want the callback in the scope of the parent, not the client span diff --git a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/server/HttpServerResponseTracingHandler.java b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/server/HttpServerResponseTracingHandler.java index 256d49e2b9af..adf6d7be4414 100644 --- a/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/server/HttpServerResponseTracingHandler.java +++ b/instrumentation/netty/netty-4.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/v4_1/server/HttpServerResponseTracingHandler.java @@ -7,29 +7,73 @@ import static io.opentelemetry.javaagent.instrumentation.netty.v4_1.server.NettyHttpServerTracer.tracer; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.AttributeKey; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdapter { + private static final AttributeKey HTTP_RESPONSE = + AttributeKey.valueOf(HttpServerResponseTracingHandler.class, "http-response"); + @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) { Context context = tracer().getServerContext(ctx.channel()); - if (context == null || !(msg instanceof HttpResponse)) { + if (context == null) { ctx.write(msg, prm); return; } + final ChannelPromise writePromise; + + if (msg instanceof LastHttpContent) { + if (prm.isVoid()) { + // Some frameworks don't actually listen for response completion and optimize for + // allocations by using a singleton, unnotifiable promise. Hopefully these frameworks don't + // have observability features or they'd be way off... + writePromise = ctx.newPromise(); + } else { + writePromise = prm; + } + // Going to finish the span after the write of the last content finishes. + if (msg instanceof FullHttpResponse) { + // Headers and body all sent together, we have the response information in the msg. + writePromise.addListener(future -> finish(context, writePromise, (FullHttpResponse) msg)); + } else { + // Body sent after headers. We stored the response information in the context when + // encountering HttpResponse (which was not FullHttpResponse since it's not + // LastHttpContent). + writePromise.addListener( + future -> finish(context, writePromise, ctx.channel().attr(HTTP_RESPONSE).get())); + } + } else { + writePromise = prm; + if (msg instanceof HttpResponse) { + // Headers before body has been sent, store them to use when finishing the span. + ctx.channel().attr(HTTP_RESPONSE).set((HttpResponse) msg); + } + } + try (Scope ignored = context.makeCurrent()) { - ctx.write(msg, prm); + ctx.write(msg, writePromise); } catch (Throwable throwable) { tracer().endExceptionally(context, throwable); throw throwable; } - tracer().end(context, (HttpResponse) msg); + } + + private static void finish(Context context, ChannelFuture future, HttpResponse response) { + if (future.isSuccess()) { + tracer().end(context, response); + } else { + tracer().endExceptionally(context, future.cause()); + } } }