From c386dc2030d5923ee14394840f51951282e3fc5f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Sat, 21 Mar 2020 14:02:49 +0000 Subject: [PATCH] [ML] Introduce a "starting" datafeed state for lazy jobs It is possible for ML jobs to open lazily if the "allow_lazy_open" option in the job config is set to true. Such jobs wait in the "opening" state until a node has sufficient capacity to run them. This commit fixes the bug that prevented datafeeds for jobs lazily awaiting assignment from being started. The state of such datafeeds is "starting", and they can be stopped by the stop datafeed API while in this state with or without force. Relates #53763 --- docs/reference/ml/ml-shared.asciidoc | 3 + .../elasticsearch/xpack/core/ml/MlTasks.java | 12 ++-- .../xpack/core/ml/MlTasksTests.java | 3 +- .../ml/action/TransportOpenJobAction.java | 2 +- .../action/TransportStartDatafeedAction.java | 23 +++++--- .../action/TransportStopDatafeedAction.java | 8 +-- .../ml/datafeed/DatafeedNodeSelector.java | 10 +++- .../integration/BasicDistributedJobsIT.java | 55 +++++++++++++++++++ .../xpack/ml/support/BaseMlIntegTestCase.java | 5 ++ 9 files changed, 99 insertions(+), 22 deletions(-) diff --git a/docs/reference/ml/ml-shared.asciidoc b/docs/reference/ml/ml-shared.asciidoc index 16c9f486d8d63..75dc0a946d636 100644 --- a/docs/reference/ml/ml-shared.asciidoc +++ b/docs/reference/ml/ml-shared.asciidoc @@ -1388,7 +1388,10 @@ tag::state-datafeed[] The status of the {dfeed}, which can be one of the following values: + -- +* `starting`: The {dfeed} has been requested to start but has not yet started. * `started`: The {dfeed} is actively receiving data. +* `stopping`: The {dfeed} has been requested to stop gracefully and is +completing its final action. * `stopped`: The {dfeed} is stopped and will not receive data until it is re-started. 
-- diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java index 5be6ffe2878d2..fa20e805cc589 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlTasks.java @@ -132,14 +132,18 @@ public static JobState getJobStateModifiedForReassignments(@Nullable PersistentT public static DatafeedState getDatafeedState(String datafeedId, @Nullable PersistentTasksCustomMetaData tasks) { PersistentTasksCustomMetaData.PersistentTask task = getDatafeedTask(datafeedId, tasks); - // TODO: report (task != null && task.getState() == null) as STARTING in version 8, and fix side effects - if (task != null && task.getState() != null) { - return (DatafeedState) task.getState(); - } else { + if (task == null) { // If we haven't started a datafeed then there will be no persistent task, // which is the same as if the datafeed was't started return DatafeedState.STOPPED; } + DatafeedState taskState = (DatafeedState) task.getState(); + if (taskState == null) { + // If we haven't set a state yet then the task has never been assigned, so + // report that it's starting + return DatafeedState.STARTING; + } + return taskState; } public static DataFrameAnalyticsState getDataFrameAnalyticsState(String analyticsId, @Nullable PersistentTasksCustomMetaData tasks) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java index 902df8cd44fa2..a0b7d6a035f75 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/MlTasksTests.java @@ -55,7 +55,8 @@ public void testGetDatefeedState() { tasksBuilder.addTask(MlTasks.datafeedTaskId("foo"), 
MlTasks.DATAFEED_TASK_NAME, new StartDatafeedAction.DatafeedParams("foo", 0L), new PersistentTasksCustomMetaData.Assignment("bar", "test assignment")); - assertEquals(DatafeedState.STOPPED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); + // A task with no state means the datafeed is starting + assertEquals(DatafeedState.STARTING, MlTasks.getDatafeedState("foo", tasksBuilder.build())); tasksBuilder.updateTaskState(MlTasks.datafeedTaskId("foo"), DatafeedState.STARTED); assertEquals(DatafeedState.STARTED, MlTasks.getDatafeedState("foo", tasksBuilder.build())); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 44bf8c483dfd9..79cdf0c637562 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -537,7 +537,7 @@ void closeJob(String reason) { * Important: the methods of this class must NOT throw exceptions. If they did then the callers * of endpoints waiting for a condition tested by this predicate would never get a response. 
*/ - private class JobPredicate implements Predicate> { + private static class JobPredicate implements Predicate> { private volatile boolean opened; private volatile Exception exception; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java index 40c4cea806f6f..67dd7d40ed7c9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java @@ -295,7 +295,7 @@ public void onResponse(PersistentTasksCustomMetaData.PersistentTask> { + private static class DatafeedPredicate implements Predicate> { + private volatile boolean started; private volatile Exception exception; @Override @@ -487,15 +488,21 @@ public boolean test(PersistentTasksCustomMetaData.PersistentTask persistentTa return false; } PersistentTasksCustomMetaData.Assignment assignment = persistentTask.getAssignment(); - if (assignment != null && assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && - assignment.isAssigned() == false) { - // Assignment has failed despite passing our "fast fail" validation - exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" + + if (assignment != null) { + // This means we are awaiting the datafeed's job to be assigned to a node + if (assignment.equals(DatafeedNodeSelector.AWAITING_JOB_ASSIGNMENT)) { + return true; + } + if (assignment.equals(PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT) == false && assignment.isAssigned() == false) { + // Assignment has failed despite passing our "fast fail" validation + exception = new ElasticsearchStatusException("Could not start datafeed, allocation explanation [" + assignment.getExplanation() + "]", RestStatus.TOO_MANY_REQUESTS); - return true; + return true; + 
} } DatafeedState datafeedState = (DatafeedState) persistentTask.getState(); - return datafeedState == DatafeedState.STARTED; + started = datafeedState == DatafeedState.STARTED; + return started; } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 903733e09970c..db94905a1bdb5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -93,14 +93,8 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId, List stoppingDatafeedIds, List notStoppedDatafeedIds) { switch (datafeedState) { + // Treat STARTING like STARTED for stop API behaviour. case STARTING: - // The STARTING state is not used anywhere at present, so this should never happen. - // At present datafeeds that have a persistent task that hasn't yet been assigned - // a state are reported as STOPPED (which is not great). It could be considered a - // breaking change to introduce the STARTING state though, so let's aim to do it in - // version 8. Also consider treating STARTING like STARTED for stop API behaviour. 
- notStoppedDatafeedIds.add(datafeedId); - break; case STARTED: startedDatafeedIds.add(datafeedId); notStoppedDatafeedIds.add(datafeedId); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java index 03c8272a979d8..a4c9748dd64c7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java @@ -32,6 +32,9 @@ public class DatafeedNodeSelector { private static final Logger LOGGER = LogManager.getLogger(DatafeedNodeSelector.class); + public static final PersistentTasksCustomMetaData.Assignment AWAITING_JOB_ASSIGNMENT = + new PersistentTasksCustomMetaData.Assignment(null, "datafeed awaiting job assignment."); + private final String datafeedId; private final String jobId; private final List datafeedIndices; @@ -76,9 +79,14 @@ public PersistentTasksCustomMetaData.Assignment selectNode() { AssignmentFailure assignmentFailure = checkAssignment(); if (assignmentFailure == null) { - return new PersistentTasksCustomMetaData.Assignment(jobTask.getExecutorNode(), ""); + String jobNode = jobTask.getExecutorNode(); + if (jobNode == null) { + return AWAITING_JOB_ASSIGNMENT; + } + return new PersistentTasksCustomMetaData.Assignment(jobNode, ""); } LOGGER.debug(assignmentFailure.reason); + assert assignmentFailure.reason.isEmpty() == false; return new PersistentTasksCustomMetaData.Assignment(null, assignmentFailure.reason); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java index 23275958b3c10..e0370066c047c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java +++ 
b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/BasicDistributedJobsIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.core.ml.action.PutDatafeedAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction; +import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; @@ -424,6 +425,60 @@ public void testMlStateAndResultsIndicesNotAvailable() throws Exception { assertBusy(() -> assertJobTask(jobId, JobState.OPENED, true)); } + public void testCloseUnassignedLazyJobAndDatafeed() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + ensureStableCluster(3); + + String jobId = "test-lazy-stop"; + String datafeedId = jobId + "-datafeed"; + // Assume the test machine won't have space to assign a 2TB job + Job.Builder job = createJob(jobId, new ByteSizeValue(2, ByteSizeUnit.TB), true); + PutJobAction.Request putJobRequest = new PutJobAction.Request(job); + client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet(); + + client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get(); + + DatafeedConfig config = createDatafeed(datafeedId, jobId, Collections.singletonList("data")); + PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config); + client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet(); + + ensureYellow(); // at least the primary shards of the indices a job uses should be started + OpenJobAction.Request openJobRequest = new OpenJobAction.Request(jobId); + client().execute(OpenJobAction.INSTANCE, openJobRequest).actionGet(); + + // Job state should be opening because it won't fit anyway, but is allowed to open lazily + GetJobsStatsAction.Request jobStatsRequest = new 
GetJobsStatsAction.Request(jobId); + GetJobsStatsAction.Response jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.OPENING, jobStatsResponse.getResponse().results().get(0).getState()); + + StartDatafeedAction.Request startDataFeedRequest = new StartDatafeedAction.Request(config.getId(), 0L); + client().execute(StartDatafeedAction.INSTANCE, startDataFeedRequest).actionGet(); + + // Datafeed state should be starting while it waits for job assignment + GetDatafeedsStatsAction.Request datafeedStatsRequest = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response datafeedStatsResponse = + client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STARTING, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // A starting datafeed can be stopped normally or by force + StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); + stopDatafeedRequest.setForce(randomBoolean()); + StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); + assertTrue(stopDatafeedResponse.isStopped()); + + datafeedStatsResponse = client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); + assertEquals(DatafeedState.STOPPED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); + + // An opening job can also be stopped normally or by force + CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); + closeJobRequest.setForce(randomBoolean()); + CloseJobAction.Response closeJobResponse = client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet(); + assertTrue(closeJobResponse.isClosed()); + + jobStatsResponse = client().execute(GetJobsStatsAction.INSTANCE, jobStatsRequest).actionGet(); + assertEquals(JobState.CLOSED, 
jobStatsResponse.getResponse().results().get(0).getState()); + } + private void assertJobTask(String jobId, JobState expectedState, boolean hasExecutorNode) { ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 887cd17eb6b50..6ae0d3fbd57be 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -127,6 +127,10 @@ protected Job.Builder createJob(String id) { } protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) { + return createJob(id, modelMemoryLimit, false); + } + + protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit, boolean allowLazyOpen) { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); dataDescription.setTimeFormat(DataDescription.EPOCH_MS); @@ -141,6 +145,7 @@ protected Job.Builder createJob(String id, ByteSizeValue modelMemoryLimit) { } builder.setAnalysisConfig(analysisConfig); builder.setDataDescription(dataDescription); + builder.setAllowLazyOpen(allowLazyOpen); return builder; }