Skip to content

Commit

Permalink
[8.14] Fix TasksIT#testTasksCancellation (#109929) (#109941)
Browse files Browse the repository at this point in the history
* Fix `TasksIT#testTasksCancellation` (#109929)

The tasks are removed from the task manager _after_ sending the
response, so we cannot reliably assert they're done. With this commit we
wait for them to complete properly first.

Closes #109686

* Introduce safeGet
  • Loading branch information
DaveCTurner authored Jun 19, 2024
1 parent b1fadd8 commit 342a451
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ public void onTaskRegistered(Task task) {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/109686")
public void testTasksCancellation() throws Exception {
// Start blocking test task
// Get real client (the plugin is not registered on transport nodes)
Expand All @@ -510,6 +509,9 @@ public void testTasksCancellation() throws Exception {

expectThrows(TaskCancelledException.class, future);

logger.info("--> waiting for all ongoing tasks to complete within a reasonable time");
safeGet(clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "*").setWaitForCompletion(true).execute());

logger.info("--> checking that test tasks are not running");
assertEquals(0, clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name() + "*").get().getTasks().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -2117,15 +2118,19 @@ public static void safeAcquire(Semaphore semaphore) {
public static <T> T safeAwait(SubscribableListener<T> listener) {
final var future = new PlainActionFuture<T>();
listener.addListener(future);
return safeGet(future);
}

public static <T> T safeGet(Future<T> future) {
try {
return future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("safeAwait: interrupted waiting for SubscribableListener", e);
throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e);
} catch (ExecutionException e) {
throw new AssertionError("safeAwait: listener was completed exceptionally", e);
throw new AssertionError("safeGet: listener was completed exceptionally", e);
} catch (TimeoutException e) {
throw new AssertionError("safeAwait: listener was not completed within the timeout", e);
throw new AssertionError("safeGet: listener was not completed within the timeout", e);
}
}

Expand Down

0 comments on commit 342a451

Please sign in to comment.