From 6d711fa50d14e8316ac6f0a18daa03c67338d433 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 15 Jun 2018 09:26:47 +0200 Subject: [PATCH] Uncouple persistent task state and status (#31031) This pull request removes the relationship between the state of persistent task (as stored in the cluster state) and the status of the task (as reported by the Task APIs and used in various places) that have been confusing for some time (#29608). In order to do that, a new PersistentTaskState interface is added. This interface represents the persisted state of a persistent task. The methods used to update the state of persistent tasks are renamed: updatePersistentStatus() becomes updatePersistentTaskState() and now takes a PersistentTaskState as a parameter. The Task.Status type as been changed to PersistentTaskState in all places were it make sense (in persistent task customs in cluster state and all other methods that deal with the state of an allocated persistent task). --- .../persistent/AllocatedPersistentTask.java | 8 +- .../NodePersistentTasksExecutor.java | 16 ++- .../persistent/PersistentTaskState.java | 29 +++++ .../PersistentTasksClusterService.java | 32 ++--- .../PersistentTasksCustomMetaData.java | 87 ++++++------- .../persistent/PersistentTasksExecutor.java | 3 +- .../PersistentTasksNodeService.java | 10 +- .../persistent/PersistentTasksService.java | 14 +-- .../UpdatePersistentTaskStatusAction.java | 33 +++-- .../PersistentTasksClusterServiceTests.java | 3 +- .../PersistentTasksCustomMetaDataTests.java | 22 ++-- .../PersistentTasksDecidersTestCase.java | 3 +- .../persistent/PersistentTasksExecutorIT.java | 12 +- .../PersistentTasksNodeServiceTests.java | 26 ++-- .../persistent/TestPersistentTasksPlugin.java | 41 +++---- .../UpdatePersistentTaskRequestTests.java | 7 +- .../xpack/core/XPackClientPlugin.java | 26 ++-- .../xpack/core/ml/MlMetadata.java | 12 +- .../xpack/core/ml/datafeed/DatafeedState.java | 4 +- .../{JobTaskStatus.java => JobTaskState.java} | 18 +-- .../core/rollup/job/RollupJobStatus.java | 5 +- .../ml/action/TransportCloseJobAction.java | 6 +- .../ml/action/TransportOpenJobAction.java | 16 +-- .../action/TransportStartDatafeedAction.java | 9 +- .../action/TransportStopDatafeedAction.java | 8 +- .../xpack/ml/datafeed/DatafeedManager.java | 2 +- .../ml/datafeed/DatafeedNodeSelector.java | 14 +-- .../autodetect/AutodetectProcessManager.java | 10 +- .../xpack/ml/MlMetadataTests.java | 4 +- .../action/TransportCloseJobActionTests.java | 2 +- .../action/TransportOpenJobActionTests.java | 6 +- .../TransportStopDatafeedActionTests.java | 4 +- .../ml/datafeed/DatafeedManagerTests.java | 4 +- .../datafeed/DatafeedNodeSelectorTests.java | 8 +- .../integration/BasicDistributedJobsIT.java | 22 ++-- .../xpack/ml/integration/TooManyJobsIT.java | 4 +- ...tatusTests.java => JobTaskStateTests.java} | 16 +-- .../AutodetectProcessManagerTests.java | 4 +- .../xpack/rollup/job/RollupJobTask.java | 60 ++++----- .../elasticsearch/xpack/rollup/RollupIT.java | 65 ++++------ .../xpack/rollup/job/RollupJobTaskTests.java | 114 ++++++++++-------- .../MlNativeAutodetectIntegTestCase.java | 8 +- 42 files changed, 405 insertions(+), 392 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/{JobTaskStatus.java => JobTaskState.java} (86%) rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/{JobTaskStatusTests.java => JobTaskStateTests.java} (53%) diff --git a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java index d4d299b7e4af1..54dcffab6e366 100644 --- a/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java +++ b/server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.tasks.CancellableTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; @@ -77,8 +76,9 @@ public Status getStatus() { *

* This doesn't affect the status of this allocated task. */ - public void updatePersistentStatus(Task.Status status, ActionListener> listener) { - persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener); + public void updatePersistentTaskState(final PersistentTaskState state, + final ActionListener> listener) { + persistentTasksService.sendUpdateStateRequest(persistentTaskId, allocationId, state, listener); } public String getPersistentTaskId() { @@ -116,7 +116,7 @@ public void waitForPersistentTask(final Predicate void executeTask(Params params, - @Nullable Task.Status status, - AllocatedPersistentTask task, - PersistentTasksExecutor executor) { + public void executeTask(final Params params, + final @Nullable PersistentTaskState state, + final AllocatedPersistentTask task, + final PersistentTasksExecutor executor) { threadPool.executor(executor.getExecutor()).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { @@ -49,14 +49,12 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { try { - executor.nodeOperation(task, params, status); + executor.nodeOperation(task, params, state); } catch (Exception ex) { task.markAsFailed(ex); } } }); - } - } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java new file mode 100644 index 0000000000000..57c913f51bb88 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskState.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.persistent; + +import org.elasticsearch.common.io.stream.NamedWriteable; +import org.elasticsearch.common.xcontent.ToXContentObject; + +/** + * {@link PersistentTaskState} represents the state of the persistent tasks, as it + * is persisted in the cluster state. + */ +public interface PersistentTaskState extends ToXContentObject, NamedWriteable { +} diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 1464279a814d5..9ed0af010b530 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -35,7 +35,6 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.decider.AssignmentDecision; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; -import org.elasticsearch.tasks.Task; import java.util.Objects; @@ -178,27 +177,30 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } /** - * Update task status + * Update the state of a persistent task * - * @param id the id of a persistent task - * @param allocationId the expected allocation id of the persistent task - * @param status new status - * @param listener the listener that will be called when task is removed + * @param taskId the id of a persistent task + * @param taskAllocationId the expected allocation id of the persistent task + * @param taskState new state + * @param listener the listener that will be called when task is removed */ - public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener> listener) { - clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() { + public void updatePersistentTaskState(final String taskId, + final long taskAllocationId, + final PersistentTaskState taskState, + final ActionListener> listener) { + clusterService.submitStateUpdateTask("update task state", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState); - if (tasksInProgress.hasTask(id, allocationId)) { - return update(currentState, tasksInProgress.updateTaskStatus(id, status)); + if (tasksInProgress.hasTask(taskId, taskAllocationId)) { + return update(currentState, tasksInProgress.updateTaskState(taskId, taskState)); } else { - if (tasksInProgress.hasTask(id)) { - logger.warn("trying to update status on task {} with unexpected allocation id {}", id, allocationId); + if (tasksInProgress.hasTask(taskId)) { + logger.warn("trying to update state on task {} with unexpected allocation id {}", taskId, taskAllocationId); } else { - logger.warn("trying to update status on non-existing task {}", id); + logger.warn("trying to update state on non-existing task {}", taskId); } - throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", id, allocationId); + throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId); } } @@ -209,7 +211,7 @@ public void onFailure(String source, Exception e) { @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, id)); + listener.onResponse(PersistentTasksCustomMetaData.getTaskWithId(newState, taskId)); } }); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java index 09346704a801d..f81b7c770e56c 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java @@ -38,8 +38,6 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.Task.Status; import java.io.IOException; import java.util.Collection; @@ -61,13 +59,12 @@ * A cluster state record that contains a list of all running persistent tasks */ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable implements MetaData.Custom { - public static final String TYPE = "persistent_tasks"; + public static final String TYPE = "persistent_tasks"; private static final String API_CONTEXT = MetaData.XContentContext.API.toString(); // TODO: Implement custom Diff for tasks private final Map> tasks; - private final long lastAllocationId; public PersistentTasksCustomMetaData(long lastAllocationId, Map> tasks) { @@ -94,8 +91,8 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map, String> parser = new ObjectParser<>("named"); parser.declareObject(TaskDescriptionBuilder::setParams, (p, c) -> p.namedObject(PersistentTaskParams.class, c, null), new ParseField("params")); - parser.declareObject(TaskDescriptionBuilder::setStatus, - (p, c) -> p.namedObject(Status.class, c, null), new ParseField("status")); + parser.declareObject(TaskDescriptionBuilder::setState, + (p, c) -> p.namedObject(PersistentTaskState.class, c, null), new ParseField("state", "status")); TASK_DESCRIPTION_PARSER = (XContentParser p, Void c, String name) -> parser.parse(p, new TaskDescriptionBuilder<>(name), name); // Assignment parser @@ -115,7 +112,7 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map builder = objects.get(0); taskBuilder.setTaskName(builder.taskName); taskBuilder.setParams(builder.params); - taskBuilder.setStatus(builder.status); + taskBuilder.setState(builder.state); }, TASK_DESCRIPTION_PARSER, new ParseField("task")); PERSISTENT_TASK_PARSER.declareObject(TaskBuilder::setAssignment, ASSIGNMENT_PARSER, new ParseField("assignment")); PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationIdOnLastStatusUpdate, @@ -123,12 +120,13 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map { + private final String taskName; private Params params; - private Status status; + private PersistentTaskState state; private TaskDescriptionBuilder(String taskName) { this.taskName = taskName; @@ -139,8 +137,8 @@ private TaskDescriptionBuilder setParams(Params params) { return this; } - private TaskDescriptionBuilder setStatus(Status status) { - this.status = status; + private TaskDescriptionBuilder setState(PersistentTaskState state) { + this.state = state; return this; } } @@ -261,37 +259,34 @@ public String toString() { * A record that represents a single running persistent task */ public static class PersistentTask

implements Writeable, ToXContentObject { + private final String id; private final long allocationId; private final String taskName; private final P params; - @Nullable - private final Status status; + private final @Nullable PersistentTaskState state; private final Assignment assignment; - @Nullable - private final Long allocationIdOnLastStatusUpdate; + private final @Nullable Long allocationIdOnLastStatusUpdate; - public PersistentTask(String id, String taskName, P params, long allocationId, Assignment assignment) { - this(id, allocationId, taskName, params, null, assignment, null); + public PersistentTask(final String id, final String name, final P params, final long allocationId, final Assignment assignment) { + this(id, allocationId, name, params, null, assignment, null); } - public PersistentTask(PersistentTask

task, long allocationId, Assignment assignment) { - this(task.id, allocationId, task.taskName, task.params, task.status, - assignment, task.allocationId); + public PersistentTask(final PersistentTask

task, final long allocationId, final Assignment assignment) { + this(task.id, allocationId, task.taskName, task.params, task.state, assignment, task.allocationId); } - public PersistentTask(PersistentTask

task, Status status) { - this(task.id, task.allocationId, task.taskName, task.params, status, - task.assignment, task.allocationId); + public PersistentTask(final PersistentTask

task, final PersistentTaskState state) { + this(task.id, task.allocationId, task.taskName, task.params, state, task.assignment, task.allocationId); } - private PersistentTask(String id, long allocationId, String taskName, P params, - Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) { + private PersistentTask(final String id, final long allocationId, final String name, final P params, + final PersistentTaskState state, final Assignment assignment, final Long allocationIdOnLastStatusUpdate) { this.id = id; this.allocationId = allocationId; - this.taskName = taskName; + this.taskName = name; this.params = params; - this.status = status; + this.state = state; this.assignment = assignment; this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate; if (params != null) { @@ -300,10 +295,10 @@ private PersistentTask(String id, long allocationId, String taskName, P params, params.getWriteableName() + " task: " + taskName); } } - if (status != null) { - if (status.getWriteableName().equals(taskName) == false) { + if (state != null) { + if (state.getWriteableName().equals(taskName) == false) { throw new IllegalArgumentException("status has to have the same writeable name as task. status: " + - status.getWriteableName() + " task: " + taskName); + state.getWriteableName() + " task: " + taskName); } } } @@ -318,7 +313,7 @@ public PersistentTask(StreamInput in) throws IOException { } else { params = (P) in.readOptionalNamedWriteable(PersistentTaskParams.class); } - status = in.readOptionalNamedWriteable(Task.Status.class); + state = in.readOptionalNamedWriteable(PersistentTaskState.class); assignment = new Assignment(in.readOptionalString(), in.readString()); allocationIdOnLastStatusUpdate = in.readOptionalLong(); } @@ -333,7 +328,7 @@ public void writeTo(StreamOutput out) throws IOException { } else { out.writeOptionalNamedWriteable(params); } - out.writeOptionalNamedWriteable(status); + out.writeOptionalNamedWriteable(state); out.writeOptionalString(assignment.executorNode); out.writeString(assignment.explanation); out.writeOptionalLong(allocationIdOnLastStatusUpdate); @@ -348,15 +343,14 @@ public boolean equals(Object o) { allocationId == that.allocationId && Objects.equals(taskName, that.taskName) && Objects.equals(params, that.params) && - Objects.equals(status, that.status) && + Objects.equals(state, that.state) && Objects.equals(assignment, that.assignment) && Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate); } @Override public int hashCode() { - return Objects.hash(id, allocationId, taskName, params, status, assignment, - allocationIdOnLastStatusUpdate); + return Objects.hash(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); } @Override @@ -395,8 +389,8 @@ public boolean isAssigned() { } @Nullable - public Status getStatus() { - return status; + public PersistentTaskState getState() { + return state; } @Override @@ -411,8 +405,8 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params xPa if (params != null) { builder.field("params", params, xParams); } - if (status != null) { - builder.field("status", status, xParams); + if (state != null) { + builder.field("state", state, xParams); } } builder.endObject(); @@ -448,7 +442,7 @@ private static class TaskBuilder { private long allocationId; private String taskName; private Params params; - private Status status; + private PersistentTaskState state; private Assignment assignment = INITIAL_ASSIGNMENT; private Long allocationIdOnLastStatusUpdate; @@ -472,8 +466,8 @@ public TaskBuilder setParams(Params params) { return this; } - public TaskBuilder setStatus(Status status) { - this.status = status; + public TaskBuilder setState(PersistentTaskState state) { + this.state = state; return this; } @@ -489,8 +483,7 @@ public TaskBuilder setAllocationIdOnLastStatusUpdate(Long allocationIdOn } public PersistentTask build() { - return new PersistentTask<>(id, allocationId, taskName, params, status, - assignment, allocationIdOnLastStatusUpdate); + return new PersistentTask<>(id, allocationId, taskName, params, state, assignment, allocationIdOnLastStatusUpdate); } } @@ -608,13 +601,13 @@ public Builder reassignTask(String taskId, Assignment assignment) { } /** - * Updates the task status + * Updates the task state */ - public Builder updateTaskStatus(String taskId, Status status) { + public Builder updateTaskState(final String taskId, final PersistentTaskState taskState) { PersistentTask taskInProgress = tasks.get(taskId); if (taskInProgress != null) { changed = true; - tasks.put(taskId, new PersistentTask<>(taskInProgress, status)); + tasks.put(taskId, new PersistentTask<>(taskInProgress, taskState)); } else { throw new ResourceNotFoundException("cannot update task with id {" + taskId + "}, the task no longer exists"); } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java index de75b1ff54085..758ffbe69a04d 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java @@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import java.util.Map; @@ -118,7 +117,7 @@ protected String getDescription(PersistentTask taskInProgress) { * NOTE: The nodeOperation has to throw an exception, trigger task.markAsCompleted() or task.completeAndNotifyIfNeeded() methods to * indicate that the persistent task has finished. */ - protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable Task.Status status); + protected abstract void nodeOperation(AllocatedPersistentTask task, Params params, @Nullable PersistentTaskState state); public String getExecutor() { return executor; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java index 724e10c2c9030..91cdb400aa0d4 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java @@ -50,13 +50,13 @@ * non-transport client nodes in the cluster and monitors cluster state changes to detect started commands. */ public class PersistentTasksNodeService extends AbstractComponent implements ClusterStateListener { + private final Map runningTasks = new HashMap<>(); private final PersistentTasksService persistentTasksService; private final PersistentTasksExecutorRegistry persistentTasksExecutorRegistry; private final TaskManager taskManager; private final NodePersistentTasksExecutor nodePersistentTasksExecutor; - public PersistentTasksNodeService(Settings settings, PersistentTasksService persistentTasksService, PersistentTasksExecutorRegistry persistentTasksExecutorRegistry, @@ -172,7 +172,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, task.getPersistentTaskId(), task.getAllocationId()); try { runningTasks.put(taskInProgress.getAllocationId(), task); - nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getStatus(), task, executor); + nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), taskInProgress.getState(), task, executor); } catch (Exception e) { // Submit task failure task.markAsFailed(e); @@ -215,8 +215,8 @@ public void onFailure(Exception e) { } } - public static class Status implements Task.Status { + public static final String NAME = "persistent_executor"; private final AllocatedPersistentTask.State state; @@ -252,10 +252,6 @@ public String toString() { return Strings.toString(this); } - public AllocatedPersistentTask.State getState() { - return state; - } - @Override public boolean isFragment() { return false; diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index e3d7020f4e037..c820b244af2db 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -114,13 +113,14 @@ void sendCancelRequest(final long taskId, final String reason, final ActionListe * Notifies the master node that the state of a persistent task has changed. *

* Persistent task implementers shouldn't call this method directly and use - * {@link AllocatedPersistentTask#updatePersistentStatus} instead + * {@link AllocatedPersistentTask#updatePersistentTaskState} instead */ - void updateStatus(final String taskId, - final long taskAllocationID, - final Task.Status status, - final ActionListener> listener) { - UpdatePersistentTaskStatusAction.Request request = new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, status); + void sendUpdateStateRequest(final String taskId, + final long taskAllocationID, + final PersistentTaskState taskState, + final ActionListener> listener) { + UpdatePersistentTaskStatusAction.Request request = + new UpdatePersistentTaskStatusAction.Request(taskId, taskAllocationID, taskState); execute(request, UpdatePersistentTaskStatusAction.INSTANCE, listener); } diff --git a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java index 53bc9afd0fdf5..389a50f644a15 100644 --- a/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java +++ b/server/src/main/java/org/elasticsearch/persistent/UpdatePersistentTaskStatusAction.java @@ -35,10 +35,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import java.io.IOException; import java.util.Objects; @@ -70,16 +69,15 @@ public static class Request extends MasterNodeRequest { private String taskId; private long allocationId = -1L; - private Task.Status status; + private PersistentTaskState state; public Request() { - } - public Request(String taskId, long allocationId, Task.Status status) { + public Request(String taskId, long allocationId, PersistentTaskState state) { this.taskId = taskId; this.allocationId = allocationId; - this.status = status; + this.state = state; } public void setTaskId(String taskId) { @@ -90,8 +88,8 @@ public void setAllocationId(long allocationId) { this.allocationId = allocationId; } - public void setStatus(Task.Status status) { - this.status = status; + public void setState(PersistentTaskState state) { + this.state = state; } @Override @@ -99,7 +97,7 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); taskId = in.readString(); allocationId = in.readLong(); - status = in.readOptionalNamedWriteable(Task.Status.class); + state = in.readOptionalNamedWriteable(PersistentTaskState.class); } @Override @@ -107,7 +105,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(taskId); out.writeLong(allocationId); - out.writeOptionalNamedWriteable(status); + out.writeOptionalNamedWriteable(state); } @Override @@ -129,13 +127,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId && - Objects.equals(status, request.status); + return Objects.equals(taskId, request.taskId) && allocationId == request.allocationId && Objects.equals(state, request.state); } @Override public int hashCode() { - return Objects.hash(taskId, allocationId, status); + return Objects.hash(taskId, allocationId, state); } } @@ -151,11 +148,10 @@ public final RequestBuilder setTaskId(String taskId) { return this; } - public final RequestBuilder setStatus(Task.Status status) { - request.setStatus(status); + public final RequestBuilder setState(PersistentTaskState state) { + request.setState(state); return this; } - } public static class TransportAction extends TransportMasterNodeAction { @@ -189,9 +185,10 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state) } @Override - protected final void masterOperation(final Request request, ClusterState state, + protected final void masterOperation(final Request request, + final ClusterState state, final ActionListener listener) { - persistentTasksClusterService.updatePersistentTaskStatus(request.taskId, request.allocationId, request.status, + persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state, new ActionListener>() { @Override public void onResponse(PersistentTask task) { diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 916fdee213695..f13a35613d530 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -37,7 +37,6 @@ import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.decider.EnableAssignmentDecider; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.TestThreadPool; @@ -649,7 +648,7 @@ public Assignment getAssignment(P params, ClusterState clusterState) { } @Override - protected void nodeOperation(AllocatedPersistentTask task, P params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, P params, PersistentTaskState state) { throw new UnsupportedOperationException(); } })); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java index 72e74359d3016..5b1f74d6cdfa5 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksCustomMetaDataTests.java @@ -42,10 +42,9 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Builder; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractDiffableSerializationTestCase; import java.io.IOException; @@ -79,7 +78,7 @@ protected PersistentTasksCustomMetaData createTestInstance() { randomAssignment()); if (randomBoolean()) { // From time to time update status - tasks.updateTaskStatus(taskId, new Status(randomAlphaOfLength(10))); + tasks.updateTaskState(taskId, new State(randomAlphaOfLength(10))); } } return tasks.build(); @@ -96,7 +95,7 @@ protected NamedWriteableRegistry getNamedWriteableRegistry() { new Entry(MetaData.Custom.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::new), new Entry(NamedDiff.class, PersistentTasksCustomMetaData.TYPE, PersistentTasksCustomMetaData::readDiffFrom), new Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), - new Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) + new Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) )); } @@ -118,7 +117,7 @@ protected Custom makeTestChanges(Custom testInstance) { if (builder.getCurrentTaskIds().isEmpty()) { addRandomTask(builder); } else { - builder.updateTaskStatus(pickRandomTask(builder), randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); + builder.updateTaskState(pickRandomTask(builder), randomBoolean() ? new State(randomAlphaOfLength(10)) : null); } break; case 3: @@ -155,9 +154,10 @@ private String pickRandomTask(PersistentTasksCustomMetaData.Builder testInstance @Override protected NamedXContentRegistry xContentRegistry() { return new NamedXContentRegistry(Arrays.asList( - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), - TestParams::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent) + new NamedXContentRegistry.Entry(PersistentTaskParams.class, + new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, + new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent) )); } @@ -186,7 +186,7 @@ public void testSerializationContext() throws Exception { // Things that should be serialized assertEquals(testTask.getTaskName(), newTask.getTaskName()); assertEquals(testTask.getId(), newTask.getId()); - assertEquals(testTask.getStatus(), newTask.getStatus()); + assertEquals(testTask.getState(), newTask.getState()); assertEquals(testTask.getParams(), newTask.getParams()); // Things that shouldn't be serialized @@ -224,10 +224,10 @@ public void testBuilder() { case 2: if (builder.hasTask(lastKnownTask)) { changed = true; - builder.updateTaskStatus(lastKnownTask, randomBoolean() ? new Status(randomAlphaOfLength(10)) : null); + builder.updateTaskState(lastKnownTask, randomBoolean() ? new State(randomAlphaOfLength(10)) : null); } else { String fLastKnownTask = lastKnownTask; - expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskStatus(fLastKnownTask, null)); + expectThrows(ResourceNotFoundException.class, () -> builder.updateTaskState(fLastKnownTask, null)); } break; case 3: diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java index 356e518198c52..655a21a5f5390 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksDecidersTestCase.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -64,7 +63,7 @@ public void setUp() throws Exception { public PersistentTasksExecutor getPersistentTaskExecutorSafe(String taskName) { return new PersistentTasksExecutor(clusterService.getSettings(), taskName, null) { @Override - protected void nodeOperation(AllocatedPersistentTask task, Params params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, Params params, PersistentTaskState state) { logger.debug("Executing task {}", task); } }; diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java index 8f37a2412ef5a..e746ff71627cd 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java @@ -31,7 +31,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksService.WaitForPersistentTaskListener; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestTasksRequestBuilder; @@ -190,11 +190,11 @@ public void testPersistentActionStatusUpdate() throws Exception { PersistentTasksCustomMetaData tasksInProgress = internalCluster().clusterService().state().getMetaData() .custom(PersistentTasksCustomMetaData.TYPE); assertThat(tasksInProgress.tasks().size(), equalTo(1)); - assertThat(tasksInProgress.tasks().iterator().next().getStatus(), nullValue()); + assertThat(tasksInProgress.tasks().iterator().next().getState(), nullValue()); int numberOfUpdates = randomIntBetween(1, 10); for (int i = 0; i < numberOfUpdates; i++) { - logger.info("Updating the task status"); + logger.info("Updating the task states"); // Complete the running task and make sure it finishes properly assertThat(new TestTasksRequestBuilder(client()).setOperation("update_status").setTaskId(firstRunningTask.getTaskId()) .get().getTasks().size(), equalTo(1)); @@ -202,8 +202,8 @@ public void testPersistentActionStatusUpdate() throws Exception { int finalI = i; WaitForPersistentTaskFuture future1 = new WaitForPersistentTaskFuture<>(); persistentTasksService.waitForPersistentTaskCondition(taskId, - task -> task != null && task.getStatus() != null && task.getStatus().toString() != null && - task.getStatus().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), + task -> task != null && task.getState() != null && task.getState().toString() != null && + task.getState().toString().equals("{\"phase\":\"phase " + (finalI + 1) + "\"}"), TimeValue.timeValueSeconds(10), future1); assertThat(future1.get().getId(), equalTo(taskId)); } @@ -215,7 +215,7 @@ public void testPersistentActionStatusUpdate() throws Exception { assertThrows(future1, IllegalStateException.class, "timed out after 10ms"); PlainActionFuture> failedUpdateFuture = new PlainActionFuture<>(); - persistentTasksService.updateStatus(taskId, -2, new Status("should fail"), failedUpdateFuture); + persistentTasksService.sendUpdateStateRequest(taskId, -2, new State("should fail"), failedUpdateFuture); assertThrows(failedUpdateFuture, ResourceNotFoundException.class, "the task with id " + taskId + " and allocation id -2 doesn't exist"); diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java index 5000f73445b0c..906ecf232053d 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java @@ -210,13 +210,12 @@ public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { ClusterState state = createInitialClusterState(1, Settings.EMPTY); - Task.Status status = new TestPersistentTasksPlugin.Status("_test_phase"); + PersistentTaskState taskState = new TestPersistentTasksPlugin.State("_test_phase"); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); String taskId = UUIDs.base64UUID(); TestParams taskParams = new TestParams("other_0"); - tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, - new Assignment("this_node", "test assignment on other node")); - tasks.updateTaskStatus(taskId, status); + tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, taskParams, new Assignment("this_node", "test assignment on other node")); + tasks.updateTaskState(taskId, taskState); MetaData.Builder metaData = MetaData.builder(state.metaData()); metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build()); ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build(); @@ -225,7 +224,7 @@ public void testParamsStatusAndNodeTaskAreDelegated() throws Exception { assertThat(executor.size(), equalTo(1)); assertThat(executor.get(0).params, sameInstance(taskParams)); - assertThat(executor.get(0).status, sameInstance(status)); + assertThat(executor.get(0).state, sameInstance(taskState)); assertThat(executor.get(0).task, sameInstance(nodeTask)); } @@ -331,15 +330,16 @@ private ClusterState removeTask(ClusterState state, String taskId) { } private class Execution { + private final PersistentTaskParams params; private final AllocatedPersistentTask task; - private final Task.Status status; + private final PersistentTaskState state; private final PersistentTasksExecutor holder; - Execution(PersistentTaskParams params, AllocatedPersistentTask task, Task.Status status, PersistentTasksExecutor holder) { + Execution(PersistentTaskParams params, AllocatedPersistentTask task, PersistentTaskState state, PersistentTasksExecutor holder) { this.params = params; this.task = task; - this.status = status; + this.state = state; this.holder = holder; } } @@ -352,11 +352,11 @@ private class MockExecutor extends NodePersistentTasksExecutor { } @Override - public void executeTask(Params params, - Task.Status status, - AllocatedPersistentTask task, - PersistentTasksExecutor executor) { - executions.add(new Execution(params, task, status, executor)); + public void executeTask(final Params params, + final PersistentTaskState state, + final AllocatedPersistentTask task, + final PersistentTasksExecutor executor) { + executions.add(new Execution(params, task, state, executor)); } public Execution get(int i) { diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 9799036e0ea91..cfcc5ed8c0f63 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -55,7 +55,6 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.PersistentTaskPlugin; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; @@ -100,16 +99,17 @@ public List> getPersistentTasksExecutor(ClusterServic public List getNamedWriteables() { return Arrays.asList( new NamedWriteableRegistry.Entry(PersistentTaskParams.class, TestPersistentTasksExecutor.NAME, TestParams::new), - new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) + new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) ); } @Override public List getNamedXContent() { return Arrays.asList( - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TestPersistentTasksExecutor.NAME), - TestParams::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(TestPersistentTasksExecutor.NAME), Status::fromXContent) + new NamedXContentRegistry.Entry(PersistentTaskParams.class, + new ParseField(TestPersistentTasksExecutor.NAME), TestParams::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, + new ParseField(TestPersistentTasksExecutor.NAME), State::fromXContent) ); } @@ -221,22 +221,22 @@ public Optional getRequiredFeature() { } } - public static class Status implements Task.Status { + public static class State implements PersistentTaskState { private final String phase; - public static final ConstructingObjectParser STATUS_PARSER = - new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new Status((String) args[0])); + public static final ConstructingObjectParser STATE_PARSER = + new ConstructingObjectParser<>(TestPersistentTasksExecutor.NAME, args -> new State((String) args[0])); static { - STATUS_PARSER.declareString(constructorArg(), new ParseField("phase")); + STATE_PARSER.declareString(constructorArg(), new ParseField("phase")); } - public Status(String phase) { + public State(String phase) { this.phase = requireNonNull(phase, "Phase cannot be null"); } - public Status(StreamInput in) throws IOException { + public State(StreamInput in) throws IOException { phase = in.readString(); } @@ -253,11 +253,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - public static Task.Status fromXContent(XContentParser parser) throws IOException { - return STATUS_PARSER.parse(parser, null); + public static PersistentTaskState fromXContent(XContentParser parser) throws IOException { + return STATE_PARSER.parse(parser, null); } - @Override public boolean isFragment() { return false; @@ -276,10 +275,10 @@ public String toString() { // Implements equals and hashcode for testing @Override public boolean equals(Object obj) { - if (obj == null || obj.getClass() != Status.class) { + if (obj == null || obj.getClass() != State.class) { return false; } - Status other = (Status) obj; + State other = (State) obj; return phase.equals(other.phase); } @@ -289,7 +288,6 @@ public int hashCode() { } } - public static class TestPersistentTasksExecutor extends PersistentTasksExecutor { public static final String NAME = "cluster:admin/persistent/test"; @@ -317,7 +315,7 @@ public Assignment getAssignment(TestParams params, ClusterState clusterState) { } @Override - protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, TestParams params, PersistentTaskState state) { logger.info("started node operation for the task {}", task); try { TestTask testTask = (TestTask) task; @@ -340,9 +338,9 @@ protected void nodeOperation(AllocatedPersistentTask task, TestParams params, Ta } else if ("update_status".equals(testTask.getOperation())) { testTask.setOperation(null); CountDownLatch latch = new CountDownLatch(1); - Status newStatus = new Status("phase " + phase.incrementAndGet()); - logger.info("updating the task status to {}", newStatus); - task.updatePersistentStatus(newStatus, new ActionListener>() { + State newState = new State("phase " + phase.incrementAndGet()); + logger.info("updating the task state to {}", newState); + task.updatePersistentTaskState(newState, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { logger.info("updating was successful"); @@ -545,5 +543,4 @@ protected void taskOperation(TestTasksRequest request, TestTask task, ActionList } - } diff --git a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java index 6e20bb0009732..5ae54640f8e31 100644 --- a/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/UpdatePersistentTaskRequestTests.java @@ -20,9 +20,8 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.AbstractStreamableTestCase; -import org.elasticsearch.persistent.TestPersistentTasksPlugin.Status; +import org.elasticsearch.persistent.TestPersistentTasksPlugin.State; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestPersistentTasksExecutor; import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction.Request; @@ -32,7 +31,7 @@ public class UpdatePersistentTaskRequestTests extends AbstractStreamableTestCase @Override protected Request createTestInstance() { - return new Request(UUIDs.base64UUID(), randomLong(), new Status(randomAlphaOfLength(10))); + return new Request(UUIDs.base64UUID(), randomLong(), new State(randomAlphaOfLength(10))); } @Override @@ -43,7 +42,7 @@ protected Request createBlankInstance() { @Override protected NamedWriteableRegistry getNamedWriteableRegistry() { return new NamedWriteableRegistry(Collections.singletonList( - new NamedWriteableRegistry.Entry(Task.Status.class, TestPersistentTasksExecutor.NAME, Status::new) + new NamedWriteableRegistry.Entry(PersistentTaskState.class, TestPersistentTasksExecutor.NAME, State::new) )); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java index a96de96fd4f44..049089e62cf26 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackClientPlugin.java @@ -28,6 +28,7 @@ import org.elasticsearch.license.PostStartTrialAction; import org.elasticsearch.license.PutLicenseAction; import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -89,7 +90,7 @@ import org.elasticsearch.xpack.core.ml.action.ValidateDetectorAction; import org.elasticsearch.xpack.core.ml.action.ValidateJobConfigAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.monitoring.MonitoringFeatureSetUsage; import org.elasticsearch.xpack.core.rollup.RollupFeatureSetUsage; import org.elasticsearch.xpack.core.rollup.RollupField; @@ -325,9 +326,9 @@ public List getNamedWriteables() { StartDatafeedAction.DatafeedParams::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new), - // ML - Task statuses - new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new), - new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream), + // ML - Task states + new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new), + new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream), new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.MACHINE_LEARNING, MachineLearningFeatureSetUsage::new), // monitoring @@ -350,7 +351,8 @@ public List getNamedWriteables() { // rollup new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.ROLLUP, RollupFeatureSetUsage::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, RollupJob.NAME, RollupJob::new), - new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new) + new NamedWriteableRegistry.Entry(Task.Status.class, RollupJobStatus.NAME, RollupJobStatus::new), + new NamedWriteableRegistry.Entry(PersistentTaskState.class, RollupJobStatus.NAME, RollupJobStatus::new) ); } @@ -365,9 +367,9 @@ public List getNamedXContent() { StartDatafeedAction.DatafeedParams::fromXContent), new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME), OpenJobAction.JobParams::fromXContent), - // ML - Task statuses - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(JobTaskStatus.NAME), JobTaskStatus::fromXContent), + // ML - Task states + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(JobTaskState.NAME), JobTaskState::fromXContent), // watcher new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(WatcherMetaData.TYPE), WatcherMetaData::fromXContent), @@ -375,8 +377,12 @@ public List getNamedXContent() { new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(LicensesMetaData.TYPE), LicensesMetaData::fromXContent), //rollup - new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME), RollupJob::fromXContent), - new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), RollupJobStatus::fromXContent) + new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(RollupField.TASK_NAME), + RollupJob::fromXContent), + new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(RollupJobStatus.NAME), + RollupJobStatus::fromXContent), + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(RollupJobStatus.NAME), + RollupJobStatus::fromXContent) ); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 861f386a90966..5e145306f8c1f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedUpdate; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.groups.GroupOrJobLookup; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; @@ -402,9 +402,9 @@ public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, if (allowDeleteOpenJob == false) { PersistentTask jobTask = getJobTask(jobId, tasks); if (jobTask != null) { - JobTaskStatus jobTaskStatus = (JobTaskStatus) jobTask.getStatus(); + JobTaskState jobTaskState = (JobTaskState) jobTask.getState(); throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because the job is " - + ((jobTaskStatus == null) ? JobState.OPENING : jobTaskStatus.getState())); + + ((jobTaskState == null) ? JobState.OPENING : jobTaskState.getState())); } } Job.Builder jobBuilder = new Job.Builder(job); @@ -448,7 +448,7 @@ public static PersistentTask getDatafeedTask(String datafeedId, @Nullable Per public static JobState getJobState(String jobId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTask task = getJobTask(jobId, tasks); if (task != null) { - JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); + JobTaskState jobTaskState = (JobTaskState) task.getState(); if (jobTaskState == null) { return JobState.OPENING; } @@ -460,8 +460,8 @@ public static JobState getJobState(String jobId, @Nullable PersistentTasksCustom public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTask task = getDatafeedTask(datafeedId, tasks); - if (task != null && task.getStatus() != null) { - return (DatafeedState) task.getStatus(); + if (task != null && task.getState() != null) { + return (DatafeedState) task.getState(); } else { // If we haven't started a datafeed then there will be no persistent task, // which is the same as if the datafeed was't started diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java index 7343600a6ee37..d894f7b339fe5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedState.java @@ -12,7 +12,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.Task; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; import java.io.IOException; @@ -20,7 +20,7 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -public enum DatafeedState implements Task.Status { +public enum DatafeedState implements PersistentTaskState { STARTED, STOPPED, STARTING, STOPPING; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java similarity index 86% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskStatus.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java index de102798d1ca6..d9ab3357319c6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/JobTaskState.java @@ -12,25 +12,25 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.xpack.core.ml.action.OpenJobAction; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; +import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import java.io.IOException; import java.util.Objects; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; -public class JobTaskStatus implements Task.Status { +public class JobTaskState implements PersistentTaskState { public static final String NAME = OpenJobAction.TASK_NAME; private static ParseField STATE = new ParseField("state"); private static ParseField ALLOCATION_ID = new ParseField("allocation_id"); - private static final ConstructingObjectParser PARSER = + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - args -> new JobTaskStatus((JobState) args[0], (Long) args[1])); + args -> new JobTaskState((JobState) args[0], (Long) args[1])); static { PARSER.declareField(constructorArg(), p -> { @@ -42,7 +42,7 @@ public class JobTaskStatus implements Task.Status { PARSER.declareLong(constructorArg(), ALLOCATION_ID); } - public static JobTaskStatus fromXContent(XContentParser parser) { + public static JobTaskState fromXContent(XContentParser parser) { try { return PARSER.parse(parser, null); } catch (IOException e) { @@ -53,12 +53,12 @@ public static JobTaskStatus fromXContent(XContentParser parser) { private final JobState state; private final long allocationId; - public JobTaskStatus(JobState state, long allocationId) { + public JobTaskState(JobState state, long allocationId) { this.state = Objects.requireNonNull(state); this.allocationId = allocationId; } - public JobTaskStatus(StreamInput in) throws IOException { + public JobTaskState(StreamInput in) throws IOException { state = JobState.fromStream(in); allocationId = in.readLong(); } @@ -100,7 +100,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - JobTaskStatus that = (JobTaskStatus) o; + JobTaskState that = (JobTaskState) o; return state == that.state && Objects.equals(allocationId, that.allocationId); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java index 86bc95e092ca3..4cbd5a3b4559a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/job/RollupJobStatus.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.tasks.Task; import java.io.IOException; @@ -30,7 +31,7 @@ * indexer's current position. When the allocated task updates its status, * it is providing a new version of this. */ -public class RollupJobStatus implements Task.Status { +public class RollupJobStatus implements Task.Status, PersistentTaskState { public static final String NAME = "xpack/rollup/job"; private final IndexerState state; @@ -73,7 +74,7 @@ public RollupJobStatus(StreamInput in) throws IOException { currentPosition = in.readBoolean() ? new TreeMap<>(in.readMap()) : null; } - public IndexerState getState() { + public IndexerState getIndexerState() { return state; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java index 36bcfe92f0075..083d4ce5b1514 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java @@ -34,7 +34,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -256,8 +256,8 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen @Override protected void taskOperation(CloseJobAction.Request request, TransportOpenJobAction.JobTask jobTask, ActionListener listener) { - JobTaskStatus taskStatus = new JobTaskStatus(JobState.CLOSING, jobTask.getAllocationId()); - jobTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> { + JobTaskState taskState = new JobTaskState(JobState.CLOSING, jobTask.getAllocationId()); + jobTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> { // we need to fork because we are now on a network threadpool and closeJob method may take a while to complete: threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 8102ccdddb5df..1ddd19104cd30 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -39,12 +39,12 @@ import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -57,7 +57,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -192,7 +192,7 @@ static PersistentTasksCustomMetaData.Assignment selectLeastLoadedMlNode(String j Collection> assignedTasks = persistentTasks.findTasks( OpenJobAction.TASK_NAME, task -> node.getId().equals(task.getExecutorNode())); for (PersistentTasksCustomMetaData.PersistentTask assignedTask : assignedTasks) { - JobTaskStatus jobTaskState = (JobTaskStatus) assignedTask.getStatus(); + JobTaskState jobTaskState = (JobTaskState) assignedTask.getState(); JobState jobState; if (jobTaskState == null || // executor node didn't have the chance to set job status to OPENING // previous executor node failed and current executor node didn't have the chance to set job status to OPENING @@ -665,14 +665,14 @@ public void validate(OpenJobAction.JobParams params, ClusterState clusterState) } @Override - protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, Task.Status status) { + protected void nodeOperation(AllocatedPersistentTask task, OpenJobAction.JobParams params, PersistentTaskState state) { JobTask jobTask = (JobTask) task; jobTask.autodetectProcessManager = autodetectProcessManager; - JobTaskStatus jobStateStatus = (JobTaskStatus) status; + JobTaskState jobTaskState = (JobTaskState) state; // If the job is failed then the Persistent Task Service will // try to restart it on a node restart. Exiting here leaves the // job in the failed state and it must be force closed. - if (jobStateStatus != null && jobStateStatus.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) { + if (jobTaskState != null && jobTaskState.getState().isAnyOf(JobState.FAILED, JobState.CLOSING)) { return; } @@ -756,8 +756,8 @@ private class JobPredicate implements Predicate persistentTask) { JobState jobState = JobState.CLOSED; if (persistentTask != null) { - JobTaskStatus jobStateStatus = (JobTaskStatus) persistentTask.getStatus(); - jobState = jobStateStatus == null ? JobState.OPENING : jobStateStatus.getState(); + JobTaskState jobTaskState = (JobTaskState) persistentTask.getState(); + jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState(); PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); // This logic is only appropriate when opening a job, not when reallocating following a failure, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 3d261864ab409..b13ed6d698451 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -23,8 +23,8 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -274,8 +274,9 @@ public void validate(StartDatafeedAction.DatafeedParams params, ClusterState clu } @Override - protected void nodeOperation(AllocatedPersistentTask allocatedPersistentTask, StartDatafeedAction.DatafeedParams params, - Task.Status status) { + protected void nodeOperation(final AllocatedPersistentTask allocatedPersistentTask, + final StartDatafeedAction.DatafeedParams params, + final PersistentTaskState state) { DatafeedTask datafeedTask = (DatafeedTask) allocatedPersistentTask; datafeedTask.datafeedManager = datafeedManager; datafeedManager.run(datafeedTask, @@ -373,7 +374,7 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTa assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); return true; } - DatafeedState datafeedState = (DatafeedState) persistentTask.getStatus(); + DatafeedState datafeedState = (DatafeedState) persistentTask.getState(); return datafeedState == DatafeedState.STARTED; } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 4b68f74eb1702..faf6aa80b7a6f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -222,10 +222,10 @@ public void onFailure(Exception e) { } @Override - protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTaskTask, + protected void taskOperation(StopDatafeedAction.Request request, TransportStartDatafeedAction.DatafeedTask datafeedTask, ActionListener listener) { - DatafeedState taskStatus = DatafeedState.STOPPING; - datafeedTaskTask.updatePersistentStatus(taskStatus, ActionListener.wrap(task -> { + DatafeedState taskState = DatafeedState.STOPPING; + datafeedTask.updatePersistentTaskState(taskState, ActionListener.wrap(task -> { // we need to fork because we are now on a network threadpool threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(new AbstractRunnable() { @Override @@ -235,7 +235,7 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { - datafeedTaskTask.stop("stop_datafeed (api)", request.getStopTimeout()); + datafeedTask.stop("stop_datafeed (api)", request.getStopTimeout()); listener.onResponse(new StopDatafeedAction.Response(true)); } }); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java index 69acbad20fb2d..338c111401acf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManager.java @@ -88,7 +88,7 @@ public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer { Holder holder = new Holder(task, datafeed, datafeedJob, new ProblemTracker(auditor, job.getId()), taskHandler); runningDatafeedsOnThisNode.put(task.getAllocationId(), holder); - task.updatePersistentStatus(DatafeedState.STARTED, new ActionListener>() { + task.updatePersistentTaskState(DatafeedState.STARTED, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { taskRunner.runWhenJobIsOpened(task); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 0eb57ab79be5d..bebf0f3935d92 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -12,12 +12,12 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import java.util.List; import java.util.Objects; @@ -64,11 +64,11 @@ private AssignmentFailure checkAssignment() { PriorityFailureCollector priorityFailureCollector = new PriorityFailureCollector(); priorityFailureCollector.add(verifyIndicesActive(datafeed)); - JobTaskStatus taskStatus = null; + JobTaskState jobTaskState = null; JobState jobState = JobState.CLOSED; if (jobTask != null) { - taskStatus = (JobTaskStatus) jobTask.getStatus(); - jobState = taskStatus == null ? JobState.OPENING : taskStatus.getState(); + jobTaskState = (JobTaskState) jobTask.getState(); + jobState = jobTaskState == null ? JobState.OPENING : jobTaskState.getState(); } if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) { @@ -78,8 +78,8 @@ private AssignmentFailure checkAssignment() { priorityFailureCollector.add(new AssignmentFailure(reason, true)); } - if (taskStatus != null && taskStatus.isStatusStale(jobTask)) { - String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] status is stale"; + if (jobTaskState != null && jobTaskState.isStatusStale(jobTask)) { + String reason = "cannot start datafeed [" + datafeed.getId() + "], job [" + datafeed.getJobId() + "] state is stale"; priorityFailureCollector.add(new AssignmentFailure(reason, true)); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index d3a848ef3821f..b6efb688c1797 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; @@ -623,8 +623,8 @@ public Optional jobOpenTime(JobTask jobTask) { } void setJobState(JobTask jobTask, JobState state) { - JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId()); - jobTask.updatePersistentStatus(taskStatus, new ActionListener>() { + JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId()); + jobTask.updatePersistentTaskState(jobTaskState, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()); @@ -638,8 +638,8 @@ public void onFailure(Exception e) { } void setJobState(JobTask jobTask, JobState state, CheckedConsumer handler) { - JobTaskStatus taskStatus = new JobTaskStatus(state, jobTask.getAllocationId()); - jobTask.updatePersistentStatus(taskStatus, new ActionListener>() { + JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId()); + jobTask.updatePersistentTaskState(jobTaskState, new ActionListener>() { @Override public void onResponse(PersistentTask persistentTask) { try { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 8049b5655d63b..f6fb2db3c9bb9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -363,7 +363,7 @@ public void testGetJobState() { new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); assertEquals(JobState.OPENING, MlMetadata.getJobState("foo", tasksBuilder.build())); - tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("foo"), new JobTaskStatus(JobState.OPENED, tasksBuilder.getLastAllocationId())); + tasksBuilder.updateTaskState(MlMetadata.jobTaskId("foo"), new JobTaskState(JobState.OPENED, tasksBuilder.getLastAllocationId())); assertEquals(JobState.OPENED, MlMetadata.getJobState("foo", tasksBuilder.build())); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java index f1679b8b0b9d1..d65fc1476e75e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportCloseJobActionTests.java @@ -314,7 +314,7 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat PersistentTasksCustomMetaData.Builder tasks) { tasks.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME, new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new Assignment(nodeId, "test assignment")); - tasks.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state); + tasks.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 661919cffce3c..a52d880a4a3b9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -36,7 +36,7 @@ import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings; @@ -322,7 +322,7 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() { assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); - tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId("job_id6"), null); + tasksBuilder.updateTaskState(MlMetadata.jobTaskId("job_id6"), null); tasks = tasksBuilder.build(); csBuilder = ClusterState.builder(cs); @@ -535,7 +535,7 @@ public static void addJobTask(String jobId, String nodeId, JobState jobState, Pe builder.addTask(MlMetadata.jobTaskId(jobId), OpenJobAction.TASK_NAME, new OpenJobAction.JobParams(jobId), new Assignment(nodeId, "test assignment")); if (jobState != null) { - builder.updateTaskStatus(MlMetadata.jobTaskId(jobId), new JobTaskStatus(jobState, builder.getLastAllocationId())); + builder.updateTaskState(MlMetadata.jobTaskId(jobId), new JobTaskState(jobState, builder.getLastAllocationId())); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java index a61709be424e8..55a0f4006bcdd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedActionTests.java @@ -31,7 +31,7 @@ public void testValidate() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); tasksBuilder.addTask(MLMetadataField.datafeedTaskId("foo"), StartDatafeedAction.TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("node_id", "")); - tasksBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED); + tasksBuilder.updateTaskState(MLMetadataField.datafeedTaskId("foo"), DatafeedState.STARTED); tasksBuilder.build(); Job job = createDatafeedJob().build(new Date()); @@ -121,6 +121,6 @@ public static void addTask(String datafeedId, long startTime, String nodeId, Dat taskBuilder.addTask(MLMetadataField.datafeedTaskId(datafeedId), StartDatafeedAction.TASK_NAME, new StartDatafeedAction.DatafeedParams(datafeedId, startTime), new PersistentTasksCustomMetaData.Assignment(nodeId, "test assignment")); - taskBuilder.updateTaskStatus(MLMetadataField.datafeedTaskId(datafeedId), state); + taskBuilder.updateTaskState(MLMetadataField.datafeedTaskId(datafeedId), state); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java index bd722ebf8ef9a..f609f0c8c5ed9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedManagerTests.java @@ -378,7 +378,7 @@ private static DatafeedTask createDatafeedTask(String datafeedId, long startTime ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(mock(PersistentTask.class)); return null; - }).when(task).updatePersistentStatus(any(), any()); + }).when(task).updatePersistentTaskState(any(), any()); return task; } @@ -394,7 +394,7 @@ private DatafeedTask spyDatafeedTask(DatafeedTask task) { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; listener.onResponse(mock(PersistentTask.class)); return null; - }).when(task).updatePersistentStatus(any(), any()); + }).when(task).updatePersistentTaskState(any(), any()); return task; } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java index 0fee78611a7bf..96ae3b5ef38b6 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelectorTests.java @@ -31,7 +31,7 @@ import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.junit.Before; @@ -255,20 +255,20 @@ public void testSelectNode_jobTaskStale() { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), nodeId, JobState.OPENED, tasksBuilder); // Set to lower allocationId, so job task is stale: - tasksBuilder.updateTaskStatus(MlMetadata.jobTaskId(job.getId()), new JobTaskStatus(JobState.OPENED, 0)); + tasksBuilder.updateTaskState(MlMetadata.jobTaskId(job.getId()), new JobTaskState(JobState.OPENED, 0)); tasks = tasksBuilder.build(); givenClusterState("foo", 1, 0); PersistentTasksCustomMetaData.Assignment result = new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").selectNode(); assertNull(result.getExecutorNode()); - assertEquals("cannot start datafeed [datafeed_id], job [job_id] status is stale", + assertEquals("cannot start datafeed [datafeed_id], job [job_id] state is stale", result.getExplanation()); ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DatafeedNodeSelector(clusterState, resolver, "datafeed_id").checkDatafeedTaskCanBeCreated()); assertThat(e.getMessage(), containsString("No node found to start datafeed [datafeed_id], allocation explanation " - + "[cannot start datafeed [datafeed_id], job [job_id] status is stale]")); + + "[cannot start datafeed [datafeed_id], job [job_id] state is stale]")); tasksBuilder = PersistentTasksCustomMetaData.builder(); addJobTask(job.getId(), "node_id1", JobState.OPENED, tasksBuilder); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index ce47fb0adf80c..e3d67bb0bdb71 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -39,7 +39,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; @@ -211,9 +211,9 @@ public void testDedicatedMlNode() throws Exception { DiscoveryNode node = clusterState.nodes().resolveNode(task.getExecutorNode()); assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20")); - JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); - assertNotNull(jobTaskStatus); - assertEquals(JobState.OPENED, jobTaskStatus.getState()); + JobTaskState jobTaskState = (JobTaskState) task.getState(); + assertNotNull(jobTaskState); + assertEquals(JobState.OPENED, jobTaskState.getState()); }); logger.info("stop the only running ml node"); @@ -264,7 +264,7 @@ public void testMaxConcurrentJobAllocations() throws Exception { for (DiscoveryNode node : event.state().nodes()) { Collection> foundTasks = tasks.findTasks(OpenJobAction.TASK_NAME, task -> { - JobTaskStatus jobTaskState = (JobTaskStatus) task.getStatus(); + JobTaskState jobTaskState = (JobTaskState) task.getState(); return node.getId().equals(task.getExecutorNode()) && (jobTaskState == null || jobTaskState.isStatusStale(task)); }); @@ -396,9 +396,9 @@ private void assertJobTask(String jobId, JobState expectedState, boolean hasExec assertThat(node.getAttributes(), hasEntry(MachineLearning.ML_ENABLED_NODE_ATTR, "true")); assertThat(node.getAttributes(), hasEntry(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, "20")); - JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); - assertNotNull(jobTaskStatus); - assertEquals(expectedState, jobTaskStatus.getState()); + JobTaskState jobTaskState = (JobTaskState) task.getState(); + assertNotNull(jobTaskState); + assertEquals(expectedState, jobTaskState.getState()); } else { assertNull(task.getExecutorNode()); } @@ -411,9 +411,9 @@ private CheckedRunnable checkAllJobsAreAssignedAndOpened(int numJobs) assertEquals(numJobs, tasks.taskMap().size()); for (PersistentTask task : tasks.taskMap().values()) { assertNotNull(task.getExecutorNode()); - JobTaskStatus jobTaskStatus = (JobTaskStatus) task.getStatus(); - assertNotNull(jobTaskStatus); - assertEquals(JobState.OPENED, jobTaskStatus.getState()); + JobTaskState jobTaskState = (JobTaskState) task.getState(); + assertNotNull(jobTaskState); + assertEquals(JobState.OPENED, jobTaskState.getState()); } }; } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java index 17e7b89978e16..f06b73fcd40aa 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/TooManyJobsIT.java @@ -20,7 +20,7 @@ import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; @@ -58,7 +58,7 @@ public void testCloseFailedJob() throws Exception { assertEquals(1, tasks.taskMap().size()); // now just double check that the first job is still opened: PersistentTasksCustomMetaData.PersistentTask task = tasks.getTask(MlMetadata.jobTaskId("close-failed-job-1")); - assertEquals(JobState.OPENED, ((JobTaskStatus) task.getStatus()).getState()); + assertEquals(JobState.OPENED, ((JobTaskState) task.getState()).getState()); } public void testSingleNode() throws Exception { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java similarity index 53% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java index 7183235b6ff68..4dfd1965804e5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStatusTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/config/JobTaskStateTests.java @@ -9,22 +9,22 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.test.AbstractSerializingTestCase; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; -public class JobTaskStatusTests extends AbstractSerializingTestCase { +public class JobTaskStateTests extends AbstractSerializingTestCase { @Override - protected JobTaskStatus createTestInstance() { - return new JobTaskStatus(randomFrom(JobState.values()), randomLong()); + protected JobTaskState createTestInstance() { + return new JobTaskState(randomFrom(JobState.values()), randomLong()); } @Override - protected Writeable.Reader instanceReader() { - return JobTaskStatus::new; + protected Writeable.Reader instanceReader() { + return JobTaskState::new; } @Override - protected JobTaskStatus doParseInstance(XContentParser parser) { - return JobTaskStatus.fromXContent(parser); + protected JobTaskState doParseInstance(XContentParser parser) { + return JobTaskState.fromXContent(parser); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index c3e830553a237..fa41cf0918f71 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -27,7 +27,7 @@ import org.elasticsearch.xpack.core.ml.job.config.Detector; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.config.ModelPlotConfig; @@ -199,7 +199,7 @@ public void testOpenJob() { manager.openJob(jobTask, e -> {}); assertEquals(1, manager.numberOfOpenJobs()); assertTrue(manager.jobHasActiveAutodetectProcess(jobTask)); - verify(jobTask).updatePersistentStatus(eq(new JobTaskStatus(JobState.OPENED, 1L)), any()); + verify(jobTask).updatePersistentTaskState(eq(new JobTaskState(JobState.OPENED, 1L)), any()); } public void testOpenJob_exceedMaxNumJobs() { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java index 425629c248c9c..50b3f21800d06 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; @@ -62,7 +63,7 @@ public RollupJobPersistentTasksExecutor(Settings settings, Client client, Schedu } @Override - protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, Status status) { + protected void nodeOperation(AllocatedPersistentTask task, @Nullable RollupJob params, PersistentTaskState state) { RollupJobTask rollupJobTask = (RollupJobTask) task; SchedulerEngine.Job schedulerJob = new SchedulerEngine.Job(SCHEDULE_NAME + "_" + params.getConfig().getId(), new CronSchedule(params.getConfig().getCron())); @@ -80,7 +81,7 @@ protected AllocatedPersistentTask createTask(long id, String type, String action PersistentTasksCustomMetaData.PersistentTask persistentTask, Map headers) { return new RollupJobTask(id, type, action, parentTaskId, persistentTask.getParams(), - (RollupJobStatus) persistentTask.getStatus(), client, schedulerEngine, threadPool, headers); + (RollupJobStatus) persistentTask.getState(), client, schedulerEngine, threadPool, headers); } } @@ -115,15 +116,15 @@ protected void doNextBulk(BulkRequest request, ActionListener next } @Override - protected void doSaveState(IndexerState state, Map position, Runnable next) { - if (state.equals(IndexerState.ABORTING)) { + protected void doSaveState(IndexerState indexerState, Map position, Runnable next) { + if (indexerState.equals(IndexerState.ABORTING)) { // If we're aborting, just invoke `next` (which is likely an onFailure handler) next.run(); } else { // Otherwise, attempt to persist our state - final RollupJobStatus status = new RollupJobStatus(state, getPosition()); - logger.debug("Updating persistent status of job [" + job.getConfig().getId() + "] to [" + state.toString() + "]"); - updatePersistentStatus(status, ActionListener.wrap(task -> next.run(), exc -> next.run())); + final RollupJobStatus state = new RollupJobStatus(indexerState, getPosition()); + logger.debug("Updating persistent state of job [" + job.getConfig().getId() + "] to [" + indexerState.toString() + "]"); + updatePersistentTaskState(state, ActionListener.wrap(task -> next.run(), exc -> next.run())); } } @@ -148,7 +149,7 @@ protected void onAbort() { private final ThreadPool threadPool; private final RollupIndexer indexer; - RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus status, + RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state, Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map headers) { super(id, type, action, RollupField.NAME + "_" + job.getConfig().getId(), parentTask, headers); this.job = job; @@ -158,16 +159,17 @@ protected void onAbort() { // If status is not null, we are resuming rather than starting fresh. Map initialPosition = null; IndexerState initialState = IndexerState.STOPPED; - if (status != null) { - logger.debug("We have existing status, setting state to [" + status.getState() + "] " + - "and current position to [" + status.getPosition() + "] for job [" + job.getConfig().getId() + "]"); - if (status.getState().equals(IndexerState.INDEXING)) { + if (state != null) { + final IndexerState existingState = state.getIndexerState(); + logger.debug("We have existing state, setting state to [" + existingState + "] " + + "and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]"); + if (existingState.equals(IndexerState.INDEXING)) { /* * If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking * it is indexing but without the actual indexing thread running. */ initialState = IndexerState.STARTED; - } else if (status.getState().equals(IndexerState.ABORTING) || status.getState().equals(IndexerState.STOPPING)) { + } else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) { // It shouldn't be possible to persist ABORTING, but if for some reason it does, // play it safe and restore the job as STOPPED. An admin will have to clean it up, // but it won't be running, and won't delete itself either. Safest option. @@ -175,9 +177,9 @@ protected void onAbort() { // to restore as STOPEPD initialState = IndexerState.STOPPED; } else { - initialState = status.getState(); + initialState = existingState; } - initialPosition = status.getPosition(); + initialPosition = state.getPosition(); } this.indexer = new ClientRollupPageManager(job, initialState, initialPosition, new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId()))); @@ -227,20 +229,20 @@ public synchronized void start(ActionListener lis + " state was [" + newState + "]")); return; } - final RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition()); - logger.debug("Updating status for rollup job [" + job.getConfig().getId() + "] to [" + status.getState() + "][" + - status.getPosition() + "]"); - updatePersistentStatus(status, + final RollupJobStatus state = new RollupJobStatus(IndexerState.STARTED, indexer.getPosition()); + logger.debug("Updating state for rollup job [" + job.getConfig().getId() + "] to [" + state.getIndexerState() + "][" + + state.getPosition() + "]"); + updatePersistentTaskState(state, ActionListener.wrap( (task) -> { - logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId() + "] to [" - + status.getState() + "][" + status.getPosition() + "]"); + logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId() + "] to [" + + state.getIndexerState() + "][" + state.getPosition() + "]"); listener.onResponse(new StartRollupJobAction.Response(true)); }, (exc) -> { listener.onFailure( - new ElasticsearchException("Error while updating status for rollup job [" + job.getConfig().getId() - + "] to [" + status.getState() + "].", exc) + new ElasticsearchException("Error while updating state for rollup job [" + job.getConfig().getId() + + "] to [" + state.getIndexerState() + "].", exc) ); } ) @@ -268,17 +270,17 @@ public synchronized void stop(ActionListener liste case STOPPING: // update the persistent state only if there is no background job running, // otherwise the state is updated by the indexer when the background job detects the STOPPING state. - RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition()); - updatePersistentStatus(status, + RollupJobStatus state = new RollupJobStatus(IndexerState.STOPPED, indexer.getPosition()); + updatePersistentTaskState(state, ActionListener.wrap( (task) -> { - logger.debug("Succesfully updated status for rollup job [" + job.getConfig().getId() - + "] to [" + status.getState() + "]"); + logger.debug("Succesfully updated state for rollup job [" + job.getConfig().getId() + + "] to [" + state.getIndexerState() + "]"); listener.onResponse(new StopRollupJobAction.Response(true)); }, (exc) -> { - listener.onFailure(new ElasticsearchException("Error while updating status for rollup job [" - + job.getConfig().getId() + "] to [" + status.getState() + "].", exc)); + listener.onFailure(new ElasticsearchException("Error while updating state for rollup job [" + + job.getConfig().getId() + "] to [" + state.getIndexerState() + "].", exc)); }) ); break; diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java index ce8bf936d9768..3f930cb42981d 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java @@ -7,10 +7,8 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; @@ -27,7 +25,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Netty4Plugin; @@ -104,7 +101,7 @@ protected Settings transportClientSettings() { } @Before - public void createIndex() throws Exception { + public void createIndex() { client().admin().indices().prepareCreate("test-1").addMapping("doc", "{\"doc\": {\"properties\": {" + "\"date_histo\": {\"type\": \"date\"}, " + "\"histo\": {\"type\": \"integer\"}, " + @@ -125,7 +122,7 @@ public void createIndex() throws Exception { } } } - BulkResponse response = bulk.get(); + bulk.get(); client().admin().indices().prepareRefresh("test-1").get(); } @@ -195,27 +192,23 @@ public void testIndexPattern() throws Exception { // Make sure it started ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern"); - if (rollupJobStatus == null) {; + RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern"); + if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING)); }, 60, TimeUnit.SECONDS); // And wait for it to finish ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "testIndexPattern"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern"); if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null); }, 60, TimeUnit.SECONDS); @@ -274,23 +267,20 @@ public void testTwoJobsStartStopDeleteOne() throws Exception { // Make sure it started ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job1"); if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING)); }, 60, TimeUnit.SECONDS); //but not the other task ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job2"); - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STOPPED)); }, 60, TimeUnit.SECONDS); @@ -301,9 +291,7 @@ public void testTwoJobsStartStopDeleteOne() throws Exception { // Make sure the first job's task is gone ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job1"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job1"); assertTrue(rollupJobStatus == null); }, 60, TimeUnit.SECONDS); @@ -320,10 +308,9 @@ public void testTwoJobsStartStopDeleteOne() throws Exception { // and still STOPPED ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, "job2"); + RollupJobStatus rollupJobStatus = getRollupJobStatus("job2"); - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); assertTrue(state.equals(IndexerState.STOPPED)); }, 60, TimeUnit.SECONDS); } @@ -404,19 +391,17 @@ public void testBig() throws Exception { Assert.assertThat(response.isStarted(), equalTo(true)); ESTestCase.assertBusy(() -> { - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId); + RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId); if (rollupJobStatus == null) { fail("null"); } - IndexerState state = rollupJobStatus.getState(); + IndexerState state = rollupJobStatus.getIndexerState(); logger.error("state: [" + state + "]"); assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null); }, 60, TimeUnit.SECONDS); - ListTasksResponse tasksResponse = client().admin().cluster().prepareListTasks().setDetailed(true).get(); - RollupJobStatus rollupJobStatus = getRollupJobStatus(tasksResponse, taskId); + RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId); if (rollupJobStatus == null) { Assert.fail("rollup job status should not be null"); } @@ -481,11 +466,13 @@ private void verifyAgg(InternalDateHistogram verify, InternalDateHistogram rollu } } - private RollupJobStatus getRollupJobStatus(ListTasksResponse tasksResponse, String taskId) { - for (TaskInfo task : tasksResponse.getTasks()) { - if (task.getDescription().equals("rollup_" + taskId)) { - return ((RollupJobStatus) task.getStatus()); - } + private RollupJobStatus getRollupJobStatus(final String taskId) { + final GetRollupJobsAction.Request request = new GetRollupJobsAction.Request(taskId); + final GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, request).actionGet(); + + if (response.getJobs() != null && response.getJobs().isEmpty() == false) { + assertThat("Expect 1 rollup job with id " + taskId, response.getJobs().size(), equalTo(1)); + return response.getJobs().iterator().next().getStatus(); } return null; } @@ -498,13 +485,13 @@ public void cleanup() throws ExecutionException, InterruptedException { for (GetRollupJobsAction.JobWrapper job : response.getJobs()) { StopRollupJobAction.Request stopRequest = new StopRollupJobAction.Request(job.getJob().getId()); try { - StopRollupJobAction.Response stopResponse = client().execute(StopRollupJobAction.INSTANCE, stopRequest).get(); + client().execute(StopRollupJobAction.INSTANCE, stopRequest).get(); } catch (ElasticsearchException e) { // } DeleteRollupJobAction.Request deleteRequest = new DeleteRollupJobAction.Request(job.getJob().getId()); - DeleteRollupJobAction.Response deleteResponse = client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get(); + client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get(); } } } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index d12be5e6fc196..ffcae267340f8 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation; @@ -64,7 +65,7 @@ public void testInitialStatusStopped() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -77,7 +78,7 @@ public void testInitialStatusAborting() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -90,7 +91,7 @@ public void testInitialStatusStopping() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -103,7 +104,7 @@ public void testInitialStatusStarted() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -116,7 +117,7 @@ public void testInitialStatusIndexing() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); } @@ -128,7 +129,7 @@ public void testNoInitialStatus() { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); } @@ -140,7 +141,7 @@ public void testStartWhenStarted() throws InterruptedException { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -172,13 +173,14 @@ public void testStartWhenStopping() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); int c = counter.get(); if (c == 0) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); } else if (c == 1) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); } else { fail("Should not have updated persistent statuse > 2 times"); } @@ -187,7 +189,7 @@ public void updatePersistentStatus(Status status, ActionListener() { @@ -248,14 +250,15 @@ public void testStartWhenStopped() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -264,7 +267,7 @@ public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertThat(((RollupJobStatus)task.getStatus()).getPosition().size(), equalTo(1)); assertTrue(((RollupJobStatus)task.getStatus()).getPosition().containsKey("foo")); @@ -301,7 +305,7 @@ public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus)status).getState(), equalTo(IndexerState.STARTED)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); assertNull(((RollupJobStatus)task.getStatus()).getPosition()); CountDownLatch latch = new CountDownLatch(1); @@ -340,7 +345,7 @@ public void updatePersistentStatus(Status status, ActionListener> listener) { + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { Integer counterValue = counter.getAndIncrement(); if (counterValue == 0) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } else if (counterValue == 1) { @@ -405,14 +411,14 @@ public void updatePersistentStatus(Status status, ActionListener() { @Override public void onResponse(StartRollupJobAction.Response response) { assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); started.set(true); } @@ -424,7 +430,7 @@ public void onFailure(Exception e) { ESTestCase.awaitBusy(started::get); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(task.getStats().getNumInvocations(), equalTo(1L)); // Allow search response to return now latch.countDown(); @@ -475,11 +481,12 @@ public void testTriggerWithHeaders() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { Integer counterValue = counter.getAndIncrement(); if (counterValue == 0) { - assertThat(status, instanceOf(RollupJobStatus.class)); - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(taskState, instanceOf(RollupJobStatus.class)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); listener.onResponse(new PersistentTasksCustomMetaData.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, new PersistentTasksCustomMetaData.Assignment("foo", "foo"))); } else if (counterValue == 1) { @@ -488,14 +495,14 @@ public void updatePersistentStatus(Status status, ActionListener() { @Override public void onResponse(StartRollupJobAction.Response response) { assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); started.set(true); } @@ -507,7 +514,7 @@ public void onFailure(Exception e) { ESTestCase.awaitBusy(started::get); task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.INDEXING)); // Should still be started, not INDEXING + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); assertThat(task.getStats().getNumInvocations(), equalTo(1L)); // Allow search response to return now latch.countDown(); @@ -524,7 +531,7 @@ public void testStopWhenStopped() throws InterruptedException { SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); CountDownLatch latch = new CountDownLatch(1); task.stop(new ActionListener() { @@ -553,15 +560,16 @@ public void testStopWhenStopping() throws InterruptedException { RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()) { @Override - public void updatePersistentStatus(Status status, ActionListener> listener) { - assertThat(status, instanceOf(RollupJobStatus.class)); + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); int c = counter.get(); if (c == 0) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STARTED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); } else if (c == 1) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); } else if (c == 2) { - assertThat(((RollupJobStatus) status).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); } else { fail("Should not have updated persistent statuse > 3 times"); } @@ -571,7 +579,7 @@ public void updatePersistentStatus(Status status, ActionListener() { @@ -642,7 +650,7 @@ public void markAsCompleted() { latch.countDown(); } }; - assertThat(((RollupJobStatus)task.getStatus()).getState(), equalTo(IndexerState.STOPPED)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); task.onCancelled(); task.stop(new ActionListener() { diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java index f70efc72506d3..9057db476ad77 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeAutodetectIntegTestCase.java @@ -26,13 +26,13 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchModule; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.transport.Netty4Plugin; @@ -70,7 +70,7 @@ import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; -import org.elasticsearch.xpack.core.ml.job.config.JobTaskStatus; +import org.elasticsearch.xpack.core.ml.job.config.JobTaskState; import org.elasticsearch.xpack.core.ml.job.config.JobUpdate; import org.elasticsearch.xpack.core.ml.job.config.MlFilter; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; @@ -449,8 +449,8 @@ protected void ensureClusterStateConsistency() throws IOException { StartDatafeedAction.DatafeedParams::new)); entries.add(new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME, OpenJobAction.JobParams::new)); - entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, JobTaskStatus.NAME, JobTaskStatus::new)); - entries.add(new NamedWriteableRegistry.Entry(Task.Status.class, DatafeedState.NAME, DatafeedState::fromStream)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new)); + entries.add(new NamedWriteableRegistry.Entry(PersistentTaskState.class, DatafeedState.NAME, DatafeedState::fromStream)); entries.add(new NamedWriteableRegistry.Entry(ClusterState.Custom.class, TokenMetaData.TYPE, TokenMetaData::new)); final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries); ClusterState masterClusterState = client().admin().cluster().prepareState().all().get().getState();