From 21fb5afe67f7a9f5159676d855de7f137f87cb58 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 1 Jul 2024 06:54:04 +0100 Subject: [PATCH] Improve refcounting in `testClientCancellation` (#110309) With the changes in #109519 we now do one more async step while serving the response, so we need to acquire another ref to track the new step. Relates #109866 Relates #110118 Relates #110175 Relates #110249 --- .../netty4/Netty4ChunkedContinuationsIT.java | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java index 4b6c820638b40..c4c35b410af78 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java @@ -72,10 +72,8 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.test.MockLog; -import org.elasticsearch.test.ReachabilityChecker; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.LeakTracker; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.netty4.Netty4Utils; import org.elasticsearch.xcontent.ToXContentObject; @@ -317,20 +315,14 @@ public void onFailure(Exception exception) { private static Releasable withResourceTracker() { assertNull(refs); - final ReachabilityChecker reachabilityChecker = new ReachabilityChecker(); final var latch = new CountDownLatch(1); - refs = LeakTracker.wrap(reachabilityChecker.register(AbstractRefCounted.of(latch::countDown))); + refs = AbstractRefCounted.of(latch::countDown); return () -> { refs.decRef(); - boolean success = false; try { safeAwait(latch); - success = true; } finally { refs = null; - if (success == false) { - reachabilityChecker.ensureUnreachable(); - } } }; } @@ -643,14 +635,11 @@ public void close() { @Override public void accept(RestChannel channel) { - client.execute(TYPE, new Request(), new RestActionListener<>(channel) { + localRefs.mustIncRef(); + client.execute(TYPE, new Request(), ActionListener.releaseAfter(new RestActionListener<>(channel) { @Override protected void processResponse(Response response) { - // incRef can fail if the request was already cancelled - if (localRefs.tryIncRef() == false) { - assert localRefs.hasReferences() == false : "tryIncRef failed but RefCounted not completed"; - return; - } + localRefs.mustIncRef(); channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getResponseBodyPart(), () -> { // cancellation notification only happens while processing a continuation, not while computing // the next one; prompt cancellation requires use of something like RestCancellableNodeClient @@ -659,7 +648,10 @@ protected void processResponse(Response response) { localRefs.decRef(); })); } - }); + }, () -> { + assertSame(localRefs, refs); + localRefs.decRef(); + })); } }; } else {