diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index d16f9e18421d8..58ff31a6bc847 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -197,13 +197,9 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat (t) -> t.getAssignment().equals(AWAITING_UPGRADE)) .isEmpty() && - // Datafeeds to wait for a non-"Awaiting upgrade" assignment and for the job task allocations to converge - // If we do not wait, deleting datafeeds, or attempting to unallocate them again causes issues as the - // job's task allocationId could have changed during either process. + // Wait for datafeeds to not be "Awaiting upgrade" persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME, - (t) -> - t.getAssignment().equals(AWAITING_UPGRADE) || - t.getAssignment().getExplanation().contains("state is stale")) + (t) -> t.getAssignment().equals(AWAITING_UPGRADE)) .isEmpty(), request.timeout(), ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 636138a855bce..cbd55bb60d896 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -104,7 +103,7 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi final DiscoveryNodes nodes = state.nodes(); if (nodes.isLocalNodeElectedMaster() == false) { // Delegates stop datafeed to elected master node, so it becomes the coordinating node. - // See comment in StartDatafeedAction.Transport class for more information. + // See comment in TransportStartDatafeedAction for more information. if (nodes.getMasterNode() == null) { listener.onFailure(new MasterNotDiscoveredException("no known master node")); } else { @@ -142,13 +141,21 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A Set executorNodes = new HashSet<>(); for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); - if (datafeedTask == null || datafeedTask.isAssigned() == false) { - String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." + - " Use force stop to stop the datafeed"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; - } else { + if (datafeedTask == null) { + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method + String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found."; + assert datafeedTask != null : msg; + logger.error(msg); + } else if (datafeedTask.isAssigned()) { executorNodes.add(datafeedTask.getExecutorNode()); + } else { + // This is the easy case - the datafeed is not currently assigned to a node, + // so can be gracefully stopped simply by removing its persistent task. (Usually + // a graceful stop cannot be achieved by simply removing the persistent task, but + // if the datafeed has no running code then graceful/forceful are the same.) + // The listener here can be a no-op, as waitForDatafeedStopped() already waits for + // these persistent tasks to disappear. + persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {})); } } @@ -198,9 +205,10 @@ public void onFailure(Exception e) { } }); } else { - String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + - "datafeed's task could not be found."; - logger.warn(msg); + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method + String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found."; + assert datafeedTask != null : msg; + logger.error(msg); final int slot = counter.incrementAndGet(); failures.set(slot - 1, new RuntimeException(msg)); if (slot == startedDatafeeds.size()) { @@ -248,19 +256,18 @@ protected void doRun() throws Exception { private void sendResponseOrFailure(String datafeedId, ActionListener listener, AtomicArray failures) { - List catchedExceptions = failures.asList(); - if (catchedExceptions.size() == 0) { + List caughtExceptions = failures.asList(); + if (caughtExceptions.size() == 0) { listener.onResponse(new StopDatafeedAction.Response(true)); return; } - String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() + String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size() + "] failures, rethrowing last, all Exceptions: [" - + catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + "]"; - ElasticsearchException e = new ElasticsearchException(msg, - catchedExceptions.get(0)); + ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0)); listener.onFailure(e); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index bfe90f1a03166..fd402f6d2183f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -159,22 +159,15 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); - // Can't normal stop an unassigned datafeed + // An unassigned datafeed can be stopped either normally or by force StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); - ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, - () -> client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet()); - assertEquals("Cannot stop datafeed [" + datafeedId + - "] because the datafeed does not have an assigned node. Use force stop to stop the datafeed", - statusException.getMessage()); - - // Can only force stop an unassigned datafeed - stopDatafeedRequest.setForce(true); + stopDatafeedRequest.setForce(randomBoolean()); StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); assertTrue(stopDatafeedResponse.isStopped()); // Can't normal stop an unassigned job CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); - statusException = expectThrows(ElasticsearchStatusException.class, + ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet()); assertEquals("Cannot close job [" + jobId + "] because the job does not have an assigned node. Use force close to close the job", diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml index 9b33af5f48bb0..4a93e46c6b491 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml @@ -206,7 +206,11 @@ teardown: ml.get_datafeed_stats: datafeed_id: set-upgrade-mode-job-datafeed - match: { datafeeds.0.state: "started" } - - match: { datafeeds.0.assignment_explanation: "" } + # The datafeed will not be assigned until the job has updated its status on the node it's assigned + # to, and that probably won't happen in time for this assertion. That is indicated by an assignment + # reason ending "state is stale". However, the datafeed should NOT be unassigned with a reason of + # "upgrade mode is enabled" - that reason should have gone away before this test. + - match: { datafeeds.0.assignment_explanation: /(^$|.+job.+state.is.stale)/ } - do: cat.tasks: {} @@ -214,12 +218,6 @@ teardown: $body: | /.+job.+/ - - do: - cat.tasks: {} - - match: - $body: | - /.+datafeed.+/ - --- "Attempt to open job when upgrade_mode is enabled": - do: