From 38ff13963216f4edd32e9b2e10a0494fdf0d28b1 Mon Sep 17 00:00:00 2001 From: Mateusz Rzeszutek Date: Fri, 10 Dec 2021 11:14:08 +0100 Subject: [PATCH] Fix reactor-netty memory/connection leak --- .../HttpResponseReceiverInstrumenter.java | 44 ++++++++++--------- .../AbstractReactorNettyHttpClientTest.groovy | 26 +++++++++++ 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java index 4362a24e05da..bf83a43230a3 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/HttpResponseReceiverInstrumenter.java @@ -18,12 +18,10 @@ import javax.annotation.Nullable; import reactor.core.publisher.Mono; import reactor.netty.Connection; -import reactor.netty.ConnectionObserver; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientConfig; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; -import reactor.netty.http.client.HttpClientState; public final class HttpResponseReceiverInstrumenter { @@ -46,8 +44,9 @@ public static HttpClient.ResponseReceiver instrument(HttpClient.ResponseRecei client .mapConnect(new StartOperation(contextHolder, config)) .doOnRequest(new PropagateContext(contextHolder)) - .doOnRequestError(new EndOperationWithError(contextHolder, config)) - .observe(new EndOperation(contextHolder, config)); + .doOnRequestError(new EndOperationWithRequestError(contextHolder, config)) + .doOnResponseError(new EndOperationWithResponseError(contextHolder, config)) + .doAfterResponseSuccess(new EndOperationWithSuccess(contextHolder, config)); // modified should always be an HttpClientFinalizer too if (modified instanceof HttpClient.ResponseReceiver) { @@ -122,12 +121,13 @@ public void accept(HttpClientRequest httpClientRequest, Connection connection) { } } - static final class EndOperationWithError implements BiConsumer { + static final class EndOperationWithRequestError + implements BiConsumer { private final ContextHolder contextHolder; private final HttpClientConfig config; - EndOperationWithError(ContextHolder contextHolder, HttpClientConfig config) { + EndOperationWithRequestError(ContextHolder contextHolder, HttpClientConfig config) { this.contextHolder = contextHolder; this.config = config; } @@ -142,42 +142,44 @@ public void accept(HttpClientRequest httpClientRequest, Throwable error) { } } - static final class EndOperation implements ConnectionObserver { + static final class EndOperationWithResponseError + implements BiConsumer { private final ContextHolder contextHolder; private final HttpClientConfig config; - EndOperation(ContextHolder contextHolder, HttpClientConfig config) { + EndOperationWithResponseError(ContextHolder contextHolder, HttpClientConfig config) { this.contextHolder = contextHolder; this.config = config; } @Override - public void onStateChange(Connection connection, State newState) { - if (newState != HttpClientState.RESPONSE_COMPLETED) { - return; - } - + public void accept(HttpClientResponse response, Throwable error) { Context context = contextHolder.context; if (context == null) { return; } + instrumenter().end(context, config, response, error); + } + } - // connection is actually an instance of HttpClientOperations - a package private class that - // implements both Connection and HttpClientResponse - if (connection instanceof HttpClientResponse) { - HttpClientResponse response = (HttpClientResponse) connection; - instrumenter().end(context, config, response, null); - } + static final class EndOperationWithSuccess implements BiConsumer { + + private final ContextHolder contextHolder; + private final HttpClientConfig config; + + EndOperationWithSuccess(ContextHolder contextHolder, HttpClientConfig config) { + this.contextHolder = contextHolder; + this.config = config; } @Override - public void onUncaughtException(Connection connection, Throwable error) { + public void accept(HttpClientResponse response, Connection connection) { Context context = contextHolder.context; if (context == null) { return; } - instrumenter().end(context, config, null, error); + instrumenter().end(context, config, response, null); } } diff --git a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy index 3f3dc6247eb3..779685949edc 100644 --- a/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy +++ b/instrumentation/reactor-netty/reactor-netty-1.0/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/reactornetty/v1_0/AbstractReactorNettyHttpClientTest.groovy @@ -191,6 +191,32 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest() + def httpClient = createHttpClient() + .doOnConnect({ uniqueChannelHashes.add(it.channelHash())}) + def uri = "http://localhost:${server.httpPort()}/success" + + def count = 100 + + when: + (1..count).forEach({ + runWithSpan("parent") { + def status = httpClient.get().uri(uri) + .responseSingle { resp, content -> + // Make sure to consume content since that's when we close the span. + content.map { resp.status().code() } + }.block() + assert status == 200 + } + }) + + then: + traces.size() == count + uniqueChannelHashes.size() == 1 + } + static void assertSameSpan(SpanData expected, AtomicReference actual) { def expectedSpanContext = expected.spanContext def actualSpanContext = actual.get().spanContext