Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Deleting a job now deletes the datafeed if necessary #76064

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/reference/ml/anomaly-detection/apis/delete-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ Deletes an existing {anomaly-job}.

* Requires the `manage_ml` cluster privilege. This privilege is included in the
`machine_learning_admin` built-in role.
* Before you can delete a job, you must delete the {dfeeds} that are associated
with it. See <<ml-delete-datafeed>>.
* Before you can delete a job, you must close it (unless you specify the `force`
parameter). See <<ml-close-job>>.

Expand All @@ -36,6 +34,10 @@ are granted to anyone over the `.ml-*` indices.
It is not currently possible to delete multiple jobs using wildcards or a
comma-separated list.

If you delete a job that has a {dfeed}, the request first attempts to
delete the {dfeed}, as though <<ml-delete-datafeed>> were called with the same
`timeout` and `force` parameters as this delete request.

[[ml-delete-job-path-parms]]
== {api-path-parms-title}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
Expand Down Expand Up @@ -167,11 +169,20 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust
},
finalListener::onFailure);

ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
ActionListener<AcknowledgedResponse> datafeedDeleteListener = ActionListener.wrap(
response -> {
auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId));
markJobAsDeletingIfNotUsed(request.getJobId(), taskId, markAsDeletingListener);
cancelResetTaskIfExists(request.getJobId(), ActionListener.wrap(
r -> jobConfigProvider.updateJobBlockReason(request.getJobId(), new Blocked(Blocked.Reason.DELETE, taskId),
markAsDeletingListener),
finalListener::onFailure
));
},
finalListener::onFailure
);

ActionListener<Boolean> jobExistsListener = ActionListener.wrap(
response -> deleteDatafeedIfNecessary(request, datafeedDeleteListener),
e -> {
if (request.isForce()
&& MlTasks.getJobTask(request.getJobId(), state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE)) != null) {
Expand Down Expand Up @@ -228,23 +239,17 @@ private void forceDeleteJob(
logger.debug(() -> new ParameterizedMessage("[{}] force deleting job", jobId));

// 3. Delete the job
ActionListener<Boolean> removeTaskListener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean response) {
// use clusterService.state() here so that the updated state without the task is available
normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
}

@Override
public void onFailure(Exception e) {
ActionListener<Boolean> removeTaskListener = ActionListener.wrap(
response -> normalDeleteJob(parentTaskClient, request, clusterService.state(), listener),
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
// use clusterService.state() here so that the updated state without the task is available
normalDeleteJob(parentTaskClient, request, clusterService.state(), listener);
} else {
listener.onFailure(e);
}
}
};
);

// 2. Cancel the persistent task. This closes the process gracefully so
// the process should be killed first.
Expand Down Expand Up @@ -293,21 +298,42 @@ private void checkJobIsNotOpen(String jobId, ClusterState state) {
}
}

private void markJobAsDeletingIfNotUsed(String jobId, TaskId taskId, ActionListener<PutJobAction.Response> listener) {
private void deleteDatafeedIfNecessary(DeleteJobAction.Request deleteJobRequest, ActionListener<AcknowledgedResponse> listener) {

datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
datafeedIds -> {
if (datafeedIds.isEmpty() == false) {
listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed ["
+ datafeedIds.iterator().next() + "] refers to it"));
return;
}
cancelResetTaskIfExists(jobId, ActionListener.wrap(
response -> jobConfigProvider.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.DELETE, taskId), listener),
listener::onFailure
));
},
listener::onFailure
datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(deleteJobRequest.getJobId()), ActionListener.wrap(
datafeedIds -> {
// Since it's only possible to delete a single job at a time there should not be more than one datafeed
assert datafeedIds.size() <= 1 : "Expected at most 1 datafeed for a single job, got " + datafeedIds;
if (datafeedIds.isEmpty()) {
listener.onResponse(AcknowledgedResponse.TRUE);
return;
}
DeleteDatafeedAction.Request deleteDatafeedRequest = new DeleteDatafeedAction.Request(datafeedIds.iterator().next());
deleteDatafeedRequest.setForce(deleteJobRequest.isForce());
deleteDatafeedRequest.timeout(deleteJobRequest.timeout());
ClientHelper.executeAsyncWithOrigin(
client,
ClientHelper.ML_ORIGIN,
DeleteDatafeedAction.INSTANCE,
deleteDatafeedRequest,
ActionListener.wrap(
listener::onResponse,
e -> {
// It's possible that a simultaneous call to delete the datafeed has deleted it in between
// us finding the datafeed ID and trying to delete it in this method - this is OK
if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) {
listener.onResponse(AcknowledgedResponse.TRUE);
} else {
listener.onFailure(ExceptionsHelper.conflictStatusException(
"failed to delete job [{}] as its datafeed [{}] could not be deleted", e,
deleteJobRequest.getJobId(), deleteDatafeedRequest.getDatafeedId())
);
}
}
)
);
},
listener::onFailure
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@ setup:
}
}

- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.create:
index: airline-data
body:
mappings:
properties:
time:
type: date
airline:
type: keyword
airport:
type: text
responsetime:
type: float

---
"Test force delete a closed job":
- do:
Expand Down Expand Up @@ -65,11 +82,59 @@ setup:
body: >
{
"job_id":"force-delete-job",
"indexes":["index-foo"]
"indices":["index-foo"]
}
- match: { datafeed_id: force-delete-job-datafeed }

- do:
catch: /Cannot delete job \[force-delete-job\] because datafeed \[force-delete-job-datafeed\] refers to it/
ml.delete_job:
job_id: force-delete-job
- match: { acknowledged: true }

- do:
ml.get_jobs:
job_id: "_all"
- match: { count: 0 }

- do:
ml.get_datafeeds:
datafeed_id: "_all"
- match: { count: 0 }

---
"Test force delete an open job that is referred by a started datafeed":

- do:
ml.open_job:
job_id: force-delete-job

- do:
ml.put_datafeed:
datafeed_id: force-delete-job-started-datafeed
body: >
{
"job_id":"force-delete-job",
"indices":["airline-data"]
}
- match: { datafeed_id: force-delete-job-started-datafeed }

- do:
ml.start_datafeed:
datafeed_id: force-delete-job-started-datafeed
start: 0

- do:
ml.delete_job:
force: true
job_id: force-delete-job
- match: { acknowledged: true }

- do:
ml.get_jobs:
job_id: "_all"
- match: { count: 0 }

- do:
ml.get_datafeeds:
datafeed_id: "_all"
- match: { count: 0 }
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,9 @@
- match: { datafeed_id: "jobs-crud-test-datafeed-1" }

- do:
catch: /Cannot delete job \[jobs-crud-datafeed-job\] because datafeed \[jobs-crud-test-datafeed-1\] refers to it/
ml.delete_job:
job_id: jobs-crud-datafeed-job
- match: { acknowledged: true }

---
"Test delete job that is opened":
Expand Down