diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 16c9f486d8d63..75dc0a946d636 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -1388,7 +1388,10 @@ tag::state-datafeed[] The status of the {dfeed}, which can be one of the following values: + -- +* `starting`: The {dfeed} has been requested to start but has not yet started. * `started`: The {dfeed} is actively receiving data. +* `stopping`: The {dfeed} has been requested to stop gracefully and is +completing its final action. * `stopped`: The {dfeed} is stopped and will not receive data until it is re-started. -- diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 5be6ffe2878d2..fa20e805cc589 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -132,14 +132,18 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getDatafeedTask(datafeedId, tasks); - // TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects - if (task != null && task.getState() != null) { - return (DatafeedState) task.getState(); - } else { + if (task == null) { // If we haven't started a datafeed then there will be no persistent task, // which is the same as if the datafeed was't started return DatafeedState.STOPPED; } + DatafeedState taskState = (DatafeedState) task.getState(); + if (taskState == null) { + // If we haven't set a state yet then the task has never been assigned, so + // report that it's starting + return DatafeedState.STARTING; + } + return taskState; } public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index 902df8cd44fa2..a0b7d6a035f75 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -55,7 +55,8 @@ public void testGetDatefeedState() { tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); - assertEquals(DatafeedState.STOPPED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); + // A task with no state means the datafeed is starting + assertEquals(DatafeedState.STARTING, MlTasks.getDatafeedState("foo", tasksBuilder.build())); tasksBuilder.updateTaskState(MlTasks.datafeedTaskId("foo"), DatafeedState.STARTED); assertEquals(DatafeedState.STARTED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 44bf8c483dfd9..79cdf0c637562 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -537,7 +537,7 @@ void closeJob(String reason) { * Important: the methods of this class must NOT throw exceptions. If they did then the callers * of endpoints waiting for a condition tested by this predicate would never get a response. */ - private class JobPredicate implements Predicate> { + private static class JobPredicate implements Predicate> { private volatile boolean opened; private volatile Exception exception; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 40c4cea806f6f..67dd7d40ed7c9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -295,7 +295,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask> { + private static class DatafeedPredicate implements Predicate> { + private volatile boolean started; private volatile Exception exception; @Override @@ -487,15 +488,21 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTa return false; } PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); - if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && - assignment.isAssigned() == false) { - // Assignment has failed despite passing our "fast fail" validation - exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" + + if (assignment != null) { + // This means we are awaiting the datafeed's job to be assigned to a node + if (assignment.equals(DatafeedNodeSelector.AWAITING_JOB_ASSIGNMENT)) { + return true; + } + if (assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && assignment.isAssigned() == false) { + // Assignment has failed despite passing our "fast fail" validation + exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" + assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); - return true; + return true; + } } DatafeedState datafeedState = (DatafeedState) persistentTask.getState(); - return datafeedState == DatafeedState.STARTED; + started = datafeedState == DatafeedState.STARTED; + return started; } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 903733e09970c..db94905a1bdb5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -93,14 +93,8 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId, List stoppingDatafeedIds, List notStoppedDatafeedIds) { switch (datafeedState) { + // Treat STARTING like STARTED for stop API behaviour. case STARTING: - // The STARTING state is not used anywhere at present, so this should never happen. - // At present datafeeds that have a persistent task that hasn't yet been assigned - // a state are reported as STOPPED (which is not great). It could be considered a - // breaking change to introduce the STARTING state though, so let's aim to do it in - // version 8. Also consider treating STARTING like STARTED for stop API behaviour. - notStoppedDatafeedIds.add(datafeedId); - break; case STARTED: startedDatafeedIds.add(datafeedId); notStoppedDatafeedIds.add(datafeedId); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 03c8272a979d8..a4c9748dd64c7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -32,6 +32,9 @@ public class DatafeedNodeSelector { private static final Logger LOGGER = LogManager.getLogger(DatafeedNodeSelector.class); + public static final PersistentTasksCustomMetaData.Assignment AWAITING_JOB_ASSIGNMENT = + new PersistentTasksCustomMetaData.Assignment(null, "datafeed awaiting job assignment."); + private final String datafeedId; private final String jobId; private final List datafeedIndices; @@ -76,9 +79,14 @@ public PersistentTasksCustomMetaData.Assignment selectNode() { AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure == null) { - return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), ""); + String jobNode = jobTask.getExecutorNode(); + if (jobNode == null) { + return AWAITING_JOB_ASSIGNMENT; + } + return new PersistentTasksCustomMetaData.Assignment(jobNode, ""); } LOGGER.debug(assignmentFailure.reason); + assert assignmentFailure.reason.isEmpty() == false; return new PersistentTasksCustomMetaData.Assignment(null, assignmentFailure.reason); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 23275958b3c10..e0370066c047c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -424,6 +425,60 @@ public void testMlStateAndResultsIndicesNotAvailable() throws Exception { assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true)); } + public void testCloseUnassignedLazyJobAndDatafeed() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + ensureStableCluster(3); + + String jobId = "test-lazy-stop"; + String datafeedId = jobId + "-datafeed"; + // Assume the test machine won't have space to assign a 2TB job + Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.TB), true); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + + client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get(); + + DatafeedConfig config = createDatafeed(datafeedId, jobId, Collections.singletonList("data")); + PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); + client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet(); + + ensureYellow(); // at least the primary shards of the indices a job uses should be started + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(jobId); + client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet(); + + // Job state should be opening because it won't fit anyway, but is allowed to open lazily + GetJobsStatsAction.Request jobStatsRequest = new GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.OPENING, jobStatsResponse.getResponse().results().get(0).getState()); + + StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); + client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest).actionGet(); + + // Datafeed state should be starting while it waits for job assignment + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STARTING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // A starting datafeed can be stopped normally or by force + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(randomBoolean()); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // An opening job can also be stopped normally or by force + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + closeJobRequest.setForce(randomBoolean()); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + + jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.CLOSED, jobStatsResponse.getResponse().results().get(0).getState()); + } + private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 887cd17eb6b50..6ae0d3fbd57be 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -127,6 +127,10 @@ protected Job.Builder createJob(String id) { } protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) { + return createJob(id, modelMemoryLimit, false); + } + + protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit, boolean allowLazyOpen) { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setTimeFormat(DataDescription.EPOCH_MS); @@ -141,6 +145,7 @@ protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) { } builder.setAnalysisConfig(analysisConfig); builder.setDataDescription(dataDescription); + builder.setAllowLazyOpen(allowLazyOpen); return builder; }