diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 8011be1d69a04..5ea1b869f417e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -41,6 +41,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; @@ -599,24 +600,32 @@ private void waitForCompletionTestCase(boolean storeResult, Function client().execute(TEST_TASK_ACTION, request) ); - ActionFuture waitResponseFuture; + var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks(); + assertThat(tasks, hasSize(1)); + TaskId taskId = tasks.get(0).taskId(); + clusterAdmin().prepareGetTask(taskId).get(); + + var taskManager = (MockTaskManager) internalCluster().getInstance( + TransportService.class, + clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName() + ).getTaskManager(); + var listener = new MockTaskManagerListener() { + @Override + public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) { + // Unblock the request only after it started waiting for task completion + client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()); + } + }; + taskManager.addListener(listener); try { - var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks(); - assertThat(tasks, hasSize(1)); - var taskId = tasks.get(0).taskId(); - clusterAdmin().prepareGetTask(taskId).get(); - // Spin up a request to wait for the test task to finish - waitResponseFuture = wait.apply(taskId); + // The task will be unblocked as soon as the request started waiting for task completion + T waitResponse = wait.apply(taskId).get(); + validator.accept(waitResponse); } finally { - // Unblock the request so the wait for completion request can finish - client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get(); + taskManager.removeListener(listener); } - // Now that the task is unblocked the list response will come back - T waitResponse = waitResponseFuture.get(); - validator.accept(waitResponse); - TestTaskPlugin.NodesResponse response = future.get(); assertEquals(emptyList(), response.failures()); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index c8b33e6d569d2..85de3c65c798e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -141,9 +141,17 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene } else { if (request.getWaitForCompletion()) { final ListenableActionFuture future = new ListenableActionFuture<>(); - RemovedTaskListener removedTaskListener = task -> { - if (task.equals(runningTask)) { - future.onResponse(null); + RemovedTaskListener removedTaskListener = new RemovedTaskListener() { + @Override + public void onRemoved(Task task) { + if (task.equals(runningTask)) { + future.onResponse(null); + } + } + + @Override + public String toString() { + return "Waiting for task completion " + runningTask; } }; taskManager.registerRemovedTaskListener(removedTaskListener); diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index be2c0c332e41c..599868ab7f1f9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskManager; @@ -84,4 +85,16 @@ public void addListener(MockTaskManagerListener listener) { public void removeListener(MockTaskManagerListener listener) { listeners.remove(listener); } + + @Override + public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) { + super.registerRemovedTaskListener(removedTaskListener); + for (MockTaskManagerListener listener : listeners) { + try { + listener.onRemovedTaskListenerRegistered(removedTaskListener); + } catch (Exception e) { + logger.warn("failed to notify task manager listener about a registered removed task listener", e); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java index 4bebfae914219..1f915f26db70f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java @@ -8,6 +8,7 @@ package org.elasticsearch.test.tasks; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; /** @@ -17,4 +18,6 @@ public interface MockTaskManagerListener { default void onTaskRegistered(Task task) {}; default void onTaskUnregistered(Task task) {}; + + default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {}; }