Skip to content

Commit

Permalink
Fix TasksIT#testGetTaskWaitForCompletionWithoutStoringResult (elastic…
Browse files Browse the repository at this point in the history
…#108094)

It seems that the failure (the missed index) has always existed in the test scenario and it's supposed to be handled by TransportGetTaskAction.java. We catch IndexNotFoundException here and convert it to ResourceNotFoundException. Then we catch ResourceNotFoundException here and return a snapshot of a task as a response.

In the stack trace, getFinishedTaskFromIndex was called from getRunningTaskFromNode, not from waitedForCompletion due to a race between creating a get request and unblocking request which are sent asynchronously. I've changed the waitForCompletionTestCase test method to unblock the task only after the request started waiting for the task completion by registering a removal listener. By doing so, we make sure we test the "wait for completion" branch when task is running.

The part about the missed index seems to irrelevant, since waitedForCompletion is able to suppress the error and return a snapshot of running task which is not possible if getFinishedTaskFromIndex gets called directly from getRunningTaskFromNode.

Resolves elastic#107823
  • Loading branch information
arteam authored May 29, 2024
1 parent 58cb500 commit e622101
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
Expand Down Expand Up @@ -599,24 +600,32 @@ private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId,
TEST_TASK_ACTION.name() + "[n]",
() -> client().execute(TEST_TASK_ACTION, request)
);
ActionFuture<T> waitResponseFuture;
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
TaskId taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

var taskManager = (MockTaskManager) internalCluster().getInstance(
TransportService.class,
clusterService().state().getNodes().resolveNode(taskId.getNodeId()).getName()
).getTaskManager();
var listener = new MockTaskManagerListener() {
@Override
public void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {
// Unblock the request only after it started waiting for task completion
client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest());
}
};
taskManager.addListener(listener);
try {
var tasks = clusterAdmin().prepareListTasks().setActions(TEST_TASK_ACTION.name()).get().getTasks();
assertThat(tasks, hasSize(1));
var taskId = tasks.get(0).taskId();
clusterAdmin().prepareGetTask(taskId).get();

// Spin up a request to wait for the test task to finish
waitResponseFuture = wait.apply(taskId);
// The task will be unblocked as soon as the request started waiting for task completion
T waitResponse = wait.apply(taskId).get();
validator.accept(waitResponse);
} finally {
// Unblock the request so the wait for completion request can finish
client().execute(UNBLOCK_TASK_ACTION, new TestTaskPlugin.UnblockTestTasksRequest()).get();
taskManager.removeListener(listener);
}

// Now that the task is unblocked the list response will come back
T waitResponse = waitResponseFuture.get();
validator.accept(waitResponse);

TestTaskPlugin.NodesResponse response = future.get();
assertEquals(emptyList(), response.failures());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ void getRunningTaskFromNode(Task thisTask, GetTaskRequest request, ActionListene
} else {
if (request.getWaitForCompletion()) {
final ListenableActionFuture<Void> future = new ListenableActionFuture<>();
RemovedTaskListener removedTaskListener = task -> {
if (task.equals(runningTask)) {
future.onResponse(null);
RemovedTaskListener removedTaskListener = new RemovedTaskListener() {
@Override
public void onRemoved(Task task) {
if (task.equals(runningTask)) {
future.onResponse(null);
}
}

@Override
public String toString() {
return "Waiting for task completion " + runningTask;
}
};
taskManager.registerRemovedTaskListener(removedTaskListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
Expand Down Expand Up @@ -84,4 +85,16 @@ public void addListener(MockTaskManagerListener listener) {
public void removeListener(MockTaskManagerListener listener) {
listeners.remove(listener);
}

@Override
public void registerRemovedTaskListener(RemovedTaskListener removedTaskListener) {
super.registerRemovedTaskListener(removedTaskListener);
for (MockTaskManagerListener listener : listeners) {
try {
listener.onRemovedTaskListenerRegistered(removedTaskListener);
} catch (Exception e) {
logger.warn("failed to notify task manager listener about a registered removed task listener", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.test.tasks;

import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;

/**
Expand All @@ -17,4 +18,6 @@ public interface MockTaskManagerListener {
default void onTaskRegistered(Task task) {};

default void onTaskUnregistered(Task task) {};

default void onRemovedTaskListenerRegistered(RemovedTaskListener removedTaskListener) {};
}

0 comments on commit e622101

Please sign in to comment.