Skip to content

Commit

Permalink
Preserve thread context when calling getNextPart() (#109519)
Browse files Browse the repository at this point in the history
The listener supplied to `getNextPart()` may be completed in a
non-default thread context, and this trips assertions about thread
context pollution in the transport layer, so this commit adds the
missing protection against that.

Also the listener supplied to `getNextPart()` may be completed
immediately on the transport thread, which could lead to a stack
overflow, so this commit adds another `execute()` call to
unconditionally fork a fresh task.

Relates #104851
  • Loading branch information
DaveCTurner authored Jun 17, 2024
1 parent c983b3f commit 0a008ed
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ public void onFailure(Exception exception) {
safeSleep(scaledRandomIntBetween(10, 500)); // make it more likely the request started executing
}
cancellable.cancel();
} // closing the request tracker ensures that everything is released, including all response chunks and the overall response
} // closing the resource tracker ensures that everything is released, including all response chunks and the overall response
}

private static Releasable withResourceTracker() {
Expand Down Expand Up @@ -525,6 +525,7 @@ public ActionRequestValidationException validate() {
public static class Response extends ActionResponse {
private final Executor executor;
volatile boolean computingContinuation;
boolean recursive = false;

public Response(Executor executor) {
this.executor = executor;
Expand All @@ -551,11 +552,17 @@ public boolean isLastPart() {

@Override
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
computingContinuation = true;
executor.execute(ActionRunnable.supply(listener, () -> {
computingContinuation = false;
return getResponseBodyPart();
}));
assertFalse(recursive);
recursive = true;
try {
computingContinuation = true;
executor.execute(ActionRunnable.supply(listener, () -> {
computingContinuation = false;
return getResponseBodyPart();
}));
} finally {
recursive = false;
}
}

@Override
Expand Down Expand Up @@ -585,7 +592,10 @@ public TransportInfiniteContinuationsAction(ActionFilters actionFilters, Transpo
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
executor.execute(
ActionRunnable.supply(ActionTestUtils.assertNoFailureListener(listener::onResponse), () -> new Response(executor))
ActionRunnable.supply(
ActionTestUtils.assertNoFailureListener(listener::onResponse),
() -> new Response(randomFrom(executor, EsExecutors.DIRECT_EXECUTOR_SERVICE))
)
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.network.ThreadWatchdog;
Expand Down Expand Up @@ -271,45 +272,59 @@ private void finishChunkedWrite() {
writeSequence++;
finishingWrite.combiner().finish(finishingWrite.onDone());
} else {
final var threadContext = serverTransport.getThreadPool().getThreadContext();
assert Transports.assertDefaultThreadContext(threadContext);
final var channel = finishingWrite.onDone().channel();
ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBodyPart continuation) {
channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
);
checkShutdown();
}

@Override
public void onFailure(Exception e) {
logger.error(
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
});
checkShutdown();
}

private void checkShutdown() {
if (channel.eventLoop().isShuttingDown()) {
// The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the
// preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so
// to avoid a leak we must double-check that the final listener is completed once the event loop is terminated.
// Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor
// which means this completion is not subject to the same issue, it still works even if the event loop has already
// terminated.
channel.eventLoop()
.terminationFuture()
.addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
}
}

}), finishingWriteBodyPart::getNextPart);
ActionListener.run(
new ContextPreservingActionListener<>(
threadContext.newRestorableContext(false),
ActionListener.assertOnce(new ActionListener<>() {
@Override
public void onResponse(ChunkedRestResponseBodyPart continuation) {
// always fork a fresh task to avoid stack overflow
assert Transports.assertDefaultThreadContext(threadContext);
channel.eventLoop()
.execute(
() -> channel.writeAndFlush(
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
finishingWrite.onDone() // pass the terminal listener/promise along the line
)
);
checkShutdown();
}

@Override
public void onFailure(Exception e) {
assert Transports.assertDefaultThreadContext(threadContext);
logger.error(
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
e
);
channel.close().addListener(ignored -> {
finishingWrite.combiner().add(channel.newFailedFuture(e));
finishingWrite.combiner().finish(finishingWrite.onDone());
});
checkShutdown();
}

private void checkShutdown() {
if (channel.eventLoop().isShuttingDown()) {
// The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know
// if the preceding activity made it onto its queue before shutdown or whether it will just vanish without a
// trace, so to avoid a leak we must double-check that the final listener is completed once the event loop
// is terminated. Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an
// ImmediateEventExecutor which means this completion is not subject to the same issue, it still works even
// if the event loop has already terminated.
channel.eventLoop()
.terminationFuture()
.addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
}
}

})
),
finishingWriteBodyPart::getNextPart
);
}
}

Expand Down

0 comments on commit 0a008ed

Please sign in to comment.