From 416eaac92dcc40d29a3679e37057f9970911aa74 Mon Sep 17 00:00:00 2001
From: David Roberts
Date: Wed, 9 Oct 2019 14:49:46 +0100
Subject: [PATCH] Allow graceful close of unassigned jobs

This change is similar to the one that was made to allow stopping
unassigned datafeeds in #39034. If unassigned jobs are going to be the
norm rather than the exception, and remain unassigned for long periods,
then it is very frustrating not to be allowed to close them easily.
---
 .../ml/action/TransportCloseJobAction.java    | 53 +++++++++++--------
 .../action/TransportOpenJobActionTests.java   |  1 -
 .../rest-api-spec/test/ml/jobs_crud.yml       | 42 +++++++++++++--
 3 files changed, 67 insertions(+), 29 deletions(-)

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
index 9ed0210aa9a9a..1e4943d999266 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportCloseJobAction.java
@@ -116,30 +116,39 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
                 return;
             }
 
-            if (request.isForce() == false) {
+            if (request.isForce()) {
+                List<String> jobIdsToForceClose = new ArrayList<>(response.openJobIds);
+                jobIdsToForceClose.addAll(response.closingJobIds);
+                forceCloseJob(state, request, jobIdsToForceClose, listener);
+            } else {
                 Set<String> executorNodes = new HashSet<>();
                 PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
                 for (String resolvedJobId : request.getOpenJobIds()) {
                     PersistentTasksCustomMetaData.PersistentTask<?> jobTask =
-                            MlTasks.getJobTask(resolvedJobId, tasks);
-
-                    if (jobTask == null || jobTask.isAssigned() == false) {
-                        String message = "Cannot close job [" + resolvedJobId + "] because the job does not have "
-                                + "an assigned node. Use force close to close the job";
-                        listener.onFailure(ExceptionsHelper.conflictStatusException(message));
-                        return;
-                    } else {
+                        MlTasks.getJobTask(resolvedJobId, tasks);
+
+                    if (jobTask == null) {
+                        // This should not happen, because openJobIds was
+                        // derived from the same tasks metadata as jobTask
+                        String msg = "Requested job [" + resolvedJobId
+                            + "] be stopped, but job's task could not be found.";
+                        assert jobTask != null : msg;
+                        logger.error(msg);
+                    } else if (jobTask.isAssigned()) {
                         executorNodes.add(jobTask.getExecutorNode());
+                    } else {
+                        // This is the easy case - the job is not currently assigned to a node, so can
+                        // be gracefully stopped simply by removing its persistent task. (Usually a
+                        // graceful stop cannot be achieved by simply removing the persistent task, but
+                        // if the job has no running code then graceful/forceful are basically the same.)
+                        // The listener here can be a no-op, as waitForJobClosed() already waits for
+                        // these persistent tasks to disappear.
+                        persistentTasksService.sendRemoveRequest(jobTask.getId(),
+                            ActionListener.wrap(r -> {}, e -> {}));
                     }
                 }
-                request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
-            }
+                request.setNodes(executorNodes.toArray(new String[0]));
 
-            if (request.isForce()) {
-                List<String> jobIdsToForceClose = new ArrayList<>(response.openJobIds);
-                jobIdsToForceClose.addAll(response.closingJobIds);
-                forceCloseJob(state, request, jobIdsToForceClose, listener);
-            } else {
                 normalCloseJob(state, task, request, response.openJobIds, response.closingJobIds, listener);
             }
         },
@@ -148,7 +157,6 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
 
             },
             listener::onFailure
        ));
-
    }
 }
@@ -353,21 +361,20 @@ public void onFailure(Exception e) {
 
             private void sendResponseOrFailure(String jobId, ActionListener<CloseJobAction.Response> listener,
                                                AtomicArray<Exception> failures) {
-                List<Exception> catchedExceptions = failures.asList();
-                if (catchedExceptions.size() == 0) {
+                List<Exception> caughtExceptions = failures.asList();
+                if (caughtExceptions.size() == 0) {
                     listener.onResponse(new CloseJobAction.Response(true));
                     return;
                 }
 
                 String msg = "Failed to force close job [" + jobId + "] with ["
-                        + catchedExceptions.size()
+                        + caughtExceptions.size()
                         + "] failures, rethrowing last, all Exceptions: ["
-                        + catchedExceptions.stream().map(Exception::getMessage)
+                        + caughtExceptions.stream().map(Exception::getMessage)
                         .collect(Collectors.joining(", "))
                         + "]";
 
-                ElasticsearchException e = new ElasticsearchException(msg,
-                        catchedExceptions.get(0));
+                ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
                 listener.onFailure(e);
             }
         });
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
index 2e9c16ece220b..2809d7e57efc1 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java
@@ -60,7 +60,6 @@
 
 import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
 import static org.hamcrest.Matchers.is;
-import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
index eb6186c3a64d6..9f32ff22dedf2 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/jobs_crud.yml
@@ -113,8 +113,9 @@
   - match: { analysis_limits.model_memory_limit: "2048mb" }
 
 ---
-"Test put job with model_memory_limit as string":
-
+"Test put job with model_memory_limit as string and lazy open":
+  - skip:
+      features: headers
   - do:
       ml.put_job:
         job_id: job-model-memory-limit-as-string
@@ -126,11 +127,42 @@
             "data_description" : {
             },
             "analysis_limits": {
-              "model_memory_limit": "3g"
-            }
+              "model_memory_limit": "3000g"
+            },
+            "allow_lazy_open": true
           }
   - match: { job_id: "job-model-memory-limit-as-string" }
-  - match: { analysis_limits.model_memory_limit: "3072mb" }
+  - match: { analysis_limits.model_memory_limit: "3072000mb" }
+
+  # The assumption here is that a 3000GB job will not fit on the test
+  # node - increase in future if the test ever fails because of this!
+  # But because the job is allowed to open lazily, opening it shouldn't
+  # throw an exception - it should wait for a big enough node to be
+  # added to the cluster.
+  - do:
+      ml.open_job:
+        job_id: job-model-memory-limit-as-string
+  - match: { opened: false }
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      ml.get_job_stats:
+        job_id: job-model-memory-limit-as-string
+  - match: {"jobs.0.state": opening}
+
+  # Despite never being assigned to a node the job should close gracefully
+  - do:
+      ml.close_job:
+        job_id: job-model-memory-limit-as-string
+  - match: { closed: true }
+
+  - do:
+      headers:
+        Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
+      ml.get_job_stats:
+        job_id: job-model-memory-limit-as-string
+  - match: {"jobs.0.state": closed}
 
 ---
 "Test get job API with non existing job id":