Allow graceful close of unassigned jobs
This change is similar to the one made in elastic#39034 to allow
stopping unassigned datafeeds. If unassigned jobs are going to be
the norm rather than the exception, and they can remain unassigned
for long periods, it is very frustrating not to be able to close
them easily.
droberts195 committed Oct 9, 2019
1 parent adb111a commit 416eaac
Showing 3 changed files with 67 additions and 29 deletions.
@@ -116,30 +116,39 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
            return;
        }

-       if (request.isForce() == false) {
+       if (request.isForce()) {
+           List<String> jobIdsToForceClose = new ArrayList<>(response.openJobIds);
+           jobIdsToForceClose.addAll(response.closingJobIds);
+           forceCloseJob(state, request, jobIdsToForceClose, listener);
+       } else {
            Set<String> executorNodes = new HashSet<>();
            PersistentTasksCustomMetaData tasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
            for (String resolvedJobId : request.getOpenJobIds()) {
                PersistentTasksCustomMetaData.PersistentTask<?> jobTask =
-                       MlTasks.getJobTask(resolvedJobId, tasks);
-
-               if (jobTask == null || jobTask.isAssigned() == false) {
-                   String message = "Cannot close job [" + resolvedJobId + "] because the job does not have "
-                       + "an assigned node. Use force close to close the job";
-                   listener.onFailure(ExceptionsHelper.conflictStatusException(message));
-                   return;
-               } else {
+                   MlTasks.getJobTask(resolvedJobId, tasks);
+
+               if (jobTask == null) {
+                   // This should not happen, because openJobIds was
+                   // derived from the same tasks metadata as jobTask
+                   String msg = "Requested job [" + resolvedJobId
+                       + "] be stopped, but job's task could not be found.";
+                   assert jobTask != null : msg;
+                   logger.error(msg);
+               } else if (jobTask.isAssigned()) {
                    executorNodes.add(jobTask.getExecutorNode());
+               } else {
+                   // This is the easy case - the job is not currently assigned to a node, so can
+                   // be gracefully stopped simply by removing its persistent task. (Usually a
+                   // graceful stop cannot be achieved by simply removing the persistent task, but
+                   // if the job has no running code then graceful/forceful are basically the same.)
+                   // The listener here can be a no-op, as waitForJobClosed() already waits for
+                   // these persistent tasks to disappear.
+                   persistentTasksService.sendRemoveRequest(jobTask.getId(),
+                       ActionListener.wrap(r -> {}, e -> {}));
                }
            }
-           request.setNodes(executorNodes.toArray(new String[executorNodes.size()]));
-       }
+           request.setNodes(executorNodes.toArray(new String[0]));

-       if (request.isForce()) {
-           List<String> jobIdsToForceClose = new ArrayList<>(response.openJobIds);
-           jobIdsToForceClose.addAll(response.closingJobIds);
-           forceCloseJob(state, request, jobIdsToForceClose, listener);
-       } else {
            normalCloseJob(state, task, request, response.openJobIds, response.closingJobIds, listener);
        }
    },
@@ -148,7 +157,6 @@ protected void doExecute(Task task, CloseJobAction.Request request, ActionListen
        },
        listener::onFailure
    ));
-
    }
}
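
The essence of the doExecute() change above: a force close still sweeps up every open or closing job, an assigned job is routed to its executor node for a normal close, and a job with no assigned node is closed gracefully by simply removing its persistent task, because there is no running process to shut down. Below is a minimal, self-contained sketch of that decision flow; the names (GracefulCloseSketch, JobTask, closeJobs) are invented for illustration and are not the Elasticsearch classes.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class GracefulCloseSketch {

        // Stand-in for a persistent task record: assigned when executorNode != null.
        static final class JobTask {
            final String jobId;
            final String executorNode;

            JobTask(String jobId, String executorNode) {
                this.jobId = jobId;
                this.executorNode = executorNode;
            }

            boolean isAssigned() {
                return executorNode != null;
            }
        }

        // Stand-in for the cluster's persistent tasks metadata.
        private final Map<String, JobTask> persistentTasks = new HashMap<>();

        void openJob(String jobId, String executorNodeOrNull) {
            persistentTasks.put(jobId, new JobTask(jobId, executorNodeOrNull));
        }

        // Mirrors the shape of the patched doExecute(): force close handles every job in
        // one sweep; otherwise assigned jobs are routed to their executor nodes, while
        // unassigned jobs are closed gracefully by dropping their persistent task, since
        // there is no running process that needs an orderly shutdown.
        void closeJobs(List<String> jobIds, boolean force) {
            if (force) {
                jobIds.forEach(persistentTasks::remove);
                System.out.println("Force closed: " + jobIds);
                return;
            }
            Set<String> executorNodes = new HashSet<>();
            List<String> closedWhileUnassigned = new ArrayList<>();
            for (String jobId : jobIds) {
                JobTask task = persistentTasks.get(jobId);
                if (task == null) {
                    continue; // not open at all, nothing to do
                }
                if (task.isAssigned()) {
                    executorNodes.add(task.executorNode); // normal close runs on that node
                } else {
                    persistentTasks.remove(jobId);        // graceful close of an unassigned job
                    closedWhileUnassigned.add(jobId);
                }
            }
            System.out.println("Route normal close to nodes: " + executorNodes);
            System.out.println("Closed while unassigned: " + closedWhileUnassigned);
        }

        public static void main(String[] args) {
            GracefulCloseSketch sketch = new GracefulCloseSketch();
            sketch.openJob("assigned-job", "node-1");
            sketch.openJob("lazy-job", null); // opened lazily, never assigned
            sketch.closeJobs(Arrays.asList("assigned-job", "lazy-job"), false);
        }
    }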

@@ -353,21 +361,20 @@ public void onFailure(Exception e) {
    private void sendResponseOrFailure(String jobId,
                                       ActionListener<CloseJobAction.Response> listener,
                                       AtomicArray<Exception> failures) {
-       List<Exception> catchedExceptions = failures.asList();
-       if (catchedExceptions.size() == 0) {
+       List<Exception> caughtExceptions = failures.asList();
+       if (caughtExceptions.size() == 0) {
            listener.onResponse(new CloseJobAction.Response(true));
            return;
        }

        String msg = "Failed to force close job [" + jobId + "] with ["
-           + catchedExceptions.size()
+           + caughtExceptions.size()
            + "] failures, rethrowing last, all Exceptions: ["
-           + catchedExceptions.stream().map(Exception::getMessage)
+           + caughtExceptions.stream().map(Exception::getMessage)
            .collect(Collectors.joining(", "))
            + "]";

-       ElasticsearchException e = new ElasticsearchException(msg,
-           catchedExceptions.get(0));
+       ElasticsearchException e = new ElasticsearchException(msg, caughtExceptions.get(0));
        listener.onFailure(e);
    }
});
@@ -60,7 +60,6 @@

import static org.elasticsearch.xpack.core.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.is;
-import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down
@@ -113,8 +113,9 @@
  - match: { analysis_limits.model_memory_limit: "2048mb" }

---
-"Test put job with model_memory_limit as string":
-
+"Test put job with model_memory_limit as string and lazy open":
+  - skip:
+      features: headers
  - do:
      ml.put_job:
        job_id: job-model-memory-limit-as-string
@@ -126,11 +127,42 @@
"data_description" : {
},
"analysis_limits": {
"model_memory_limit": "3g"
}
"model_memory_limit": "3000g"
},
"allow_lazy_open": true
}
- match: { job_id: "job-model-memory-limit-as-string" }
- match: { analysis_limits.model_memory_limit: "3072mb" }
- match: { analysis_limits.model_memory_limit: "3072000mb" }

# The assumption here is that a 3000GB job will not fit on the test
# node - increase in future if the test ever fails because of this!
# But because the job is allowed to open lazily, opening it shouldn't
# throw an exception - it should wait for a big enough node to be
# added to the cluster.
- do:
ml.open_job:
job_id: job-model-memory-limit-as-string
- match: { opened: false }

- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.get_job_stats:
job_id: job-model-memory-limit-as-string
- match: {"jobs.0.state": opening}

# Despite never being assigned to a node the job should close gracefully
- do:
ml.close_job:
job_id: job-model-memory-limit-as-string
- match: { closed: true }

- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.get_job_stats:
job_id: job-model-memory-limit-as-string
- match: {"jobs.0.state": closed}

---
"Test get job API with non existing job id":
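
The REST test above exercises the new behaviour end to end: the job opens lazily, reports state "opening" while unassigned, and then closes without force. Roughly the same flow through the Java high-level REST client might look like the sketch below. This is a sketch under assumptions: it presumes a 7.x RestHighLevelClient and a pre-created job named lazy-job with allow_lazy_open enabled and a model_memory_limit no node can satisfy. Before this change, the non-force closeJob call would have been rejected with a conflict telling the caller to use force close.

    import java.io.IOException;

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.ml.CloseJobRequest;
    import org.elasticsearch.client.ml.CloseJobResponse;
    import org.elasticsearch.client.ml.OpenJobRequest;
    import org.elasticsearch.client.ml.OpenJobResponse;

    public class CloseUnassignedJobExample {

        // Assumes "lazy-job" already exists with "allow_lazy_open": true and a
        // model_memory_limit that no node in the cluster can satisfy.
        static void closeWithoutForce(RestHighLevelClient client) throws IOException {
            // Opening succeeds, but the job stays unassigned, so "opened" is false.
            OpenJobResponse opened = client.machineLearning()
                .openJob(new OpenJobRequest("lazy-job"), RequestOptions.DEFAULT);
            System.out.println("opened: " + opened.isOpened());

            // With this change a plain close (no force flag) now succeeds even though
            // the job never got a node; previously this returned a 409 conflict.
            CloseJobResponse closed = client.machineLearning()
                .closeJob(new CloseJobRequest("lazy-job"), RequestOptions.DEFAULT);
            System.out.println("closed: " + closed.isClosed());
        }
    }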
