From e622101ac97496281f8bf87e537e1e0b4db30b9a Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Wed, 29 May 2024 09:15:02 +0200 Subject: [PATCH] Fix TasksIT#testGetTaskWaitForCompletionWithoutStoringResult (#108094) It seems that the failure (the missed index) has always existed in the test scenario and it's supposed to be handled by TransportGetTaskAction.java. We catch IndexNotFoundException here and convert it to ResourceNotFoundException. Then we catch ResourceNotFoundException here and return a snapshot of a task as a response. In the stack trace, getFinishedTaskFromIndex was called from getRunningTaskFromNode, not from waitedForCompletion due to a race between creating a get request and unblocking request which are sent asynchronously. I've changed the waitForCompletionTestCase test method to unblock the task only after the request started waiting for the task completion by registering a removal listener. By doing so, we make sure we test the "wait for completion" branch when task is running. The part about the missed index seems to irrelevant, since waitedForCompletion is able to suppress the error and return a snapshot of running task which is not possible if getFinishedTaskFromIndex gets called directly from getRunningTaskFromNode. Resolves #107823 --- .../admin/cluster/node/tasks/TasksIT.java | 35 ++++++++++++------- .../tasks/get/TransportGetTaskAction.java | 14 ++++++-- .../test/tasks/MockTaskManager.java | 13 +++++++ .../test/tasks/MockTaskManagerListener.java | 3 ++ 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 8011be1d69a04..5ea1b869f417e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -41,6 +41,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; @@ -599,24 +600,32 @@ private void waitForCompletionTestCase(boolean storeResult, Function client().execute(TEST_TASK_ACTION, request) ); - ActionFuture waitResponseFuture; + var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks(); + assertThat(tasks, hasSize(1)); + TaskId taskId = tasks.get(0).taskId(); + clusterAdmin().prepareGetTask(taskId).get(); + + var taskManager = (MockTaskManager) internalCluster().getInstance( + TransportService.class, + clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName() + ).getTaskManager(); + var listener = new MockTaskManagerListener() { + @Override + public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) { + // Unblock the request only after it started waiting for task completion + client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()); + } + }; + taskManager.addListener(listener); try { - var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks(); - assertThat(tasks, hasSize(1)); - var taskId = tasks.get(0).taskId(); - clusterAdmin().prepareGetTask(taskId).get(); - // Spin up a request to wait for the test task to finish - waitResponseFuture = wait.apply(taskId); + // The task will be unblocked as soon as the request started waiting for task completion + T waitResponse = wait.apply(taskId).get(); + validator.accept(waitResponse); } finally { - // Unblock the request so the wait for completion request can finish - client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get(); + taskManager.removeListener(listener); } - // Now that the task is unblocked the list response will come back - T waitResponse = waitResponseFuture.get(); - validator.accept(waitResponse); - TestTaskPlugin.NodesResponse response = future.get(); assertEquals(emptyList(), response.failures()); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index c8b33e6d569d2..85de3c65c798e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -141,9 +141,17 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene } else { if (request.getWaitForCompletion()) { final ListenableActionFuture future = new ListenableActionFuture<>(); - RemovedTaskListener removedTaskListener = task -> { - if (task.equals(runningTask)) { - future.onResponse(null); + RemovedTaskListener removedTaskListener = new RemovedTaskListener() { + @Override + public void onRemoved(Task task) { + if (task.equals(runningTask)) { + future.onResponse(null); + } + } + + @Override + public String toString() { + return "Waiting for task completion " + runningTask; } }; taskManager.registerRemovedTaskListener(removedTaskListener); diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index be2c0c332e41c..599868ab7f1f9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskManager; @@ -84,4 +85,16 @@ public void addListener(MockTaskManagerListener listener) { public void removeListener(MockTaskManagerListener listener) { listeners.remove(listener); } + + @Override + public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) { + super.registerRemovedTaskListener(removedTaskListener); + for (MockTaskManagerListener listener : listeners) { + try { + listener.onRemovedTaskListenerRegistered(removedTaskListener); + } catch (Exception e) { + logger.warn("failed to notify task manager listener about a registered removed task listener", e); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java index 4bebfae914219..1f915f26db70f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java @@ -8,6 +8,7 @@ package org.elasticsearch.test.tasks; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; /** @@ -17,4 +18,6 @@ public interface MockTaskManagerListener { default void onTaskRegistered(Task task) {}; default void onTaskUnregistered(Task task) {}; + + default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {}; }