From bf3b27d84db8a4049f3833b15d1515f9730c9e92 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 30 Apr 2024 16:23:57 +0200 Subject: [PATCH 01/12] Fix TasksIT#testGetTaskWaitForCompletionWithoutStoringResult Make sure the `.tasks` index is created before we starting testing task completion without storing its result. To achieve that, we store a fake task before we start `waitForCompletionTestCase`. Resolves #107823 --- .../admin/cluster/node/tasks/TasksIT.java | 75 +++++++++++-------- 1 file changed, 44 insertions(+), 31 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 8011be1d69a04..f9eb15ab09e15 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -562,6 +562,21 @@ public void testListTasksWaitForCompletion() throws Exception { } public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception { + // Need to make sure the .tasks index gets created, so let's add a fake task first + var latch = new CountDownLatch(1); + internalCluster().getInstance(TaskResultsService.class).storeResult(new TaskResult(true, fakeTask()), new ActionListener() { + @Override + public void onResponse(Void response) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }); + safeAwait(latch); + waitForCompletionTestCase(false, id -> clusterAdmin().prepareGetTask(id).setWaitForCompletion(true).execute(), response -> { assertTrue(response.getTask().isCompleted()); // We didn't store the result so it won't come back when we wait @@ -844,40 +859,21 @@ public void testNodeNotFoundButTaskFound() throws Exception { // Save a fake task that looks like it is from a node that isn't part of the cluster CyclicBarrier b = new CyclicBarrier(2); TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class); - resultsService.storeResult( - new TaskResult( - new TaskInfo( - new TaskId("fake", 1), - "test", - "fake", - "test", - "", - null, - 0, - 0, - false, - false, - TaskId.EMPTY_TASK_ID, - Collections.emptyMap() - ), - new RuntimeException("test") - ), - new ActionListener() { - @Override - public void onResponse(Void response) { - try { - b.await(); - } catch (InterruptedException | BrokenBarrierException e) { - onFailure(e); - } + resultsService.storeResult(new TaskResult(fakeTask(), new RuntimeException("test")), new ActionListener() { + @Override + public void onResponse(Void response) { + try { + b.await(); + } catch (InterruptedException | BrokenBarrierException e) { + onFailure(e); } + } - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); - } + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); } - ); + }); b.await(); // Now we can find it! @@ -988,4 +984,21 @@ private GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException { assertNull(info.status()); // The test task doesn't have any status return response; } + + private static TaskInfo fakeTask() { + return new TaskInfo( + new TaskId("fake", 1), + "test", + "fake", + "test", + "", + null, + 0, + 0, + false, + false, + TaskId.EMPTY_TASK_ID, + Collections.emptyMap() + ); + } } From 800d56c4af94005c7ffdd4edfc101ca7ebdb4bb2 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 21 May 2024 01:19:59 +0200 Subject: [PATCH 02/12] Unblock request only after we started waiting for completion --- .../admin/cluster/node/tasks/TasksIT.java | 49 +++++++++---------- .../tasks/get/TransportGetTaskAction.java | 14 ++++-- .../test/tasks/MockTaskManager.java | 13 +++++ .../test/tasks/MockTaskManagerListener.java | 3 ++ 4 files changed, 49 insertions(+), 30 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index f9eb15ab09e15..0d6406bd7282b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -41,6 +41,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; @@ -563,20 +564,6 @@ public void testListTasksWaitForCompletion() throws Exception { public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception { // Need to make sure the .tasks index gets created, so let's add a fake task first - var latch = new CountDownLatch(1); - internalCluster().getInstance(TaskResultsService.class).storeResult(new TaskResult(true, fakeTask()), new ActionListener() { - @Override - public void onResponse(Void response) { - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); - } - }); - safeAwait(latch); - waitForCompletionTestCase(false, id -> clusterAdmin().prepareGetTask(id).setWaitForCompletion(true).execute(), response -> { assertTrue(response.getTask().isCompleted()); // We didn't store the result so it won't come back when we wait @@ -614,19 +601,27 @@ private void waitForCompletionTestCase(boolean storeResult, Function client().execute(TEST_TASK_ACTION, request) ); - ActionFuture waitResponseFuture; - try { - var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks(); - assertThat(tasks, hasSize(1)); - var taskId = tasks.get(0).taskId(); - clusterAdmin().prepareGetTask(taskId).get(); - - // Spin up a request to wait for the test task to finish - waitResponseFuture = wait.apply(taskId); - } finally { - // Unblock the request so the wait for completion request can finish - client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get(); - } + var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks(); + assertThat(tasks, hasSize(1)); + TaskId taskId = tasks.get(0).taskId(); + clusterAdmin().prepareGetTask(taskId).get(); + + // Spin up a request to wait for the test task to finish + ActionFuture waitResponseFuture = wait.apply(taskId); + + var taskManager = (MockTaskManager) internalCluster().getInstance( + TransportService.class, + clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName() + ).getTaskManager(); + taskManager.addListener(new MockTaskManagerListener() { + @Override + public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) { + // Unblock the request so the wait for completion request can finish + if (removedTaskListener.toString().startsWith("Completing running task Task{id=" + taskId.getId())) { + client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()); + } + } + }); // Now that the task is unblocked the list response will come back T waitResponse = waitResponseFuture.get(); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index c8b33e6d569d2..4afd320197a6a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -141,9 +141,17 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene } else { if (request.getWaitForCompletion()) { final ListenableActionFuture future = new ListenableActionFuture<>(); - RemovedTaskListener removedTaskListener = task -> { - if (task.equals(runningTask)) { - future.onResponse(null); + RemovedTaskListener removedTaskListener = new RemovedTaskListener() { + @Override + public void onRemoved(Task task) { + if (task.equals(runningTask)) { + future.onResponse(null); + } + } + + @Override + public String toString() { + return "Completing running task " + runningTask; } }; taskManager.registerRemovedTaskListener(removedTaskListener); diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java index be2c0c332e41c..599868ab7f1f9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManager.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskManager; @@ -84,4 +85,16 @@ public void addListener(MockTaskManagerListener listener) { public void removeListener(MockTaskManagerListener listener) { listeners.remove(listener); } + + @Override + public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) { + super.registerRemovedTaskListener(removedTaskListener); + for (MockTaskManagerListener listener : listeners) { + try { + listener.onRemovedTaskListenerRegistered(removedTaskListener); + } catch (Exception e) { + logger.warn("failed to notify task manager listener about a registered removed task listener", e); + } + } + } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java index 4bebfae914219..1f915f26db70f 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java +++ b/test/framework/src/main/java/org/elasticsearch/test/tasks/MockTaskManagerListener.java @@ -8,6 +8,7 @@ package org.elasticsearch.test.tasks; +import org.elasticsearch.tasks.RemovedTaskListener; import org.elasticsearch.tasks.Task; /** @@ -17,4 +18,6 @@ public interface MockTaskManagerListener { default void onTaskRegistered(Task task) {}; default void onTaskUnregistered(Task task) {}; + + default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {}; } From 63396ac4b0bc300863349db13c4f05a398ed705e Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 21 May 2024 01:21:01 +0200 Subject: [PATCH 03/12] Update comment --- .../elasticsearch/action/admin/cluster/node/tasks/TasksIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 0d6406bd7282b..6623874417fbb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -616,7 +616,7 @@ private void waitForCompletionTestCase(boolean storeResult, Function() { - @Override - public void onResponse(Void response) { - try { - b.await(); - } catch (InterruptedException | BrokenBarrierException e) { - onFailure(e); + resultsService.storeResult( + new TaskResult( + new TaskInfo( + new TaskId("fake", 1), + "test", + "fake", + "test", + "", + null, + 0, + 0, + false, + false, + TaskId.EMPTY_TASK_ID, + Collections.emptyMap() + ), + new RuntimeException("test") + ), + new ActionListener() { + @Override + public void onResponse(Void response) { + try { + b.await(); + } catch (InterruptedException | BrokenBarrierException e) { + onFailure(e); + } } - } - @Override - public void onFailure(Exception e) { - throw new RuntimeException(e); + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } } - }); + ); b.await(); // Now we can find it! @@ -978,21 +997,4 @@ private GetTaskResponse expectFinishedTask(TaskId taskId) throws IOException { assertNull(info.status()); // The test task doesn't have any status return response; } - - private static TaskInfo fakeTask() { - return new TaskInfo( - new TaskId("fake", 1), - "test", - "fake", - "test", - "", - null, - 0, - 0, - false, - false, - TaskId.EMPTY_TASK_ID, - Collections.emptyMap() - ); - } } From f59ff4ee30b1aac402b024d3b70f05e4ae7b53ee Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 21 May 2024 09:47:22 +0200 Subject: [PATCH 06/12] Make sure we register onRemovedTaskListenerRegistered before we wait for completion --- .../action/admin/cluster/node/tasks/TasksIT.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index a49813fb709d5..4a912e79cb9e1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -605,9 +605,6 @@ private void waitForCompletionTestCase(boolean storeResult, Function waitResponseFuture = wait.apply(taskId); - var taskManager = (MockTaskManager) internalCluster().getInstance( TransportService.class, clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName() @@ -615,15 +612,15 @@ private void waitForCompletionTestCase(boolean storeResult, Function Date: Tue, 28 May 2024 10:28:48 +0200 Subject: [PATCH 08/12] Adjust test for the new task name --- .../elasticsearch/action/admin/cluster/node/tasks/TasksIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 4a912e79cb9e1..4ce9073200a5f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -613,7 +613,7 @@ private void waitForCompletionTestCase(boolean storeResult, Function { + boolean unblocked = waitUntil(() -> { if (((CancellableTask) task).isCancelled()) { throw new RuntimeException("Cancelled!"); } return ((TestTask) task).isBlocked() == false; }); + assert unblocked : task + " hasn't been unblocked"; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } From 7e6f7ded54d9f16b2d6750b67232b81d35feea58 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 28 May 2024 15:05:59 +0200 Subject: [PATCH 11/12] Revert "Make sure the task gets unblocked" This reverts commit f235b87960bcedb24a86857df2f1fff16b0061ef. --- .../action/admin/cluster/node/tasks/TestTaskPlugin.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 2b25a9ab59ce1..63629e16974d5 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -300,13 +300,12 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) { logger.info("Test task started on the node {}", clusterService.localNode()); if (request.shouldBlock) { try { - boolean unblocked = waitUntil(() -> { + waitUntil(() -> { if (((CancellableTask) task).isCancelled()) { throw new RuntimeException("Cancelled!"); } return ((TestTask) task).isBlocked() == false; }); - assert unblocked : task + " hasn't been unblocked"; } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } From 26170ea0a19ef3a6704db2b9351f4ee5d733a5c7 Mon Sep 17 00:00:00 2001 From: Artem Prigoda Date: Tue, 28 May 2024 17:20:01 +0200 Subject: [PATCH 12/12] Remove listener after test finished --- .../admin/cluster/node/tasks/TasksIT.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 0fcacacbf51f0..5ea1b869f417e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -609,17 +609,22 @@ private void waitForCompletionTestCase(boolean storeResult, Function