Skip to content

Commit

Permalink
[8.14] Fix TasksIT#testGetTaskWaitForCompletionWithoutStoringResult (#…
Browse files Browse the repository at this point in the history
…108094) (#110012)

Backport #108094 to 8.14

Resolves #106043
  • Loading branch information
arteam authored Jun 20, 2024
1 parent 36ca4c1 commit cc36b7f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -602,24 +603,32 @@ private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId,
TEST_TASK_ACTION.name() + "[n]",
() -> client().execute(TEST_TASK_ACTION, request)
);
ActionFuture<T> waitResponseFuture;
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
TaskId taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

var taskManager = (MockTaskManager) internalCluster().getInstance(
TransportService.class,
clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName()
).getTaskManager();
var listener = new MockTaskManagerListener() {
@Override
public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {
// Unblock the request only after it started waiting for task completion
client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest());
}
};
taskManager.addListener(listener);
try {
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
var taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId);
// The task will be unblocked as soon as the request started waiting for task completion
T waitResponse = wait.apply(taskId).get();
validator.accept(waitResponse);
} finally {
// Unblock the request so the wait for completion request can finish
client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get();
taskManager.removeListener(listener);
}

// Now that the task is unblocked the list response will come back
T waitResponse = waitResponseFuture.get();
validator.accept(waitResponse);

TestTaskPlugin.NodesResponse response = future.get();
assertEquals(emptyList(), response.failures());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
} else {
if (request.getWaitForCompletion()) {
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
RemovedTaskListener removedTaskListener = task -> {
if (task.equals(runningTask)) {
future.onResponse(null);
RemovedTaskListener removedTaskListener = new RemovedTaskListener() {
@Override
public void onRemoved(Task task) {
if (task.equals(runningTask)) {
future.onResponse(null);
}
}

@Override
public String toString() {
return "Waiting for task completion " + runningTask;
}
};
taskManager.registerRemovedTaskListener(removedTaskListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
Expand Down Expand Up @@ -90,4 +91,16 @@ public void addListener(MockTaskManagerListener listener) {
public void removeListener(MockTaskManagerListener listener) {
listeners.remove(listener);
}

@Override
public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) {
super.registerRemovedTaskListener(removedTaskListener);
for (MockTaskManagerListener listener : listeners) {
try {
listener.onRemovedTaskListenerRegistered(removedTaskListener);
} catch (Exception e) {
logger.warn("failed to notify task manager listener about a registered removed task listener", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.test.tasks;

import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;

/**
Expand All @@ -17,4 +18,6 @@ public interface MockTaskManagerListener {
default void onTaskRegistered(Task task) {};

default void onTaskUnregistered(Task task) {};

default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {};
}

0 comments on commit cc36b7f

Please sign in to comment.