Skip to content

Commit

Permalink
Small tidy-up in TaskCancellationService (#92795)
Browse files Browse the repository at this point in the history
We use a fixed-size `CountDownActionListener` here to wait for each of 3
actions to complete, but two of the three actions cannot fail and the
result completes a `StepListener` so we can just use a
`RefCountingRunnable` instead.

Also we can just use `StepListener#addListener` a little further down
instead of passing the same runnable in two different arguments.
  • Loading branch information
DaveCTurner authored Jan 11, 2023
1 parent 8477b4e commit f77ce8b
Showing 1 changed file with 31 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.CountDownActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -100,30 +101,42 @@ void doCancelTaskAndDescendants(CancellableTask task, String reason, boolean wai
if (task.shouldCancelChildrenOnCancellation()) {
logger.trace("cancelling task [{}] and its descendants", taskId);
StepListener<Void> completedListener = new StepListener<>();
CountDownActionListener countDownListener = new CountDownActionListener(3, completedListener);
Collection<Transport.Connection> childConnections = taskManager.startBanOnChildTasks(task.getId(), reason, () -> {
logger.trace("child tasks of parent [{}] are completed", taskId);
countDownListener.onResponse(null);
});
taskManager.cancel(task, reason, () -> {
logger.trace("task [{}] is cancelled", taskId);
countDownListener.onResponse(null);
});
StepListener<Void> setBanListener = new StepListener<>();

Collection<Transport.Connection> childConnections;
try (var refs = new RefCountingRunnable(() -> setBanListener.addListener(completedListener))) {
var banChildrenRef = refs.acquire();
var cancelTaskRef = refs.acquire();

childConnections = taskManager.startBanOnChildTasks(task.getId(), reason, () -> {
logger.trace("child tasks of parent [{}] are completed", taskId);
banChildrenRef.close();
});

taskManager.cancel(task, reason, () -> {
logger.trace("task [{}] is cancelled", taskId);
cancelTaskRef.close();
});
}
setBanOnChildConnections(reason, waitForCompletion, task, childConnections, setBanListener);
setBanListener.addListener(countDownListener);
// If we start unbanning when the last child task completed and that child task executed with a specific user, then unban
// requests are denied because internal requests can't run with a user. We need to remove bans with the current thread context.
final Runnable removeBansRunnable = transportService.getThreadPool()
.getThreadContext()
.preserveContext(() -> removeBanOnChildConnections(task, childConnections));

// We remove bans after all child tasks are completed although in theory we can do it on a per-connection basis.
completedListener.whenComplete(r -> removeBansRunnable.run(), e -> removeBansRunnable.run());
// if wait_for_completion is true, then only return when (1) bans are placed on child connections, (2) child tasks are
// completed or failed, (3) the main task is cancelled. Otherwise, return after bans are placed on child connections.
completedListener.addListener(
ActionListener.wrap(
transportService.getThreadPool()
.getThreadContext()
// If we start unbanning when the last child task completed and that child task executed with a specific user, then
// unban requests are denied because internal requests can't run with a user. We need to remove bans with the
// current thread context.
.preserveContext(() -> removeBanOnChildConnections(task, childConnections))
)
);

if (waitForCompletion) {
// Wait until (1) bans are placed on child connections, (2) child tasks are completed or failed, (3) main task is cancelled.
completedListener.addListener(listener);
} else {
// Only wait until bans are placed on child connections
setBanListener.addListener(listener);
}
} else {
Expand Down

0 comments on commit f77ce8b

Please sign in to comment.