From fbabd81e3d5cf74add6157bd3d9c0391b967d60f Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 19 Feb 2019 13:47:47 +0000 Subject: [PATCH] [ML] Allow stop unassigned datafeed and relax unset upgrade mode wait (#39034) These two changes are interlinked. Before this change unsetting ML upgrade mode would wait for all datafeeds to be assigned and not waiting for their corresponding jobs to initialise. However, this could be inappropriate, if there was a reason other that upgrade mode why one job was unable to be assigned or slow to start up. Unsetting of upgrade mode would hang in this case. This change relaxes the condition for considering upgrade mode to be unset to simply that an assignment attempt has been made for each ML persistent task that did not fail because upgrade mode was enabled. Thus after unsetting upgrade mode there is no guarantee that every ML persistent task is assigned, just that each is not unassigned due to upgrade mode. In order to make setting upgrade mode work immediately after unsetting upgrade mode it was then also necessary to make it possible to stop a datafeed that was not assigned. There was no particularly good reason why this was not allowed in the past. It is trivial to stop an unassigned datafeed because it just involves removing the persistent task. --- .../action/TransportSetUpgradeModeAction.java | 8 +--- .../action/TransportStopDatafeedAction.java | 41 +++++++++++-------- .../integration/MlDistributedFailureIT.java | 13 ++---- .../test/ml/set_upgrade_mode.yml | 12 +++--- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java index d16f9e18421d..58ff31a6bc84 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportSetUpgradeModeAction.java @@ -197,13 +197,9 @@ protected void masterOperation(SetUpgradeModeAction.Request request, ClusterStat (t) -> t.getAssignment().equals(AWAITING_UPGRADE)) .isEmpty() && - // Datafeeds to wait for a non-"Awaiting upgrade" assignment and for the job task allocations to converge - // If we do not wait, deleting datafeeds, or attempting to unallocate them again causes issues as the - // job's task allocationId could have changed during either process. + // Wait for datafeeds to not be "Awaiting upgrade" persistentTasksCustomMetaData.findTasks(DATAFEED_TASK_NAME, - (t) -> - t.getAssignment().equals(AWAITING_UPGRADE) || - t.getAssignment().getExplanation().contains("state is stale")) + (t) -> t.getAssignment().equals(AWAITING_UPGRADE)) .isEmpty(), request.timeout(), ActionListener.wrap(r -> wrappedListener.onResponse(new AcknowledgedResponse(true)), wrappedListener::onFailure) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java index 636138a855bc..cbd55bb60d89 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStopDatafeedAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState; -import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; @@ -104,7 +103,7 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi final DiscoveryNodes nodes = state.nodes(); if (nodes.isLocalNodeElectedMaster() == false) { // Delegates stop datafeed to elected master node, so it becomes the coordinating node. - // See comment in StartDatafeedAction.Transport class for more information. + // See comment in TransportStartDatafeedAction for more information. if (nodes.getMasterNode() == null) { listener.onFailure(new MasterNotDiscoveredException("no known master node")); } else { @@ -142,13 +141,21 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A Set executorNodes = new HashSet<>(); for (String datafeedId : startedDatafeeds) { PersistentTasksCustomMetaData.PersistentTask datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks); - if (datafeedTask == null || datafeedTask.isAssigned() == false) { - String message = "Cannot stop datafeed [" + datafeedId + "] because the datafeed does not have an assigned node." + - " Use force stop to stop the datafeed"; - listener.onFailure(ExceptionsHelper.conflictStatusException(message)); - return; - } else { + if (datafeedTask == null) { + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method + String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found."; + assert datafeedTask != null : msg; + logger.error(msg); + } else if (datafeedTask.isAssigned()) { executorNodes.add(datafeedTask.getExecutorNode()); + } else { + // This is the easy case - the datafeed is not currently assigned to a node, + // so can be gracefully stopped simply by removing its persistent task. (Usually + // a graceful stop cannot be achieved by simply removing the persistent task, but + // if the datafeed has no running code then graceful/forceful are the same.) + // The listener here can be a no-op, as waitForDatafeedStopped() already waits for + // these persistent tasks to disappear. + persistentTasksService.sendRemoveRequest(datafeedTask.getId(), ActionListener.wrap(r -> {}, e -> {})); } } @@ -198,9 +205,10 @@ public void onFailure(Exception e) { } }); } else { - String msg = "Requested datafeed [" + request.getDatafeedId() + "] be force-stopped, but " + - "datafeed's task could not be found."; - logger.warn(msg); + // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method + String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found."; + assert datafeedTask != null : msg; + logger.error(msg); final int slot = counter.incrementAndGet(); failures.set(slot - 1, new RuntimeException(msg)); if (slot == startedDatafeeds.size()) { @@ -248,19 +256,18 @@ protected void doRun() throws Exception { private void sendResponseOrFailure(String datafeedId, ActionListener listener, AtomicArray failures) { - List catchedExceptions = failures.asList(); - if (catchedExceptions.size() == 0) { + List caughtExceptions = failures.asList(); + if (caughtExceptions.size() == 0) { listener.onResponse(new StopDatafeedAction.Response(true)); return; } - String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + catchedExceptions.size() + String msg = "Failed to stop datafeed [" + datafeedId + "] with [" + caughtExceptions.size() + "] failures, rethrowing last, all Exceptions: [" - + catchedExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + + caughtExceptions.stream().map(Exception::getMessage).collect(Collectors.joining(", ")) + "]"; - ElasticsearchException e = new ElasticsearchException(msg, - catchedExceptions.get(0)); + ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0)); listener.onFailure(e); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index bfe90f1a0316..fd402f6d2183 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -159,22 +159,15 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { client().execute(GetDatafeedsStatsAction.INSTANCE, datafeedStatsRequest).actionGet(); assertEquals(DatafeedState.STARTED, datafeedStatsResponse.getResponse().results().get(0).getDatafeedState()); - // Can't normal stop an unassigned datafeed + // An unassigned datafeed can be stopped either normally or by force StopDatafeedAction.Request stopDatafeedRequest = new StopDatafeedAction.Request(datafeedId); - ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, - () -> client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet()); - assertEquals("Cannot stop datafeed [" + datafeedId + - "] because the datafeed does not have an assigned node. Use force stop to stop the datafeed", - statusException.getMessage()); - - // Can only force stop an unassigned datafeed - stopDatafeedRequest.setForce(true); + stopDatafeedRequest.setForce(randomBoolean()); StopDatafeedAction.Response stopDatafeedResponse = client().execute(StopDatafeedAction.INSTANCE, stopDatafeedRequest).actionGet(); assertTrue(stopDatafeedResponse.isStopped()); // Can't normal stop an unassigned job CloseJobAction.Request closeJobRequest = new CloseJobAction.Request(jobId); - statusException = expectThrows(ElasticsearchStatusException.class, + ElasticsearchStatusException statusException = expectThrows(ElasticsearchStatusException.class, () -> client().execute(CloseJobAction.INSTANCE, closeJobRequest).actionGet()); assertEquals("Cannot close job [" + jobId + "] because the job does not have an assigned node. Use force close to close the job", diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml index 9b33af5f48bb..4a93e46c6b49 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/set_upgrade_mode.yml @@ -206,7 +206,11 @@ teardown: ml.get_datafeed_stats: datafeed_id: set-upgrade-mode-job-datafeed - match: { datafeeds.0.state: "started" } - - match: { datafeeds.0.assignment_explanation: "" } + # The datafeed will not be assigned until the job has updated its status on the node it's assigned + # to, and that probably won't happen in time for this assertion. That is indicated by an assignment + # reason ending "state is stale". However, the datafeed should NOT be unassigned with a reason of + # "upgrade mode is enabled" - that reason should have gone away before this test. + - match: { datafeeds.0.assignment_explanation: /(^$|.+job.+state.is.stale)/ } - do: cat.tasks: {} @@ -214,12 +218,6 @@ teardown: $body: | /.+job.+/ - - do: - cat.tasks: {} - - match: - $body: | - /.+datafeed.+/ - --- "Attempt to open job when upgrade_mode is enabled": - do: