Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TasksIT#testGetTaskWaitForCompletionWithoutStoringResult #108094

Merged
merged 17 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -599,22 +600,27 @@ private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId,
TEST_TASK_ACTION.name() + "[n]",
() -> client().execute(TEST_TASK_ACTION, request)
);
ActionFuture<T> waitResponseFuture;
try {
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
var taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId);
} finally {
// Unblock the request so the wait for completion request can finish
client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get();
}

// Now that the task is unblocked the list response will come back
T waitResponse = waitResponseFuture.get();
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
TaskId taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

var taskManager = (MockTaskManager) internalCluster().getInstance(
TransportService.class,
clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName()
).getTaskManager();
taskManager.addListener(new MockTaskManagerListener() {
@Override
public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {
// Unblock the request only after it started waiting for task completion
if (removedTaskListener.toString().startsWith("Completing running task Task{id=" + taskId.getId())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit strange, I think it works without it too, since there should be no other wait for completions going on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@henningandersen There seems to be a bug in TestTaskPlugin#TransportTestTaskAction. It checks whether a task is blocked by running waitUntil for 10 seconds, but doesn't check whether waitUntil finished successfully.

client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest());
}
}
});
// Spin up a request to wait for the test task to finish
// The task will be unblocked as soon as the request started waiting for task completion
T waitResponse = wait.apply(taskId).get();
validator.accept(waitResponse);

TestTaskPlugin.NodesResponse response = future.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
} else {
if (request.getWaitForCompletion()) {
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
RemovedTaskListener removedTaskListener = task -> {
if (task.equals(runningTask)) {
future.onResponse(null);
RemovedTaskListener removedTaskListener = new RemovedTaskListener() {
@Override
public void onRemoved(Task task) {
if (task.equals(runningTask)) {
future.onResponse(null);
}
}

@Override
public String toString() {
return "Completing running task " + runningTask;
arteam marked this conversation as resolved.
Show resolved Hide resolved
}
};
taskManager.registerRemovedTaskListener(removedTaskListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
Expand Down Expand Up @@ -84,4 +85,16 @@ public void addListener(MockTaskManagerListener listener) {
public void removeListener(MockTaskManagerListener listener) {
listeners.remove(listener);
}

@Override
public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) {
super.registerRemovedTaskListener(removedTaskListener);
for (MockTaskManagerListener listener : listeners) {
try {
listener.onRemovedTaskListenerRegistered(removedTaskListener);
} catch (Exception e) {
logger.warn("failed to notify task manager listener about a registered removed task listener", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.test.tasks;

import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;

/**
Expand All @@ -17,4 +18,6 @@ public interface MockTaskManagerListener {
default void onTaskRegistered(Task task) {};

default void onTaskUnregistered(Task task) {};

default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {};
}
Loading