Skip to content

Commit

Permalink
Improve refcounting in testClientCancellation (#110309)
Browse files Browse the repository at this point in the history
With the changes in #109519 we now do one more async step while serving
the response, so we need to acquire another ref to track the new step.

Relates #109866
Relates #110118
Relates #110175
Relates #110249
  • Loading branch information
DaveCTurner authored Jul 1, 2024
1 parent b906ce3 commit 21fb5af
Showing 1 changed file with 8 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.ReachabilityChecker;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.LeakTracker;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -317,20 +315,14 @@ public void onFailure(Exception exception) {

private static Releasable withResourceTracker() {
assertNull(refs);
final ReachabilityChecker reachabilityChecker = new ReachabilityChecker();
final var latch = new CountDownLatch(1);
refs = LeakTracker.wrap(reachabilityChecker.register(AbstractRefCounted.of(latch::countDown)));
refs = AbstractRefCounted.of(latch::countDown);
return () -> {
refs.decRef();
boolean success = false;
try {
safeAwait(latch);
success = true;
} finally {
refs = null;
if (success == false) {
reachabilityChecker.ensureUnreachable();
}
}
};
}
Expand Down Expand Up @@ -643,14 +635,11 @@ public void close() {

@Override
public void accept(RestChannel channel) {
client.execute(TYPE, new Request(), new RestActionListener<>(channel) {
localRefs.mustIncRef();
client.execute(TYPE, new Request(), ActionListener.releaseAfter(new RestActionListener<>(channel) {
@Override
protected void processResponse(Response response) {
// incRef can fail if the request was already cancelled
if (localRefs.tryIncRef() == false) {
assert localRefs.hasReferences() == false : "tryIncRef failed but RefCounted not completed";
return;
}
localRefs.mustIncRef();
channel.sendResponse(RestResponse.chunked(RestStatus.OK, response.getResponseBodyPart(), () -> {
// cancellation notification only happens while processing a continuation, not while computing
// the next one; prompt cancellation requires use of something like RestCancellableNodeClient
Expand All @@ -659,7 +648,10 @@ protected void processResponse(Response response) {
localRefs.decRef();
}));
}
});
}, () -> {
assertSame(localRefs, refs);
localRefs.decRef();
}));
}
};
} else {
Expand Down

0 comments on commit 21fb5af

Please sign in to comment.