From 22fc7a5787d228ba68222f47405b96dca40e2d63 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 3 Aug 2021 17:22:06 +0100 Subject: [PATCH] [ML] Deleting a job now deletes the datafeed if necessary Previously attempting to delete a job that had a datafeed would return an exception. However, this was unnecessarily pedantic - the user would always want to delete both job and datafeed together, and would react by deleting the datafeed and then subsequently deleting the job again. This change makes the delete job API automatically delete a datafeed associated with the job. The same level of force is used for this delete datafeed request as was used on the delete job request. This means that it's possible to force-delete an open job with a started datafeed (since force-delete datafeed will automatically stop a started datafeed). It's still not possible to delete an opened job without using force. Backport of #76010 --- .../apis/delete-job.asciidoc | 6 +- .../ml/action/TransportDeleteJobAction.java | 78 ++++++++++++------- .../test/ml/delete_job_force.yml | 69 +++++++++++++++- .../rest-api-spec/test/ml/jobs_crud.yml | 2 +- 4 files changed, 124 insertions(+), 31 deletions(-) diff --git a/docs/reference/ml/anomaly-detection/apis/delete-job.asciidoc b/docs/reference/ml/anomaly-detection/apis/delete-job.asciidoc index f157dd7fd9967..8d8c11f4dea32 100644 --- a/docs/reference/ml/anomaly-detection/apis/delete-job.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/delete-job.asciidoc @@ -18,8 +18,6 @@ Deletes an existing {anomaly-job}. * Requires the `manage_ml` cluster privilege. This privilege is included in the `machine_learning_admin` built-in role. -* Before you can delete a job, you must delete the {dfeeds} that are associated -with it. See <>. * Before you can delete a job, you must close it (unless you specify the `force` parameter). See <>. @@ -36,6 +34,10 @@ are granted to anyone over the `.ml-*` indices. It is not currently possible to delete multiple jobs using wildcards or a comma separated list. +If you delete a job that has a {dfeed}, the request will first attempt to +delete the {dfeed}, as though <> was called with the same +`timeout` and `force` parameters as this delete request. + [[ml-delete-job-path-parms]] == {api-path-parms-title} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 7a1f9b8906817..7c1f35ce331a8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -33,7 +33,9 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.ml.MlTasks; +import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.action.KillProcessAction; import org.elasticsearch.xpack.core.ml.action.PutJobAction; @@ -167,11 +169,20 @@ protected void masterOperation(Task task, DeleteJobAction.Request request, Clust }, finalListener::onFailure); - ActionListener jobExistsListener = ActionListener.wrap( + ActionListener datafeedDeleteListener = ActionListener.wrap( response -> { auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId)); - markJobAsDeletingIfNotUsed(request.getJobId(), taskId, markAsDeletingListener); + cancelResetTaskIfExists(request.getJobId(), ActionListener.wrap( + r -> jobConfigProvider.updateJobBlockReason(request.getJobId(), new Blocked(Blocked.Reason.DELETE, taskId), + markAsDeletingListener), + finalListener::onFailure + )); }, + finalListener::onFailure + ); + + ActionListener jobExistsListener = ActionListener.wrap( + response -> deleteDatafeedIfNecessary(request, datafeedDeleteListener), e -> { if (request.isForce() && MlTasks.getJobTask(request.getJobId(), state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE)) != null) { @@ -228,15 +239,9 @@ private void forceDeleteJob( logger.debug(() -> new ParameterizedMessage("[{}] force deleting job", jobId)); // 3. Delete the job - ActionListener removeTaskListener = new ActionListener() { - @Override - public void onResponse(Boolean response) { - // use clusterService.state() here so that the updated state without the task is available - normalDeleteJob(parentTaskClient, request, clusterService.state(), listener); - } - - @Override - public void onFailure(Exception e) { + ActionListener removeTaskListener = ActionListener.wrap( + response -> normalDeleteJob(parentTaskClient, request, clusterService.state(), listener), + e -> { if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { // use clusterService.state() here so that the updated state without the task is available normalDeleteJob(parentTaskClient, request, clusterService.state(), listener); @@ -244,7 +249,7 @@ public void onFailure(Exception e) { listener.onFailure(e); } } - }; + ); // 2. Cancel the persistent task. This closes the process gracefully so // the process should be killed first. @@ -293,21 +298,42 @@ private void checkJobIsNotOpen(String jobId, ClusterState state) { } } - private void markJobAsDeletingIfNotUsed(String jobId, TaskId taskId, ActionListener listener) { + private void deleteDatafeedIfNecessary(DeleteJobAction.Request deleteJobRequest, ActionListener listener) { - datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(jobId), ActionListener.wrap( - datafeedIds -> { - if (datafeedIds.isEmpty() == false) { - listener.onFailure(ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because datafeed [" - + datafeedIds.iterator().next() + "] refers to it")); - return; - } - cancelResetTaskIfExists(jobId, ActionListener.wrap( - response -> jobConfigProvider.updateJobBlockReason(jobId, new Blocked(Blocked.Reason.DELETE, taskId), listener), - listener::onFailure - )); - }, - listener::onFailure + datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singletonList(deleteJobRequest.getJobId()), ActionListener.wrap( + datafeedIds -> { + // Since it's only possible to delete a single job at a time there should not be more than one datafeed + assert datafeedIds.size() <= 1 : "Expected at most 1 datafeed for a single job, got " + datafeedIds; + if (datafeedIds.isEmpty()) { + listener.onResponse(AcknowledgedResponse.TRUE); + return; + } + DeleteDatafeedAction.Request deleteDatafeedRequest = new DeleteDatafeedAction.Request(datafeedIds.iterator().next()); + deleteDatafeedRequest.setForce(deleteJobRequest.isForce()); + deleteDatafeedRequest.timeout(deleteJobRequest.timeout()); + ClientHelper.executeAsyncWithOrigin( + client, + ClientHelper.ML_ORIGIN, + DeleteDatafeedAction.INSTANCE, + deleteDatafeedRequest, + ActionListener.wrap( + listener::onResponse, + e -> { + // It's possible that a simultaneous call to delete the datafeed has deleted it in between + // us finding the datafeed ID and trying to delete it in this method - this is OK + if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) { + listener.onResponse(AcknowledgedResponse.TRUE); + } else { + listener.onFailure(ExceptionsHelper.conflictStatusException( + "failed to delete job [{}] as its datafeed [{}] could not be deleted", e, + deleteJobRequest.getJobId(), deleteDatafeedRequest.getDatafeedId()) + ); + } + } + ) + ); + }, + listener::onFailure )); } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/delete_job_force.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/delete_job_force.yml index 8e12056c27fed..6a79631a72408 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/delete_job_force.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/delete_job_force.yml @@ -16,6 +16,23 @@ setup: } } + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + indices.create: + index: airline-data + body: + mappings: + properties: + time: + type: date + airline: + type: keyword + airport: + type: text + responsetime: + type: float + --- "Test force delete a closed job": - do: @@ -65,11 +82,59 @@ setup: body: > { "job_id":"force-delete-job", - "indexes":["index-foo"] + "indices":["index-foo"] } - match: { datafeed_id: force-delete-job-datafeed } - do: - catch: /Cannot delete job \[force-delete-job\] because datafeed \[force-delete-job-datafeed\] refers to it/ ml.delete_job: job_id: force-delete-job + - match: { acknowledged: true } + + - do: + ml.get_jobs: + job_id: "_all" + - match: { count: 0 } + + - do: + ml.get_datafeeds: + datafeed_id: "_all" + - match: { count: 0 } + +--- +"Test force delete an open job that is referred by a started datafeed": + + - do: + ml.open_job: + job_id: force-delete-job + + - do: + ml.put_datafeed: + datafeed_id: force-delete-job-started-datafeed + body: > + { + "job_id":"force-delete-job", + "indices":["airline-data"] + } + - match: { datafeed_id: force-delete-job-started-datafeed } + + - do: + ml.start_datafeed: + datafeed_id: force-delete-job-started-datafeed + start: 0 + + - do: + ml.delete_job: + force: true + job_id: force-delete-job + - match: { acknowledged: true } + + - do: + ml.get_jobs: + job_id: "_all" + - match: { count: 0 } + + - do: + ml.get_datafeeds: + datafeed_id: "_all" + - match: { count: 0 } diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_crud.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_crud.yml index 6bcbb8322a221..9658d621c6b93 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_crud.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/jobs_crud.yml @@ -623,9 +623,9 @@ - match: { datafeed_id: "jobs-crud-test-datafeed-1" } - do: - catch: /Cannot delete job \[jobs-crud-datafeed-job\] because datafeed \[jobs-crud-test-datafeed-1\] refers to it/ ml.delete_job: job_id: jobs-crud-datafeed-job + - match: { acknowledged: true } --- "Test delete job that is opened":