diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index e48b6c972cb04..4d5c4a34f5751 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -103,12 +103,16 @@ public long getAllocationId() { } /** - * Waits for this persistent task to have the desired state. + * Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly. + * + * @param predicate the persistent task predicate to evaluate + * @param timeout a timeout for waiting + * @param listener the callback listener */ - public void waitForPersistentTaskStatus(Predicate> predicate, - @Nullable TimeValue timeout, - PersistentTasksService.WaitForPersistentTaskStatusListener listener) { - persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener); + public void waitForPersistentTask(final Predicate> predicate, + final @Nullable TimeValue timeout, + final PersistentTasksService.WaitForPersistentTaskListener listener) { + persistentTasksService.waitForPersistentTaskCondition(persistentTaskId, predicate, timeout, listener); } final boolean isCompleted() { @@ -143,7 +147,7 @@ private void completeAndNotifyIfNeeded(@Nullable Exception failure) { this.failure = failure; if (prevState == State.STARTED) { logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId()); - persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new + persistentTasksService.sendCompletionRequest(getPersistentTaskId(), getAllocationId(), failure, new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index a1902df36c4c9..724e10c2c9030 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -196,7 +196,8 @@ private void cancelTask(Long allocationId) { AllocatedPersistentTask task = runningTasks.remove(allocationId); if (task.markAsCancelled()) { // Cancel the local task using the task manager - persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener() { + String reason = "task has been removed, cancelling locally"; + persistentTasksService.sendCancelRequest(task.getId(), reason, new ActionListener() { @Override public void onResponse(CancelTasksResponse cancelTasksResponse) { logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(), diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index 2b656dd219cf9..482491fc3f7e9 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -22,14 +22,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; @@ -37,20 +35,24 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.node.NodeClosedException; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import java.util.function.BiConsumer; import java.util.function.Predicate; import java.util.function.Supplier; /** - * This service is used by persistent actions to propagate changes in the action state and notify about completion + * This service is used by persistent tasks and allocated persistent tasks to communicate changes + * to the master node so that the master can update the cluster state and can track of the states + * of the persistent tasks. */ public class PersistentTasksService extends AbstractComponent { + private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; + private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks"; + private final Client client; private final ClusterService clusterService; private final ThreadPool threadPool; @@ -63,92 +65,115 @@ public PersistentTasksService(Settings settings, ClusterService clusterService, } /** - * Creates the specified persistent task and attempts to assign it to a node. + * Notifies the master node to create new persistent task and to assign it to a node. */ - @SuppressWarnings("unchecked") - public void startPersistentTask(String taskId, String taskName, @Nullable Params params, - ActionListener> listener) { - StartPersistentTaskAction.Request createPersistentActionRequest = - new StartPersistentTaskAction.Request(taskId, taskName, params); - try { - executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, - ActionListener.wrap(o -> listener.onResponse((PersistentTask) o.getTask()), listener::onFailure)); - } catch (Exception e) { - listener.onFailure(e); - } + public void sendStartRequest(final String taskId, + final String taskName, + final @Nullable Params taskParams, + final ActionListener> listener) { + @SuppressWarnings("unchecked") + final ActionListener> wrappedListener = + ActionListener.wrap(t -> listener.onResponse((PersistentTask) t), listener::onFailure); + StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams); + execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener); } /** - * Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure + * Notifies the master node about the completion of a persistent task. + *

+ * When {@code failure} is {@code null}, the persistent task is considered as successfully completed. */ - public void sendCompletionNotification(String taskId, long allocationId, Exception failure, - ActionListener> listener) { - CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure); - try { - executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, CompletionPersistentTaskAction.INSTANCE, restartRequest, - ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); - } catch (Exception e) { - listener.onFailure(e); - } + public void sendCompletionRequest(final String taskId, + final long taskAllocationId, + final @Nullable Exception taskFailure, + final ActionListener> listener) { + CompletionPersistentTaskAction.Request request = new CompletionPersistentTaskAction.Request(taskId, taskAllocationId, taskFailure); + execute(request, CompletionPersistentTaskAction.INSTANCE, listener); } /** - * Cancels a locally running task using the task manager + * Cancels a locally running task using the Task Manager API */ - void sendTaskManagerCancellation(long taskId, ActionListener listener) { - DiscoveryNode localNode = clusterService.localNode(); - CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); - cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId)); - cancelTasksRequest.setReason("persistent action was removed"); + void sendCancelRequest(final long taskId, final String reason, final ActionListener listener) { + CancelTasksRequest request = new CancelTasksRequest(); + request.setTaskId(new TaskId(clusterService.localNode().getId(), taskId)); + request.setReason(reason); try { - executeAsyncWithOrigin(client.threadPool().getThreadContext(), PERSISTENT_TASK_ORIGIN, cancelTasksRequest, listener, - client.admin().cluster()::cancelTasks); + final ThreadContext threadContext = client.threadPool().getThreadContext(); + final Supplier supplier = threadContext.newRestorableContext(false); + + try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) { + client.admin().cluster().cancelTasks(request, new ContextPreservingActionListener<>(supplier, listener)); + } } catch (Exception e) { listener.onFailure(e); } } /** - * Updates status of the persistent task. + * Notifies the master node that the state of a persistent task has changed. *

* Persistent task implementers shouldn't call this method directly and use * {@link AllocatedPersistentTask#updatePersistentStatus} instead */ - void updateStatus(String taskId, long allocationId, Task.Status status, ActionListener> listener) { - UpdatePersistentTaskStatusAction.Request updateStatusRequest = - new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status); - try { - executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, - ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); - } catch (Exception e) { - listener.onFailure(e); - } + void updateStatus(final String taskId, + final long taskAllocationID, + final Task.Status status, + final ActionListener> listener) { + UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, status); + execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener); } /** - * Cancels if needed and removes a persistent task + * Notifies the master node to remove a persistent task from the cluster state */ - public void cancelPersistentTask(String taskId, ActionListener> listener) { - RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId); - try { - executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, RemovePersistentTaskAction.INSTANCE, removeRequest, - ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure)); - } catch (Exception e) { - listener.onFailure(e); - } + public void sendRemoveRequest(final String taskId, final ActionListener> listener) { + RemovePersistentTaskAction.Request request = new RemovePersistentTaskAction.Request(taskId); + execute(request, RemovePersistentTaskAction.INSTANCE, listener); } /** - * Checks if the persistent task with giving id (taskId) has the desired state and if it doesn't - * waits of it. + * Executes an asynchronous persistent task action using the client. + *

+ * The origin is set in the context and the listener is wrapped to ensure the proper context is restored */ - public void waitForPersistentTaskStatus(String taskId, Predicate> predicate, @Nullable TimeValue timeout, - WaitForPersistentTaskStatusListener listener) { - ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); - if (predicate.test(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId))) { - listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(stateObserver.setAndGetObservedState(), taskId)); + private > + void execute(final Req request, final Action action, final ActionListener> listener) { + try { + final ThreadContext threadContext = client.threadPool().getThreadContext(); + final Supplier supplier = threadContext.newRestorableContext(false); + + try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, PERSISTENT_TASK_ORIGIN)) { + client.execute(action, request, + new ContextPreservingActionListener<>(supplier, + ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure))); + } + } catch (Exception e) { + listener.onFailure(e); + } + } + + /** + * Waits for a given persistent task to comply with a given predicate, then call back the listener accordingly. + * + * @param taskId the persistent task id + * @param predicate the persistent task predicate to evaluate + * @param timeout a timeout for waiting + * @param listener the callback listener + */ + public void waitForPersistentTaskCondition(final String taskId, + final Predicate> predicate, + final @Nullable TimeValue timeout, + final WaitForPersistentTaskListener listener) { + final Predicate clusterStatePredicate = clusterState -> + predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)); + + final ClusterStateObserver observer = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); + final ClusterState clusterState = observer.setAndGetObservedState(); + if (clusterStatePredicate.test(clusterState)) { + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId)); } else { - stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(state, taskId)); @@ -163,18 +188,28 @@ public void onClusterServiceClose() { public void onTimeout(TimeValue timeout) { listener.onTimeout(timeout); } - }, clusterState -> predicate.test(PersistentTasksCustomMetaData.getTaskWithId(clusterState, taskId))); + }, clusterStatePredicate); } } - public void waitForPersistentTasksStatus(Predicate predicate, - @Nullable TimeValue timeout, ActionListener listener) { - ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, - logger, threadPool.getThreadContext()); - if (predicate.test(stateObserver.setAndGetObservedState().metaData().custom(PersistentTasksCustomMetaData.TYPE))) { + /** + * Waits for persistent tasks to comply with a given predicate, then call back the listener accordingly. + * + * @param predicate the predicate to evaluate + * @param timeout a timeout for waiting + * @param listener the callback listener + */ + public void waitForPersistentTasksCondition(final Predicate predicate, + final @Nullable TimeValue timeout, + final ActionListener listener) { + final Predicate clusterStatePredicate = clusterState -> + predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)); + + final ClusterStateObserver observer = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); + if (clusterStatePredicate.test(observer.setAndGetObservedState())) { listener.onResponse(true); } else { - stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { listener.onResponse(true); @@ -187,45 +222,15 @@ public void onClusterServiceClose() { @Override public void onTimeout(TimeValue timeout) { - listener.onFailure(new IllegalStateException("timed out after " + timeout)); + listener.onFailure(new IllegalStateException("Timed out when waiting for persistent tasks after " + timeout)); } - }, clusterState -> predicate.test(clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE)), timeout); + }, clusterStatePredicate, timeout); } } - public interface WaitForPersistentTaskStatusListener - extends ActionListener> { + public interface WaitForPersistentTaskListener

extends ActionListener> { default void onTimeout(TimeValue timeout) { - onFailure(new IllegalStateException("timed out after " + timeout)); - } - } - - private static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin"; - private static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks"; - - /** - * Executes a consumer after setting the origin and wrapping the listener so that the proper context is restored - */ - public static void executeAsyncWithOrigin( - ThreadContext threadContext, String origin, Request request, ActionListener listener, - BiConsumer> consumer) { - final Supplier supplier = threadContext.newRestorableContext(false); - try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) { - consumer.accept(request, new ContextPreservingActionListener<>(supplier, listener)); - } - } - /** - * Executes an asynchronous action using the provided client. The origin is set in the context and the listener - * is wrapped to ensure the proper context is restored - */ - public static > void executeAsyncWithOrigin( - Client client, String origin, Action action, Request request, - ActionListener listener) { - final ThreadContext threadContext = client.threadPool().getThreadContext(); - final Supplier supplier = threadContext.newRestorableContext(false); - try (ThreadContext.StoredContext ignore = stashWithOrigin(threadContext, origin)) { - client.execute(action, request, new ContextPreservingActionListener<>(supplier, listener)); + onFailure(new IllegalStateException("Timed out when waiting for persistent task after " + timeout)); } } @@ -234,5 +239,4 @@ public static ThreadContext.StoredContext stashWithOrigin(ThreadContext threadCo threadContext.putTransient(ACTION_ORIGIN_TRANSIENT_NAME, origin); return storedContext; } - } diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java index 0dddaaa783906..b67b7678332b7 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorFullRestartIT.java @@ -65,8 +65,7 @@ public void testFullClusterRestart() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); futures.add(future); taskIds[i] = UUIDs.base64UUID(); - service.startPersistentTask(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), - future); + service.sendStartRequest(taskIds[i], TestPersistentTasksExecutor.NAME, randomBoolean() ? null : new TestParams("Blah"), future); } for (int i = 0; i < numberOfTasks; i++) { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 91ccd8a37f06b..8f37a2412ef5a 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -30,7 +30,7 @@ import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; +import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; @@ -69,15 +69,15 @@ public void cleanup() throws Exception { assertNoRunningTasks(); } - public static class WaitForPersistentTaskStatusFuture + public static class WaitForPersistentTaskFuture extends PlainActionFuture> - implements WaitForPersistentTaskStatusListener { + implements WaitForPersistentTaskListener { } public void testPersistentActionFailure() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); + persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); long allocationId = future.get().getAllocationId(); assertBusy(() -> { // Wait for the task to start @@ -108,7 +108,7 @@ public void testPersistentActionCompletion() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); String taskId = UUIDs.base64UUID(); - persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); + persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); long allocationId = future.get().getAllocationId(); assertBusy(() -> { // Wait for the task to start @@ -127,7 +127,7 @@ public void testPersistentActionCompletion() throws Exception { logger.info("Simulating errant completion notification"); //try sending completion request with incorrect allocation id PlainActionFuture> failedCompletionNotificationFuture = new PlainActionFuture<>(); - persistentTasksService.sendCompletionNotification(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture); + persistentTasksService.sendCompletionRequest(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture); assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class); // Make sure that the task is still running assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]") @@ -142,7 +142,7 @@ public void testPersistentActionWithNoAvailableNode() throws Exception { PlainActionFuture> future = new PlainActionFuture<>(); TestParams testParams = new TestParams("Blah"); testParams.setExecutorNodeAttr("test"); - persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); + persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, testParams, future); String taskId = future.get().getId(); Settings nodeSettings = Settings.builder().put(nodeSettings(0)).put("node.attr.test_attr", "test").build(); @@ -169,14 +169,14 @@ public void testPersistentActionWithNoAvailableNode() throws Exception { // Remove the persistent task PlainActionFuture> removeFuture = new PlainActionFuture<>(); - persistentTasksService.cancelPersistentTask(taskId, removeFuture); + persistentTasksService.sendRemoveRequest(taskId, removeFuture); assertEquals(removeFuture.get().getId(), taskId); } public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); + persistentTasksService.sendStartRequest(UUIDs.base64UUID(), TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); String taskId = future.get().getId(); assertBusy(() -> { @@ -200,16 +200,16 @@ public void testPersistentActionStatusUpdate() throws Exception { .get().getTasks().size(), equalTo(1)); int finalI = i; - WaitForPersistentTaskStatusFuture future1 = new WaitForPersistentTaskStatusFuture<>(); - persistentTasksService.waitForPersistentTaskStatus(taskId, + WaitForPersistentTaskFuture future1 = new WaitForPersistentTaskFuture<>(); + persistentTasksService.waitForPersistentTaskCondition(taskId, task -> task != null && task.getStatus() != null && task.getStatus().toString() != null && task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), TimeValue.timeValueSeconds(10), future1); assertThat(future1.get().getId(), equalTo(taskId)); } - WaitForPersistentTaskStatusFuture future1 = new WaitForPersistentTaskStatusFuture<>(); - persistentTasksService.waitForPersistentTaskStatus(taskId, + WaitForPersistentTaskFuture future1 = new WaitForPersistentTaskFuture<>(); + persistentTasksService.waitForPersistentTaskCondition(taskId, task -> false, TimeValue.timeValueMillis(10), future1); assertThrows(future1, IllegalStateException.class, "timed out after 10ms"); @@ -220,8 +220,8 @@ public void testPersistentActionStatusUpdate() throws Exception { " and allocation id -2 doesn't exist"); // Wait for the task to disappear - WaitForPersistentTaskStatusFuture future2 = new WaitForPersistentTaskStatusFuture<>(); - persistentTasksService.waitForPersistentTaskStatus(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2); + WaitForPersistentTaskFuture future2 = new WaitForPersistentTaskFuture<>(); + persistentTasksService.waitForPersistentTaskCondition(taskId, Objects::isNull, TimeValue.timeValueSeconds(10), future2); logger.info("Completing the running task"); // Complete the running task and make sure it finishes properly @@ -235,11 +235,11 @@ public void testCreatePersistentTaskWithDuplicateId() throws Exception { PersistentTasksService persistentTasksService = internalCluster().getInstance(PersistentTasksService.class); PlainActionFuture> future = new PlainActionFuture<>(); String taskId = UUIDs.base64UUID(); - persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); + persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future); future.get(); PlainActionFuture> future2 = new PlainActionFuture<>(); - persistentTasksService.startPersistentTask(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2); + persistentTasksService.sendStartRequest(taskId, TestPersistentTasksExecutor.NAME, new TestParams("Blah"), future2); assertThrows(future2, ResourceAlreadyExistsException.class); assertBusy(() -> { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 589fe5d61a981..a4a68f2bfd0b0 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -235,14 +235,14 @@ public void testTaskCancellation() { AtomicReference> capturedListener = new AtomicReference<>(); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { @Override - public void sendTaskManagerCancellation(long taskId, ActionListener listener) { + void sendCancelRequest(final long taskId, final String reason, final ActionListener listener) { capturedTaskId.set(taskId); capturedListener.set(listener); } @Override - public void sendCompletionNotification(String taskId, long allocationId, Exception failure, - ActionListener> listener) { + public void sendCompletionRequest(final String taskId, final long taskAllocationId, + final Exception taskFailure, final ActionListener> listener) { fail("Shouldn't be called during Cluster State cancellation"); } }; diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java index 15d12fb1ce932..cf1cc89b3a18a 100644 --- a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -71,7 +71,7 @@ public void testEnableAssignmentAfterRestart() throws Exception { final CountDownLatch latch = new CountDownLatch(numberOfTasks); for (int i = 0; i < numberOfTasks; i++) { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); - service.startPersistentTask("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(), + service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, randomTaskParams(), new ActionListener>() { @Override public void onResponse(PersistentTask task) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index fa649e541963d..36bcfe92f0075 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -315,7 +315,7 @@ private void forceCloseJob(ClusterState currentState, CloseJobAction.Request req PersistentTasksCustomMetaData.PersistentTask jobTask = MlMetadata.getJobTask(jobId, tasks); if (jobTask != null) { auditor.info(jobId, Messages.JOB_AUDIT_FORCE_CLOSING); - persistentTasksService.cancelPersistentTask(jobTask.getId(), + persistentTasksService.sendRemoveRequest(jobTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { @@ -400,7 +400,7 @@ public boolean hasJobsToWaitFor() { // so wait for that to happen here. void waitForJobClosed(CloseJobAction.Request request, WaitForCloseRequest waitForCloseRequest, CloseJobAction.Response response, ActionListener listener) { - persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { + persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> { for (String persistentTaskId : waitForCloseRequest.persistentTaskIds) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { return false; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java index 8c9eabe6de19a..be7ac84284d74 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteDatafeedAction.java @@ -90,7 +90,7 @@ private void removeDatafeedTask(DeleteDatafeedAction.Request request, ClusterSta if (datafeedTask == null) { listener.onResponse(true); } else { - persistentTasksService.cancelPersistentTask(datafeedTask.getId(), + persistentTasksService.sendRemoveRequest(datafeedTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 90821b302fc5c..81f4a90f575af 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -182,7 +182,7 @@ private void removePersistentTask(String jobId, ClusterState currentState, if (jobTask == null) { listener.onResponse(null); } else { - persistentTasksService.cancelPersistentTask(jobTask.getId(), + persistentTasksService.sendRemoveRequest(jobTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 5e829c72756a7..f783dfbddc35a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -465,7 +465,7 @@ public void onFailure(Exception e) { // Step 4. Start job task ActionListener establishedMemoryUpdateListener = ActionListener.wrap( - response -> persistentTasksService.startPersistentTask(MlMetadata.jobTaskId(jobParams.getJobId()), + response -> persistentTasksService.sendStartRequest(MlMetadata.jobTaskId(jobParams.getJobId()), OpenJobAction.TASK_NAME, jobParams, finalListener), listener::onFailure ); @@ -518,8 +518,8 @@ public void onFailure(Exception e) { private void waitForJobStarted(String taskId, OpenJobAction.JobParams jobParams, ActionListener listener) { JobPredicate predicate = new JobPredicate(); - persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, jobParams.getTimeout(), - new PersistentTasksService.WaitForPersistentTaskStatusListener() { + persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, jobParams.getTimeout(), + new PersistentTasksService.WaitForPersistentTaskListener() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { if (predicate.exception != null) { @@ -550,7 +550,7 @@ public void onTimeout(TimeValue timeout) { private void cancelJobStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, ActionListener listener) { - persistentTasksService.cancelPersistentTask(persistentTask.getId(), + persistentTasksService.sendRemoveRequest(persistentTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 71afa656a4a69..bed83ed82c1c9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -137,7 +137,7 @@ public void onFailure(Exception e) { Job job = mlMetadata.getJobs().get(datafeed.getJobId()); DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap( dataExtractorFactory -> - persistentTasksService.startPersistentTask(MLMetadataField.datafeedTaskId(params.getDatafeedId()), + persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()), StartDatafeedAction.TASK_NAME, params, finalListener) , listener::onFailure)); } else { @@ -156,8 +156,8 @@ protected ClusterBlockException checkBlock(StartDatafeedAction.Request request, private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedParams params, ActionListener listener) { DatafeedPredicate predicate = new DatafeedPredicate(); - persistentTasksService.waitForPersistentTaskStatus(taskId, predicate, params.getTimeout(), - new PersistentTasksService.WaitForPersistentTaskStatusListener() { + persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(), + new PersistentTasksService.WaitForPersistentTaskListener() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { if (predicate.exception != null) { @@ -184,7 +184,7 @@ public void onTimeout(TimeValue timeout) { private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask persistentTask, Exception exception, ActionListener listener) { - persistentTasksService.cancelPersistentTask(persistentTask.getId(), + persistentTasksService.sendRemoveRequest(persistentTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index b5f16ff191fe2..4b68f74eb1702 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -190,7 +190,7 @@ private void forceStopDatafeed(final StopDatafeedAction.Request request, final A for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlMetadata.getDatafeedTask(datafeedId, tasks); if (datafeedTask != null) { - persistentTasksService.cancelPersistentTask(datafeedTask.getId(), + persistentTasksService.sendRemoveRequest(datafeedTask.getId(), new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { @@ -275,7 +275,7 @@ private void sendResponseOrFailure(String datafeedId, ActionListener datafeedPersistentTaskIds, StopDatafeedAction.Request request, StopDatafeedAction.Response response, ActionListener listener) { - persistentTasksService.waitForPersistentTasksStatus(persistentTasksCustomMetaData -> { + persistentTasksService.waitForPersistentTasksCondition(persistentTasksCustomMetaData -> { for (String persistentTaskId: datafeedPersistentTaskIds) { if (persistentTasksCustomMetaData.getTask(persistentTaskId) != null) { return false; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index e27156b512613..69acbad20fb2d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -49,7 +49,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; -import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskStatusListener; +import static org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; public class DatafeedManager extends AbstractComponent { @@ -391,8 +391,8 @@ private void closeJob() { return; } - task.waitForPersistentTaskStatus(Objects::isNull, TimeValue.timeValueSeconds(20), - new WaitForPersistentTaskStatusListener() { + task.waitForPersistentTask(Objects::isNull, TimeValue.timeValueSeconds(20), + new WaitForPersistentTaskListener() { @Override public void onResponse(PersistentTask persistentTask) { CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(getJobId()); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java index 5a47e20456507..03df531e73771 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java @@ -59,14 +59,14 @@ protected void masterOperation(DeleteRollupJobAction.Request request, ClusterSta TimeValue timeout = new TimeValue(60, TimeUnit.SECONDS); // TODO make this a config option // Step 1. Cancel the persistent task - persistentTasksService.cancelPersistentTask(jobId, new ActionListener>() { + persistentTasksService.sendRemoveRequest(jobId, new ActionListener>() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask persistentTask) { logger.debug("Request to cancel Task for Rollup job [" + jobId + "] successful."); // Step 2. Wait for the task to finish cancellation internally - persistentTasksService.waitForPersistentTaskStatus(jobId, Objects::isNull, timeout, - new PersistentTasksService.WaitForPersistentTaskStatusListener() { + persistentTasksService.waitForPersistentTaskCondition(jobId, Objects::isNull, timeout, + new PersistentTasksService.WaitForPersistentTaskListener() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { logger.debug("Task for Rollup job [" + jobId + "] successfully canceled."); diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index 61b3334f0c166..0e674ba000bd0 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -205,7 +205,7 @@ static void updateMapping(RollupJob job, ActionListener listener, PersistentTasksService persistentTasksService) { - persistentTasksService.startPersistentTask(job.getConfig().getId(), RollupField.TASK_NAME, job, + persistentTasksService.sendStartRequest(job.getConfig().getId(), RollupField.TASK_NAME, job, ActionListener.wrap( rollupConfigPersistentTask -> waitForRollupStarted(job, listener, persistentTasksService), e -> { @@ -220,8 +220,8 @@ static void startPersistentTask(RollupJob job, ActionListener listener, PersistentTasksService persistentTasksService) { - persistentTasksService.waitForPersistentTaskStatus(job.getConfig().getId(), Objects::nonNull, job.getConfig().getTimeout(), - new PersistentTasksService.WaitForPersistentTaskStatusListener() { + persistentTasksService.waitForPersistentTaskCondition(job.getConfig().getId(), Objects::nonNull, job.getConfig().getTimeout(), + new PersistentTasksService.WaitForPersistentTaskListener() { @Override public void onResponse(PersistentTasksCustomMetaData.PersistentTask task) { listener.onResponse(new PutRollupJobAction.Response(true)); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index 5bc8ce5c2b7bb..64cf9d2e3fe21 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -300,11 +300,11 @@ public void testTaskAlreadyExists() { doAnswer(invocation -> { requestCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex())); return null; - }).when(tasksService).startPersistentTask(eq(job.getConfig().getId()), + }).when(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); - verify(tasksService).startPersistentTask(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any()); + verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any()); } @SuppressWarnings("unchecked") @@ -326,18 +326,18 @@ public void testStartTask() { mock(PersistentTasksCustomMetaData.Assignment.class)); requestCaptor.getValue().onResponse(response); return null; - }).when(tasksService).startPersistentTask(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); + }).when(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), requestCaptor.capture()); - ArgumentCaptor requestCaptor2 - = ArgumentCaptor.forClass(PersistentTasksService.WaitForPersistentTaskStatusListener.class); + ArgumentCaptor requestCaptor2 + = ArgumentCaptor.forClass(PersistentTasksService.WaitForPersistentTaskListener.class); doAnswer(invocation -> { // Bail here with an error, further testing will happen through tests of #startPersistentTask requestCaptor2.getValue().onFailure(new RuntimeException("Ending")); return null; - }).when(tasksService).waitForPersistentTaskStatus(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture()); + }).when(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture()); TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); - verify(tasksService).startPersistentTask(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any()); - verify(tasksService).waitForPersistentTaskStatus(eq(job.getConfig().getId()), any(), any(), any()); + verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), any()); + verify(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), any()); } }