diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 1e51d91ed154e..fa71e95d9e5c0 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -487,7 +487,6 @@ public void onTaskRegistered(Task task) { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/109686") public void testTasksCancellation() throws Exception { // Start blocking test task // Get real client (the plugin is not registered on transport nodes) @@ -510,6 +509,9 @@ public void testTasksCancellation() throws Exception { expectThrows(TaskCancelledException.class, future); + logger.info("--> waiting for all ongoing tasks to complete within a reasonable time"); + safeGet(clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "*").setWaitForCompletion(true).execute()); + logger.info("--> checking that test tasks are not running"); assertEquals(0, clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "*").get().getTasks().size()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index f2b4030983db4..19d458d509c44 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -171,6 +171,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -2117,15 +2118,19 @@ public static void safeAcquire(Semaphore semaphore) { public static T safeAwait(SubscribableListener listener) { final var future = new PlainActionFuture(); listener.addListener(future); + return safeGet(future); + } + + public static T safeGet(Future future) { try { return future.get(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new AssertionError("safeAwait: interrupted waiting for SubscribableListener", e); + throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e); } catch (ExecutionException e) { - throw new AssertionError("safeAwait: listener was completed exceptionally", e); + throw new AssertionError("safeGet: listener was completed exceptionally", e); } catch (TimeoutException e) { - throw new AssertionError("safeAwait: listener was not completed within the timeout", e); + throw new AssertionError("safeGet: listener was not completed within the timeout", e); } }