diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index 1db26087ae88c..e3570a2a837a6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -148,7 +148,12 @@ static Request deleteJob(DeleteJobRequest deleteJobRequest) { Request request = new Request(HttpDelete.METHOD_NAME, endpoint); RequestConverters.Params params = new RequestConverters.Params(request); - params.putParam("force", Boolean.toString(deleteJobRequest.isForce())); + if (deleteJobRequest.getForce() != null) { + params.putParam("force", Boolean.toString(deleteJobRequest.getForce())); + } + if (deleteJobRequest.getWaitForCompletion() != null) { + params.putParam("wait_for_completion", Boolean.toString(deleteJobRequest.getWaitForCompletion())); + } return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java index 29250e5d440bd..8c442d8ffa646 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java @@ -26,6 +26,7 @@ import org.elasticsearch.client.ml.DeleteDatafeedRequest; import org.elasticsearch.client.ml.DeleteForecastRequest; import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteJobResponse; import org.elasticsearch.client.ml.FlushJobRequest; import org.elasticsearch.client.ml.FlushJobResponse; import org.elasticsearch.client.ml.ForecastJobRequest; @@ -211,14 +212,15 @@ public void getJobStatsAsync(GetJobStatsRequest request, RequestOptions options, * * @param request The request to delete the job * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized - * @return action acknowledgement + * @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for + * completion * @throws IOException when there is a serialization issue sending the request or receiving the response */ - public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException { + public DeleteJobResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException { return restHighLevelClient.performRequestAndParseEntity(request, MLRequestConverters::deleteJob, options, - AcknowledgedResponse::fromXContent, + DeleteJobResponse::fromXContent, Collections.emptySet()); } @@ -232,11 +234,11 @@ public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions o * @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized * @param listener Listener to be notified upon request completion */ - public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener listener) { + public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener listener) { restHighLevelClient.performRequestAsyncAndParseEntity(request, MLRequestConverters::deleteJob, options, - AcknowledgedResponse::fromXContent, + DeleteJobResponse::fromXContent, listener, Collections.emptySet()); } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java index a355f7ec659bb..44e3668059c47 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobRequest.java @@ -29,7 +29,8 @@ public class DeleteJobRequest extends ActionRequest { private String jobId; - private boolean force; + private Boolean force; + private Boolean waitForCompletion; public DeleteJobRequest(String jobId) { this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null"); @@ -47,7 +48,7 @@ public void setJobId(String jobId) { this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null"); } - public boolean isForce() { + public Boolean getForce() { return force; } @@ -57,10 +58,24 @@ public boolean isForce() { * * @param force When {@code true} forcefully delete an opened job. Defaults to {@code false} */ - public void setForce(boolean force) { + public void setForce(Boolean force) { this.force = force; } + public Boolean getWaitForCompletion() { + return waitForCompletion; + } + + /** + * Set whether this request should wait until the operation has completed before returning + * @param waitForCompletion When {@code true} the call will wait for the job deletion to complete. + * Otherwise, the deletion will be executed asynchronously and the response + * will contain the task id. + */ + public void setWaitForCompletion(Boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + } + @Override public ActionRequestValidationException validate() { return null; diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobResponse.java new file mode 100644 index 0000000000000..f1487c8c2765b --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteJobResponse.java @@ -0,0 +1,113 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Objects; + +/** + * Response object that contains the acknowledgement or the task id + * depending on whether the delete job action was requested to wait for completion. + */ +public class DeleteJobResponse extends ActionResponse implements ToXContentObject { + + private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged"); + private static final ParseField TASK = new ParseField("task"); + + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("delete_job_response", + true, a-> new DeleteJobResponse((Boolean) a[0], (TaskId) a[1])); + + static { + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING); + } + + public static DeleteJobResponse fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + private final Boolean acknowledged; + private final TaskId task; + + DeleteJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) { + assert acknowledged != null || task != null; + this.acknowledged = acknowledged; + this.task = task; + } + + /** + * Get the action acknowledgement + * @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code false} or + * otherwise a {@code boolean} that indicates whether the job was deleted successfully. + */ + public Boolean getAcknowledged() { + return acknowledged; + } + + /** + * Get the task id + * @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code true} or + * otherwise the id of the job deletion task. + */ + public TaskId getTask() { + return task; + } + + @Override + public int hashCode() { + return Objects.hash(acknowledged, task); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DeleteJobResponse that = (DeleteJobResponse) other; + return Objects.equals(acknowledged, that.acknowledged) && Objects.equals(task, that.task); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (acknowledged != null) { + builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged); + } + if (task != null) { + builder.field(TASK.getPreferredName(), task.toString()); + } + builder.endObject(); + return builder; + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java index f30a003e02a7d..13b4dcb955a05 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/job/config/Job.java @@ -65,6 +65,7 @@ public class Job implements ToXContentObject { public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days"); public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id"); public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); + public static final ParseField DELETING = new ParseField("deleting"); public static final ObjectParser PARSER = new ObjectParser<>("job_details", true, Builder::new); @@ -94,6 +95,7 @@ public class Job implements ToXContentObject { PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT); PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID); PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); + PARSER.declareBoolean(Builder::setDeleting, DELETING); } private final String jobId; @@ -115,13 +117,14 @@ public class Job implements ToXContentObject { private final Map customSettings; private final String modelSnapshotId; private final String resultsIndexName; + private final Boolean deleting; private Job(String jobId, String jobType, List groups, String description, Date createTime, Date finishedTime, Long establishedModelMemory, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, - String modelSnapshotId, String resultsIndexName) { + String modelSnapshotId, String resultsIndexName, Boolean deleting) { this.jobId = jobId; this.jobType = jobType; @@ -141,6 +144,7 @@ private Job(String jobId, String jobType, List groups, String descriptio this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings); this.modelSnapshotId = modelSnapshotId; this.resultsIndexName = resultsIndexName; + this.deleting = deleting; } /** @@ -275,6 +279,10 @@ public String getModelSnapshotId() { return modelSnapshotId; } + public Boolean getDeleting() { + return deleting; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -330,6 +338,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (resultsIndexName != null) { builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); } + if (deleting != null) { + builder.field(DELETING.getPreferredName(), deleting); + } builder.endObject(); return builder; } @@ -362,7 +373,8 @@ public boolean equals(Object other) { && Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays) && Objects.equals(this.customSettings, that.customSettings) && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) - && Objects.equals(this.resultsIndexName, that.resultsIndexName); + && Objects.equals(this.resultsIndexName, that.resultsIndexName) + && Objects.equals(this.deleting, that.deleting); } @Override @@ -370,7 +382,7 @@ public int hashCode() { return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName); + modelSnapshotId, resultsIndexName, deleting); } @Override @@ -402,6 +414,7 @@ public static class Builder { private Map customSettings; private String modelSnapshotId; private String resultsIndexName; + private Boolean deleting; private Builder() { } @@ -429,6 +442,7 @@ public Builder(Job job) { this.customSettings = job.getCustomSettings(); this.modelSnapshotId = job.getModelSnapshotId(); this.resultsIndexName = job.getResultsIndexNameNoPrefix(); + this.deleting = job.getDeleting(); } public Builder setId(String id) { @@ -525,6 +539,11 @@ public Builder setResultsIndexName(String resultsIndexName) { return this; } + Builder setDeleting(Boolean deleting) { + this.deleting = deleting; + return this; + } + /** * Builds a job. * @@ -537,7 +556,7 @@ public Job build() { id, jobType, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, resultsIndexName); + modelSnapshotId, resultsIndexName, deleting); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index b07f78cab1b81..8c5f49c943f33 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -164,11 +164,18 @@ public void testDeleteJob() { Request request = MLRequestConverters.deleteJob(deleteJobRequest); assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); assertEquals("/_xpack/ml/anomaly_detectors/" + jobId, request.getEndpoint()); - assertEquals(Boolean.toString(false), request.getParameters().get("force")); + assertNull(request.getParameters().get("force")); + assertNull(request.getParameters().get("wait_for_completion")); + deleteJobRequest = new DeleteJobRequest(jobId); deleteJobRequest.setForce(true); request = MLRequestConverters.deleteJob(deleteJobRequest); assertEquals(Boolean.toString(true), request.getParameters().get("force")); + + deleteJobRequest = new DeleteJobRequest(jobId); + deleteJobRequest.setWaitForCompletion(false); + request = MLRequestConverters.deleteJob(deleteJobRequest); + assertEquals(Boolean.toString(false), request.getParameters().get("wait_for_completion")); } public void testFlushJob() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java index 5d3fc82a4bb1b..cac9f533501b5 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MachineLearningIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.client.ml.DeleteDatafeedRequest; import org.elasticsearch.client.ml.DeleteForecastRequest; import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteJobResponse; import org.elasticsearch.client.ml.FlushJobRequest; import org.elasticsearch.client.ml.FlushJobResponse; import org.elasticsearch.client.ml.ForecastJobRequest; @@ -151,17 +152,33 @@ public void testGetJob() throws Exception { assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2)); } - public void testDeleteJob() throws Exception { + public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception { String jobId = randomValidJobId(); Job job = buildJob(jobId); MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); - AcknowledgedResponse response = execute(new DeleteJobRequest(jobId), + DeleteJobResponse response = execute(new DeleteJobRequest(jobId), machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync); - assertTrue(response.isAcknowledged()); + assertTrue(response.getAcknowledged()); + assertNull(response.getTask()); + } + + public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception { + String jobId = randomValidJobId(); + Job job = buildJob(jobId); + MachineLearningClient machineLearningClient = highLevelClient().machineLearning(); + machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT); + + DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId); + deleteJobRequest.setWaitForCompletion(false); + + DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync); + + assertNull(response.getAcknowledged()); + assertNotNull(response.getTask()); } public void testOpenJob() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index eb1d65a380565..0c0efb241f9ab 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -39,6 +39,7 @@ import org.elasticsearch.client.ml.DeleteDatafeedRequest; import org.elasticsearch.client.ml.DeleteForecastRequest; import org.elasticsearch.client.ml.DeleteJobRequest; +import org.elasticsearch.client.ml.DeleteJobResponse; import org.elasticsearch.client.ml.FlushJobRequest; import org.elasticsearch.client.ml.FlushJobResponse; import org.elasticsearch.client.ml.ForecastJobRequest; @@ -108,6 +109,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.TaskId; import org.junit.After; import java.io.IOException; @@ -281,20 +283,34 @@ public void testDeleteJob() throws Exception { { //tag::x-pack-delete-ml-job-request - DeleteJobRequest deleteJobRequest = new DeleteJobRequest("my-first-machine-learning-job"); - deleteJobRequest.setForce(false); // <1> - AcknowledgedResponse deleteJobResponse = client.machineLearning().deleteJob(deleteJobRequest, RequestOptions.DEFAULT); + DeleteJobRequest deleteJobRequest = new DeleteJobRequest("my-first-machine-learning-job"); // <1> //end::x-pack-delete-ml-job-request + //tag::x-pack-delete-ml-job-request-force + deleteJobRequest.setForce(false); // <1> + //end::x-pack-delete-ml-job-request-force + + //tag::x-pack-delete-ml-job-request-wait-for-completion + deleteJobRequest.setWaitForCompletion(true); // <1> + //end::x-pack-delete-ml-job-request-wait-for-completion + + //tag::x-pack-delete-ml-job-execute + DeleteJobResponse deleteJobResponse = client.machineLearning().deleteJob(deleteJobRequest, RequestOptions.DEFAULT); + //end::x-pack-delete-ml-job-execute + //tag::x-pack-delete-ml-job-response - boolean isAcknowledged = deleteJobResponse.isAcknowledged(); // <1> + Boolean isAcknowledged = deleteJobResponse.getAcknowledged(); // <1> + TaskId task = deleteJobResponse.getTask(); // <2> //end::x-pack-delete-ml-job-response + + assertTrue(isAcknowledged); + assertNull(task); } { //tag::x-pack-delete-ml-job-request-listener - ActionListener listener = new ActionListener() { + ActionListener listener = new ActionListener() { @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { + public void onResponse(DeleteJobResponse deleteJobResponse) { // <1> } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java index d3ccb98eeb68a..d9f96fd0f288c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobRequestTests.java @@ -34,12 +34,4 @@ public void test_WithNullJobId() { ex = expectThrows(NullPointerException.class, () -> createTestInstance().setJobId(null)); assertEquals("[job_id] must not be null", ex.getMessage()); } - - public void test_WithForce() { - DeleteJobRequest deleteJobRequest = createTestInstance(); - assertFalse(deleteJobRequest.isForce()); - - deleteJobRequest.setForce(true); - assertTrue(deleteJobRequest.isForce()); - } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobResponseTests.java new file mode 100644 index 0000000000000..97a8c5b892c69 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteJobResponseTests.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class DeleteJobResponseTests extends AbstractXContentTestCase { + + @Override + protected DeleteJobResponse createTestInstance() { + if (randomBoolean()) { + return new DeleteJobResponse(randomBoolean(), null); + } + return new DeleteJobResponse(null, new TaskId(randomAlphaOfLength(20) + ":" + randomIntBetween(1, 100))); + } + + @Override + protected DeleteJobResponse doParseInstance(XContentParser parser) throws IOException { + return DeleteJobResponse.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java index b678dce6cffc9..667932d591231 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/job/config/JobTests.java @@ -34,9 +34,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.List; -import java.util.Map; public class JobTests extends AbstractXContentTestCase { @@ -77,93 +75,6 @@ public void testFutureMetadataParse() throws IOException { assertNotNull(Job.PARSER.apply(parser, null).build()); } - public void testEquals_GivenDifferentClass() { - Job job = buildJobBuilder("foo").build(); - assertFalse(job.equals("a string")); - } - - public void testEquals_GivenDifferentIds() { - Date createTime = new Date(); - Job.Builder builder = buildJobBuilder("foo"); - builder.setCreateTime(createTime); - Job job1 = builder.build(); - builder.setId("bar"); - Job job2 = builder.build(); - assertFalse(job1.equals(job2)); - } - - public void testEquals_GivenDifferentRenormalizationWindowDays() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setRenormalizationWindowDays(3L); - jobDetails1.setCreateTime(date); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setRenormalizationWindowDays(4L); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentBackgroundPersistInterval() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setBackgroundPersistInterval(TimeValue.timeValueSeconds(10000L)); - jobDetails1.setCreateTime(date); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setBackgroundPersistInterval(TimeValue.timeValueSeconds(8000L)); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentModelSnapshotRetentionDays() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setModelSnapshotRetentionDays(10L); - jobDetails1.setCreateTime(date); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setModelSnapshotRetentionDays(8L); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentResultsRetentionDays() { - Date date = new Date(); - Job.Builder jobDetails1 = new Job.Builder("foo"); - jobDetails1.setDataDescription(new DataDescription.Builder()); - jobDetails1.setAnalysisConfig(createAnalysisConfig()); - jobDetails1.setCreateTime(date); - jobDetails1.setResultsRetentionDays(30L); - Job.Builder jobDetails2 = new Job.Builder("foo"); - jobDetails2.setDataDescription(new DataDescription.Builder()); - jobDetails2.setResultsRetentionDays(4L); - jobDetails2.setAnalysisConfig(createAnalysisConfig()); - jobDetails2.setCreateTime(date); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - - public void testEquals_GivenDifferentCustomSettings() { - Job.Builder jobDetails1 = buildJobBuilder("foo"); - Map customSettings1 = new HashMap<>(); - customSettings1.put("key1", "value1"); - jobDetails1.setCustomSettings(customSettings1); - Job.Builder jobDetails2 = buildJobBuilder("foo"); - Map customSettings2 = new HashMap<>(); - customSettings2.put("key2", "value2"); - jobDetails2.setCustomSettings(customSettings2); - assertFalse(jobDetails1.build().equals(jobDetails2.build())); - } - public void testCopyConstructor() { for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { Job job = createTestInstance(); @@ -184,20 +95,6 @@ public void testBuilder_WithNullJobType() { assertEquals("[job_type] must not be null", ex.getMessage()); } - public static Job.Builder buildJobBuilder(String id, Date date) { - Job.Builder builder = new Job.Builder(id); - builder.setCreateTime(date); - AnalysisConfig.Builder ac = createAnalysisConfig(); - DataDescription.Builder dc = new DataDescription.Builder(); - builder.setAnalysisConfig(ac); - builder.setDataDescription(dc); - return builder; - } - - public static Job.Builder buildJobBuilder(String id) { - return buildJobBuilder(id, new Date()); - } - public static String randomValidJobId() { CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray()); return generator.ofCodePointsLength(random(), 10, 10); @@ -262,6 +159,9 @@ public static Job.Builder createRandomizedJobBuilder() { if (randomBoolean()) { builder.setResultsIndexName(randomValidJobId()); } + if (randomBoolean()) { + builder.setDeleting(randomBoolean()); + } return builder; } diff --git a/docs/java-rest/high-level/ml/delete-job.asciidoc b/docs/java-rest/high-level/ml/delete-job.asciidoc index 43f1e2fb02bbf..7cdc4149b231c 100644 --- a/docs/java-rest/high-level/ml/delete-job.asciidoc +++ b/docs/java-rest/high-level/ml/delete-job.asciidoc @@ -4,26 +4,57 @@ [[java-rest-high-x-pack-machine-learning-delete-job-request]] ==== Delete Job Request -A `DeleteJobRequest` object requires a non-null `jobId` and can optionally set `force`. -Can be executed as follows: +A `DeleteJobRequest` object requires a non-null `jobId`. ["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request] --------------------------------------------------- +<1> Constructing a new request referencing an existing `jobId` + +==== Optional Arguments + +The following arguments are optional: + +["source","java",subs="attributes,callouts,macros"] +--------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request-force] +--------------------------------------------------- <1> Use to forcefully delete an opened job; this method is quicker than closing and deleting the job. -Defaults to `false` +Defaults to `false`. + +["source","java",subs="attributes,callouts,macros"] +--------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-request-wait-for-completion] +--------------------------------------------------- +<1> Use to set whether the request should wait until the operation has completed before returning. +Defaults to `true`. + +[[java-rest-high-x-pack-machine-learning-delete-job-execution]] +==== Execution + +The request can be executed through the `MachineLearningClient` contained +in the `RestHighLevelClient` object, accessed via the `machineLearningClient()` method. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-execute] +-------------------------------------------------- [[java-rest-high-x-pack-machine-learning-delete-job-response]] ==== Delete Job Response -The returned `AcknowledgedResponse` object indicates the acknowledgement of the request: +The returned `DeleteJobResponse` object contains the acknowledgement of the +job deletion or the deletion task depending on whether the request was set +to wait for completion: + ["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- include-tagged::{doc-tests}/MlClientDocumentationIT.java[x-pack-delete-ml-job-response] --------------------------------------------------- -<1> `isAcknowledged` was the deletion request acknowledged or not +<1> whether was job deletion was acknowledged or not; will be `null` when set not to wait for completion +<2> the id of the job deletion task; will be `null` when set to wait for completion [[java-rest-high-x-pack-machine-learning-delete-job-async]] ==== Delete Job Asynchronously diff --git a/docs/reference/ml/apis/delete-job.asciidoc b/docs/reference/ml/apis/delete-job.asciidoc index d5ef120ad040b..b9dbe9e3cd6bd 100644 --- a/docs/reference/ml/apis/delete-job.asciidoc +++ b/docs/reference/ml/apis/delete-job.asciidoc @@ -41,6 +41,9 @@ separated list. (boolean) Use to forcefully delete an opened job; this method is quicker than closing and deleting the job. +`wait_for_completion`:: + (boolean) Specifies whether the request should return immediately or wait + until the job deletion completes. Defaults to `true`. ==== Authorization @@ -66,4 +69,23 @@ When the job is deleted, you receive the following results: "acknowledged": true } ---- -// TESTRESPONSE \ No newline at end of file +// TESTRESPONSE + +In the next example we delete the `total-requests` job asynchronously: + +[source,js] +-------------------------------------------------- +DELETE _xpack/ml/anomaly_detectors/total-requests?wait_for_completion=false +-------------------------------------------------- +// CONSOLE +// TEST[skip:setup:server_metrics_job] + +When `wait_for_completion` is set to `false`, the response contains the id +of the job deletion task: +[source,js] +---- +{ + "task": "oTUltX4IQMOUUVeiohTt8A:39" +} +---- +// TESTRESPONSE[s/"task": "oTUltX4IQMOUUVeiohTt8A:39"/"task": $body.task/] \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskId.java b/server/src/main/java/org/elasticsearch/tasks/TaskId.java index 1aeceef247f47..f92997b047c13 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskId.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskId.java @@ -19,10 +19,13 @@ package org.elasticsearch.tasks; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ContextParser; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -96,6 +99,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(id); } + public static ContextParser parser() { + return (p, c) -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return new TaskId(p.text()); + } + throw new ElasticsearchParseException("Expected a string but found [{}] instead", p.currentToken()); + }; + } + public String getNodeId() { return nodeId; } diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java index a866ad9bb2dd1..46b68ce16028c 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskResult.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskResult.java @@ -76,7 +76,7 @@ public TaskResult(TaskInfo task, Exception error) throws IOException { * Construct a {@linkplain TaskResult} for a task that completed successfully. */ public TaskResult(TaskInfo task, ToXContent response) throws IOException { - this(true, task, null, toXContent(response)); + this(true, task, null, XContentHelper.toXContent(response, Requests.INDEX_CONTENT_TYPE, true)); } private TaskResult(boolean completed, TaskInfo task, @Nullable BytesReference error, @Nullable BytesReference result) { @@ -222,16 +222,6 @@ public int hashCode() { return Objects.hash(completed, task, getErrorAsMap(), getResponseAsMap()); } - private static BytesReference toXContent(ToXContent result) throws IOException { - try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { - // Elasticsearch's Response object never emit starting or ending objects. Most other implementers of ToXContent do.... - builder.startObject(); - result.toXContent(builder, ToXContent.EMPTY_PARAMS); - builder.endObject(); - return BytesReference.bytes(builder); - } - } - private static BytesReference toXContent(Exception error) throws IOException { try (XContentBuilder builder = XContentFactory.contentBuilder(Requests.INDEX_CONTENT_TYPE)) { builder.startObject(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index 193695ac69362..8d3c6a3565f93 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -91,9 +91,9 @@ public Set expandJobIds(String expression, boolean allowNoJobs) { return groupOrJobLookup.expandJobIds(expression, allowNoJobs); } - public boolean isJobDeleted(String jobId) { + public boolean isJobDeleting(String jobId) { Job job = jobs.get(jobId); - return job == null || job.isDeleted(); + return job == null || job.isDeleting(); } public SortedMap getDatafeeds() { @@ -287,7 +287,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) { if (job == null) { throw new ResourceNotFoundException("job [" + jobId + "] does not exist"); } - if (job.isDeleted() == false) { + if (job.isDeleting() == false) { throw ExceptionsHelper.conflictStatusException("Cannot delete job [" + jobId + "] because it hasn't marked as deleted"); } return this; @@ -318,7 +318,7 @@ public Builder putDatafeed(DatafeedConfig datafeedConfig, Map he private void checkJobIsAvailableForDatafeed(String jobId) { Job job = jobs.get(jobId); - if (job == null || job.isDeleted()) { + if (job == null || job.isDeleting()) { throw ExceptionsHelper.missingJobException(jobId); } Optional existingDatafeed = getDatafeedByJobId(jobId); @@ -387,14 +387,14 @@ public MlMetadata build() { return new MlMetadata(jobs, datafeeds); } - public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { + public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { Job job = jobs.get(jobId); if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } - if (job.isDeleted()) { + if (job.isDeleting()) { // Job still exists but is already being deleted - throw new JobAlreadyMarkedAsDeletedException(); + return; } checkJobHasNoDatafeed(jobId); @@ -408,7 +408,7 @@ public void markJobAsDeleted(String jobId, PersistentTasksCustomMetaData tasks, } } Job.Builder jobBuilder = new Job.Builder(job); - jobBuilder.setDeleted(true); + jobBuilder.setDeleting(true); putJob(jobBuilder.build(), true); } @@ -430,7 +430,4 @@ public static MlMetadata getMlMetadata(ClusterState state) { } return mlMetadata; } - - public static class JobAlreadyMarkedAsDeletedException extends RuntimeException { - } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java index 9fbde4721cd6a..6b279e0852183 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteJobAction.java @@ -42,6 +42,11 @@ public static class Request extends AcknowledgedRequest { private String jobId; private boolean force; + /** + * Should this task store its result? + */ + private boolean shouldStoreResult; + public Request(String jobId) { this.jobId = ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); } @@ -64,6 +69,18 @@ public void setForce(boolean force) { this.force = force; } + /** + * Should this task store its result after it has finished? + */ + public void setShouldStoreResult(boolean shouldStoreResult) { + this.shouldStoreResult = shouldStoreResult; + } + + @Override + public boolean getShouldStoreResult() { + return shouldStoreResult; + } + @Override public ActionRequestValidationException validate() { return null; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index a5293cdcbc75d..5a352ab26657c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -75,7 +75,7 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id"); public static final ParseField MODEL_SNAPSHOT_MIN_VERSION = new ParseField("model_snapshot_min_version"); public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name"); - public static final ParseField DELETED = new ParseField("deleted"); + public static final ParseField DELETING = new ParseField("deleting"); // Used for QueryPage public static final ParseField RESULTS_FIELD = new ParseField("jobs"); @@ -119,7 +119,7 @@ private static ObjectParser createParser(boolean ignoreUnknownFie parser.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID); parser.declareStringOrNull(Builder::setModelSnapshotMinVersion, MODEL_SNAPSHOT_MIN_VERSION); parser.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME); - parser.declareBoolean(Builder::setDeleted, DELETED); + parser.declareBoolean(Builder::setDeleting, DELETING); return parser; } @@ -152,14 +152,14 @@ private static ObjectParser createParser(boolean ignoreUnknownFie private final String modelSnapshotId; private final Version modelSnapshotMinVersion; private final String resultsIndexName; - private final boolean deleted; + private final boolean deleting; private Job(String jobId, String jobType, Version jobVersion, List groups, String description, Date createTime, Date finishedTime, Long establishedModelMemory, AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription, ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval, Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map customSettings, - String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleted) { + String modelSnapshotId, Version modelSnapshotMinVersion, String resultsIndexName, boolean deleting) { this.jobId = jobId; this.jobType = jobType; @@ -181,7 +181,7 @@ private Job(String jobId, String jobType, Version jobVersion, List group this.modelSnapshotId = modelSnapshotId; this.modelSnapshotMinVersion = modelSnapshotMinVersion; this.resultsIndexName = resultsIndexName; - this.deleted = deleted; + this.deleting = deleting; } public Job(StreamInput in) throws IOException { @@ -224,7 +224,7 @@ public Job(StreamInput in) throws IOException { modelSnapshotMinVersion = null; } resultsIndexName = in.readString(); - deleted = in.readBoolean(); + deleting = in.readBoolean(); } /** @@ -375,8 +375,8 @@ public Version getModelSnapshotMinVersion() { return modelSnapshotMinVersion; } - public boolean isDeleted() { - return deleted; + public boolean isDeleting() { + return deleting; } /** @@ -489,7 +489,7 @@ public void writeTo(StreamOutput out) throws IOException { } } out.writeString(resultsIndexName); - out.writeBoolean(deleted); + out.writeBoolean(deleting); } @Override @@ -554,8 +554,8 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th builder.field(MODEL_SNAPSHOT_MIN_VERSION.getPreferredName(), modelSnapshotMinVersion); } builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); - if (params.paramAsBoolean("all", false)) { - builder.field(DELETED.getPreferredName(), deleted); + if (deleting) { + builder.field(DELETING.getPreferredName(), deleting); } return builder; } @@ -591,7 +591,7 @@ public boolean equals(Object other) { && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) - && Objects.equals(this.deleted, that.deleted); + && Objects.equals(this.deleting, that.deleting); } @Override @@ -599,7 +599,7 @@ public int hashCode() { return Objects.hash(jobId, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); } // Class already extends from AbstractDiffable, so copied from ToXContentToBytes#toString() @@ -647,7 +647,7 @@ public static class Builder implements Writeable, ToXContentObject { private String modelSnapshotId; private Version modelSnapshotMinVersion; private String resultsIndexName; - private boolean deleted; + private boolean deleting; public Builder() { } @@ -677,7 +677,7 @@ public Builder(Job job) { this.modelSnapshotId = job.getModelSnapshotId(); this.modelSnapshotMinVersion = job.getModelSnapshotMinVersion(); this.resultsIndexName = job.getResultsIndexNameNoPrefix(); - this.deleted = job.isDeleted(); + this.deleting = job.isDeleting(); } public Builder(StreamInput in) throws IOException { @@ -717,7 +717,7 @@ public Builder(StreamInput in) throws IOException { modelSnapshotMinVersion = null; } resultsIndexName = in.readOptionalString(); - deleted = in.readBoolean(); + deleting = in.readBoolean(); } public Builder setId(String id) { @@ -834,8 +834,8 @@ public Builder setResultsIndexName(String resultsIndexName) { return this; } - public Builder setDeleted(boolean deleted) { - this.deleted = deleted; + public Builder setDeleting(boolean deleting) { + this.deleting = deleting; return this; } @@ -911,7 +911,7 @@ public void writeTo(StreamOutput out) throws IOException { } } out.writeOptionalString(resultsIndexName); - out.writeBoolean(deleted); + out.writeBoolean(deleting); } @Override @@ -972,8 +972,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (resultsIndexName != null) { builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName); } - if (params.paramAsBoolean("all", false)) { - builder.field(DELETED.getPreferredName(), deleted); + if (deleting) { + builder.field(DELETING.getPreferredName(), deleting); } builder.endObject(); @@ -1006,7 +1006,7 @@ public boolean equals(Object o) { && Objects.equals(this.modelSnapshotId, that.modelSnapshotId) && Objects.equals(this.modelSnapshotMinVersion, that.modelSnapshotMinVersion) && Objects.equals(this.resultsIndexName, that.resultsIndexName) - && Objects.equals(this.deleted, that.deleted); + && Objects.equals(this.deleting, that.deleting); } @Override @@ -1014,7 +1014,7 @@ public int hashCode() { return Objects.hash(id, jobType, jobVersion, groups, description, analysisConfig, analysisLimits, dataDescription, createTime, finishedTime, establishedModelMemory, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, modelSnapshotId, - modelSnapshotMinVersion, resultsIndexName, deleted); + modelSnapshotMinVersion, resultsIndexName, deleting); } /** @@ -1127,7 +1127,7 @@ public Job build() { id, jobType, jobVersion, groups, description, createTime, finishedTime, establishedModelMemory, analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays, backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings, - modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleted); + modelSnapshotId, modelSnapshotMinVersion, resultsIndexName, deleting); } private void checkValidBackgroundPersistInterval() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index 3c571c9d60509..b669e8f1edcfb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -67,6 +67,8 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; public static final String JOB_AUDIT_DATAFEED_STOPPED = "Datafeed stopped"; + public static final String JOB_AUDIT_DELETING = "Deleting job by task with id ''{0}''"; + public static final String JOB_AUDIT_DELETING_FAILED = "Error deleting job: {0}"; public static final String JOB_AUDIT_DELETED = "Job deleted"; public static final String JOB_AUDIT_KILLING = "Killing job"; public static final String JOB_AUDIT_OLD_RESULTS_DELETED = "Deleted results prior to {1}"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java index 2f218cfb2dc4d..f3cd2abf461b7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/persistence/JobDeletionTask.java @@ -12,7 +12,17 @@ public class JobDeletionTask extends Task { + private volatile boolean started; + public JobDeletionTask(long id, String type, String action, String description, TaskId parentTask, Map headers) { super(id, type, action, description, parentTask, headers); } + + public void start() { + started = true; + } + + public boolean isStarted() { + return started; + } } diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java index 4d7c5cd058be3..e9f9166a2a3ba 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlJobIT.java @@ -16,9 +16,9 @@ import org.elasticsearch.test.SecuritySettingsSourceField; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.core.ml.integration.MlRestTestStateCleaner; -import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndexFields; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.test.rest.XPackRestTestHelper; import org.junit.After; @@ -26,6 +26,8 @@ import java.util.Locale; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; @@ -386,6 +388,41 @@ public void testDeleteJob() throws Exception { String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity()); assertThat(indicesAfterDelete, containsString(indexName)); + waitUntilIndexIsEmpty(indexName); + + // check that the job itself is gone + expectThrows(ResponseException.class, () -> + client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); + } + + public void testDeleteJobAsync() throws Exception { + String jobId = "delete-job-async-job"; + String indexName = AnomalyDetectorsIndexFields.RESULTS_INDEX_PREFIX + AnomalyDetectorsIndexFields.RESULTS_INDEX_DEFAULT; + createFarequoteJob(jobId); + + String indicesBeforeDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity()); + assertThat(indicesBeforeDelete, containsString(indexName)); + + Response response = client().performRequest(new Request("DELETE", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + + "?wait_for_completion=false")); + + // Wait for task to complete + String taskId = extractTaskId(response); + Response taskResponse = client().performRequest(new Request("GET", "_tasks/" + taskId + "?wait_for_completion=true")); + assertThat(EntityUtils.toString(taskResponse.getEntity()), containsString("\"acknowledged\":true")); + + // check that the index still exists (it's shared by default) + String indicesAfterDelete = EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/indices")).getEntity()); + assertThat(indicesAfterDelete, containsString(indexName)); + + waitUntilIndexIsEmpty(indexName); + + // check that the job itself is gone + expectThrows(ResponseException.class, () -> + client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); + } + + private void waitUntilIndexIsEmpty(String indexName) throws Exception { assertBusy(() -> { try { String count = EntityUtils.toString(client().performRequest(new Request("GET", indexName + "/_count")).getEntity()); @@ -394,10 +431,14 @@ public void testDeleteJob() throws Exception { fail(e.getMessage()); } }); + } - // check that the job itself is gone - expectThrows(ResponseException.class, () -> - client().performRequest(new Request("GET", MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats"))); + private static String extractTaskId(Response response) throws IOException { + String responseAsString = EntityUtils.toString(response.getEntity()); + Pattern matchTaskId = Pattern.compile(".*\"task\":.*\"(.*)\".*"); + Matcher taskIdMatcher = matchTaskId.matcher(responseAsString); + assertTrue(taskIdMatcher.matches()); + return taskIdMatcher.group(1); } public void testDeleteJobAfterMissingIndex() throws Exception { @@ -521,7 +562,7 @@ public void testMultiIndexDelete() throws Exception { } public void testDelete_multipleRequest() throws Exception { - String jobId = "delete-job-mulitple-times"; + String jobId = "delete-job-multiple-times"; createFarequoteJob(jobId); ConcurrentMapLong responses = ConcurrentCollections.newConcurrentMapLong(); @@ -532,8 +573,8 @@ public void testDelete_multipleRequest() throws Exception { AtomicReference recreationException = new AtomicReference<>(); Runnable deleteJob = () -> { + boolean forceDelete = randomBoolean(); try { - boolean forceDelete = randomBoolean(); String url = MachineLearning.BASE_PATH + "anomaly_detectors/" + jobId; if (forceDelete) { url += "?force=true"; @@ -554,6 +595,7 @@ public void testDelete_multipleRequest() throws Exception { } catch (ResponseException re) { recreationException.set(re); } catch (IOException e) { + logger.error("Error trying to recreate the job", e); ioe.set(e); } } @@ -563,14 +605,14 @@ public void testDelete_multipleRequest() throws Exception { // the other to complete. This is difficult to schedule but // hopefully it will happen in CI int numThreads = 5; - Thread [] threads = new Thread[numThreads]; - for (int i=0; i jobIdProcessor = id -> { validateJobAndTaskState(id, mlMetadata, tasksMetaData); Job job = mlMetadata.getJobs().get(id); - if (job.isDeleted()) { + if (job.isDeleting()) { return; } addJobAccordingToState(id, tasksMetaData, openJobIds, closingJobIds, failedJobs); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java index 1d285b91f2f8e..89f42d622411f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteJobAction.java @@ -23,9 +23,9 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateObserver; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -34,9 +34,9 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.query.ConstantScoreQueryBuilder; import org.elasticsearch.index.query.IdsQueryBuilder; @@ -45,14 +45,13 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; @@ -72,10 +71,11 @@ import org.elasticsearch.xpack.ml.utils.MlIndicesUtils; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; @@ -90,6 +90,14 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction>> listenersByJobId; + @Inject public TransportDeleteJobAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, @@ -101,6 +109,7 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer this.persistentTasksService = persistentTasksService; this.auditor = auditor; this.jobResultsProvider = jobResultsProvider; + this.listenersByJobId = new HashMap<>(); } @Override @@ -113,58 +122,82 @@ protected AcknowledgedResponse newResponse() { return new AcknowledgedResponse(); } + @Override + protected ClusterBlockException checkBlock(DeleteJobAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void masterOperation(DeleteJobAction.Request request, ClusterState state, ActionListener listener) { + throw new UnsupportedOperationException("the Task parameter is required"); + } + @Override protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state, ActionListener listener) { + logger.debug("Deleting job '{}'", request.getJobId()); + + TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); + ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId); + + // Check if there is a deletion task for this job already and if yes wait for it to complete + synchronized (listenersByJobId) { + if (listenersByJobId.containsKey(request.getJobId())) { + logger.debug("[{}] Deletion task [{}] will wait for existing deletion task to complete", + request.getJobId(), task.getId()); + listenersByJobId.get(request.getJobId()).add(listener); + return; + } else { + List> listeners = new ArrayList<>(); + listeners.add(listener); + listenersByJobId.put(request.getJobId(), listeners); + } + } + + auditor.info(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING, taskId)); + + // The listener that will be executed at the end of the chain will notify all listeners + ActionListener finalListener = ActionListener.wrap( + ack -> notifyListeners(request.getJobId(), ack, null), + e -> notifyListeners(request.getJobId(), null, e) + ); ActionListener markAsDeletingListener = ActionListener.wrap( response -> { if (request.isForce()) { - forceDeleteJob(request, listener); + forceDeleteJob(parentTaskClient, request, finalListener); } else { - normalDeleteJob(request, listener); + normalDeleteJob(parentTaskClient, request, finalListener); } }, e -> { - if (e instanceof MlMetadata.JobAlreadyMarkedAsDeletedException) { - // Don't kick off a parallel deletion task, but just wait for - // the in-progress request to finish. This is much safer in the - // case where the job with the same name might be immediately - // recreated after the delete returns. However, if a force - // delete times out then eventually kick off a parallel delete - // in case the original completely failed for some reason. - waitForDeletingJob(request.getJobId(), MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT, - ActionListener.wrap( - listener::onResponse, - e2 -> { - if (request.isForce() && e2 instanceof TimeoutException) { - forceDeleteJob(request, listener); - } else { - listener.onFailure(e2); - } - } - )); - } else { - listener.onFailure(e); - } + auditor.error(request.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DELETING_FAILED, e.getMessage())); + finalListener.onFailure(e); }); markJobAsDeleting(request.getJobId(), markAsDeletingListener, request.isForce()); } - @Override - protected void masterOperation(DeleteJobAction.Request request, ClusterState state, ActionListener listener) { - throw new UnsupportedOperationException("the Task parameter is required"); - } - - @Override - protected ClusterBlockException checkBlock(DeleteJobAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + private void notifyListeners(String jobId, @Nullable AcknowledgedResponse ack, @Nullable Exception error) { + synchronized (listenersByJobId) { + List> listeners = listenersByJobId.remove(jobId); + if (listeners == null) { + logger.error("[{}] No deletion job listeners could be found", jobId); + return; + } + for (ActionListener listener : listeners) { + if (error != null) { + listener.onFailure(error); + } else { + listener.onResponse(ack); + } + } + } } - private void normalDeleteJob(DeleteJobAction.Request request, ActionListener listener) { + private void normalDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request, + ActionListener listener) { String jobId = request.getJobId(); - logger.debug("Deleting job '" + jobId + "'"); // Step 4. When the job has been removed from the cluster state, return a response // ------- @@ -212,10 +245,11 @@ public ClusterState execute(ClusterState currentState) { // Step 1. Delete the physical storage - deleteJobDocuments(jobId, removeFromCalendarsHandler, listener::onFailure); + deleteJobDocuments(parentTaskClient, jobId, removeFromCalendarsHandler, listener::onFailure); } - private void deleteJobDocuments(String jobId, CheckedConsumer finishedHandler, Consumer failureHandler) { + private void deleteJobDocuments(ParentTaskAssigningClient parentTaskClient, String jobId, + CheckedConsumer finishedHandler, Consumer failureHandler) { final String indexName = AnomalyDetectorsIndex.getPhysicalIndexFromState(clusterService.state(), jobId); final String indexPattern = indexName + "-*"; @@ -241,7 +275,7 @@ private void deleteJobDocuments(String jobId, CheckedConsumerwrap( response -> deleteByQueryExecutor.onResponse(false), // skip DBQ && Alias failureHandler), - client.admin().indices()::delete); + parentTaskClient.admin().indices()::delete); } }, failure -> { @@ -312,7 +346,7 @@ private void deleteJobDocuments(String jobId, CheckedConsumer deleteQuantilesHandler = ActionListener.wrap( - response -> deleteCategorizerState(jobId, client, 1, deleteCategorizerStateHandler), + response -> deleteCategorizerState(parentTaskClient, jobId, 1, deleteCategorizerStateHandler), failureHandler); // Step 2. Delete state done, delete the quantiles ActionListener deleteStateHandler = ActionListener.wrap( - bulkResponse -> deleteQuantiles(jobId, client, deleteQuantilesHandler), + bulkResponse -> deleteQuantiles(parentTaskClient, jobId, deleteQuantilesHandler), failureHandler); // Step 1. Delete the model state - deleteModelState(jobId, client, deleteStateHandler); + deleteModelState(parentTaskClient, jobId, deleteStateHandler); } - private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler) { + private void deleteQuantiles(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener finishedHandler) { // The quantiles type and doc ID changed in v5.5 so delete both the old and new format DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace @@ -344,7 +378,7 @@ private void deleteQuantiles(String jobId, Client client, ActionListener finishedHandler.onResponse(true), e -> { // It's not a problem for us if the index wasn't found - it's equivalent to document not found @@ -356,19 +390,20 @@ private void deleteQuantiles(String jobId, Client client, ActionListener listener) { + private void deleteModelState(ParentTaskAssigningClient parentTaskClient, String jobId, ActionListener listener) { GetModelSnapshotsAction.Request request = new GetModelSnapshotsAction.Request(jobId, null); request.setPageParams(new PageParams(0, MAX_SNAPSHOTS_TO_DELETE)); - executeAsyncWithOrigin(client, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap( + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, GetModelSnapshotsAction.INSTANCE, request, ActionListener.wrap( response -> { List deleteCandidates = response.getPage().results(); - JobDataDeleter deleter = new JobDataDeleter(client, jobId); + JobDataDeleter deleter = new JobDataDeleter(parentTaskClient, jobId); deleter.deleteModelSnapshots(deleteCandidates, listener); }, listener::onFailure)); } - private void deleteCategorizerState(String jobId, Client client, int docNum, ActionListener finishedHandler) { + private void deleteCategorizerState(ParentTaskAssigningClient parentTaskClient, String jobId, int docNum, + ActionListener finishedHandler) { // The categorizer state type and doc ID changed in v5.5 so delete both the old and new format DeleteByQueryRequest request = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexName()); // Just use ID here, not type, as trying to delete different types spams the logs with an exception stack trace @@ -380,13 +415,13 @@ private void deleteCategorizerState(String jobId, Client client, int docNum, Act request.setAbortOnVersionConflict(false); request.setRefresh(true); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, ActionListener.wrap( response -> { // If we successfully deleted a document try the next one; if not we're done if (response.getDeleted() > 0) { // There's an assumption here that there won't be very many categorizer // state documents, so the recursion won't go more than, say, 5 levels deep - deleteCategorizerState(jobId, client, docNum + 1, finishedHandler); + deleteCategorizerState(parentTaskClient, jobId, docNum + 1, finishedHandler); return; } finishedHandler.onResponse(true); @@ -401,14 +436,15 @@ private void deleteCategorizerState(String jobId, Client client, int docNum, Act })); } - private void deleteAliases(String jobId, Client client, ActionListener finishedHandler) { + private void deleteAliases(ParentTaskAssigningClient parentTaskClient, String jobId, + ActionListener finishedHandler) { final String readAliasName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); final String writeAliasName = AnomalyDetectorsIndex.resultsWriteAlias(jobId); // first find the concrete indices associated with the aliases GetAliasesRequest aliasesRequest = new GetAliasesRequest().aliases(readAliasName, writeAliasName) .indicesOptions(IndicesOptions.lenientExpandOpen()); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest, + executeAsyncWithOrigin(parentTaskClient.threadPool().getThreadContext(), ML_ORIGIN, aliasesRequest, ActionListener.wrap( getAliasesResponse -> { // remove the aliases from the concrete indices found in the first step @@ -419,13 +455,13 @@ private void deleteAliases(String jobId, Client client, ActionListenerwrap( finishedHandler::onResponse, finishedHandler::onFailure), - client.admin().indices()::aliases); + parentTaskClient.admin().indices()::aliases); }, - finishedHandler::onFailure), client.admin().indices()::getAliases); + finishedHandler::onFailure), parentTaskClient.admin().indices()::getAliases); } private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAliasesResponse) { @@ -445,7 +481,10 @@ private IndicesAliasesRequest buildRemoveAliasesRequest(GetAliasesResponse getAl .indices(indices.toArray(new String[indices.size()]))); } - private void forceDeleteJob(DeleteJobAction.Request request, ActionListener listener) { + private void forceDeleteJob(ParentTaskAssigningClient parentTaskClient, DeleteJobAction.Request request, + ActionListener listener) { + + logger.debug("Force deleting job [{}]", request.getJobId()); final ClusterState state = clusterService.state(); final String jobId = request.getJobId(); @@ -454,13 +493,13 @@ private void forceDeleteJob(DeleteJobAction.Request request, ActionListener removeTaskListener = new ActionListener() { @Override public void onResponse(Boolean response) { - normalDeleteJob(request, listener); + normalDeleteJob(parentTaskClient, request, listener); } @Override public void onFailure(Exception e) { if (e instanceof ResourceNotFoundException) { - normalDeleteJob(request, listener); + normalDeleteJob(parentTaskClient, request, listener); } else { listener.onFailure(e); } @@ -483,12 +522,13 @@ public void onFailure(Exception e) { ); // 1. Kill the job's process - killProcess(jobId, killJobListener); + killProcess(parentTaskClient, jobId, killJobListener); } - private void killProcess(String jobId, ActionListener listener) { + private void killProcess(ParentTaskAssigningClient parentTaskClient, String jobId, + ActionListener listener) { KillProcessAction.Request killRequest = new KillProcessAction.Request(jobId); - executeAsyncWithOrigin(client, ML_ORIGIN, KillProcessAction.INSTANCE, killRequest, listener); + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, KillProcessAction.INSTANCE, killRequest, listener); } private void removePersistentTask(String jobId, ClusterState currentState, @@ -520,7 +560,7 @@ private void markJobAsDeleting(String jobId, ActionListener listener, b public ClusterState execute(ClusterState currentState) { PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); - builder.markJobAsDeleted(jobId, tasks, force); + builder.markJobAsDeleting(jobId, tasks, force); return buildNewClusterState(currentState, builder); } @@ -537,32 +577,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS }); } - private void waitForDeletingJob(String jobId, TimeValue timeout, ActionListener listener) { - ClusterStateObserver stateObserver = new ClusterStateObserver(clusterService, timeout, logger, threadPool.getThreadContext()); - - ClusterState clusterState = stateObserver.setAndGetObservedState(); - if (jobIsDeletedFromState(jobId, clusterState)) { - listener.onResponse(new AcknowledgedResponse(true)); - } else { - stateObserver.waitForNextChange(new ClusterStateObserver.Listener() { - @Override - public void onNewClusterState(ClusterState state) { - listener.onResponse(new AcknowledgedResponse(true)); - } - - @Override - public void onClusterServiceClose() { - listener.onFailure(new NodeClosedException(clusterService.localNode())); - } - - @Override - public void onTimeout(TimeValue timeout) { - listener.onFailure(new TimeoutException("timed out after " + timeout)); - } - }, newClusterState -> jobIsDeletedFromState(jobId, newClusterState), timeout); - } - } - static boolean jobIsDeletedFromState(String jobId, ClusterState clusterState) { return !MlMetadata.getMlMetadata(clusterState).getJobs().containsKey(jobId); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index ab1ef73780e5e..7217fcc6ec9a7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -183,6 +183,6 @@ static List determineNonDeletedJobIdsWithoutLiveStats(MlMetadata mlMetad List stats) { Set excludeJobIds = stats.stream().map(GetJobsStatsAction.Response.JobStats::getJobId).collect(Collectors.toSet()); return requestedJobIds.stream().filter(jobId -> !excludeJobIds.contains(jobId) && - !mlMetadata.isJobDeleted(jobId)).collect(Collectors.toList()); + !mlMetadata.isJobDeleting(jobId)).collect(Collectors.toList()); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index 512d8188abfac..42b67b2917387 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -127,8 +127,8 @@ static void validate(String jobId, MlMetadata mlMetadata) { if (job == null) { throw ExceptionsHelper.missingJobException(jobId); } - if (job.isDeleted()) { - throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it has been marked as deleted"); + if (job.isDeleting()) { + throw ExceptionsHelper.conflictStatusException("Cannot open job [" + jobId + "] because it is being deleted"); } if (job.getJobVersion() == null) { throw ExceptionsHelper.badRequestException("Cannot open job [" + jobId diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java index b1c73dc04dbf1..3a76b71980bec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestDeleteJobAction.java @@ -7,10 +7,15 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.xpack.core.ml.action.CloseJobAction; import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; import org.elasticsearch.xpack.core.ml.job.config.Job; @@ -37,6 +42,35 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient deleteJobRequest.setForce(restRequest.paramAsBoolean(CloseJobAction.Request.FORCE.getPreferredName(), deleteJobRequest.isForce())); deleteJobRequest.timeout(restRequest.paramAsTime("timeout", deleteJobRequest.timeout())); deleteJobRequest.masterNodeTimeout(restRequest.paramAsTime("master_timeout", deleteJobRequest.masterNodeTimeout())); - return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new RestToXContentListener<>(channel)); + + if (restRequest.paramAsBoolean("wait_for_completion", true)) { + return channel -> client.execute(DeleteJobAction.INSTANCE, deleteJobRequest, new RestToXContentListener<>(channel)); + } else { + deleteJobRequest.setShouldStoreResult(true); + + Task task = client.executeLocally(DeleteJobAction.INSTANCE, deleteJobRequest, nullTaskListener()); + // Send task description id instead of waiting for the message + return channel -> { + try (XContentBuilder builder = channel.newBuilder()) { + builder.startObject(); + builder.field("task", client.getLocalNodeId() + ":" + task.getId()); + builder.endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); + } + }; + } + } + + // We do not want to log anything due to a delete action + // The response or error will be returned to the client when called synchronously + // or it will be stored in the task result when called asynchronously + private static TaskListener nullTaskListener() { + return new TaskListener() { + @Override + public void onResponse(Task task, Object o) {} + + @Override + public void onFailure(Task task, Throwable e) {} + }; } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index e16ac2f99700d..82478fbf5d337 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -124,7 +124,7 @@ public void testPutJob() { public void testRemoveJob() { Job.Builder jobBuilder = buildJobBuilder("1"); - jobBuilder.setDeleted(true); + jobBuilder.setDeleting(true); Job job1 = jobBuilder.build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); @@ -206,7 +206,7 @@ public void testPutDatafeed_failBecauseJobDoesNotExist() { } public void testPutDatafeed_failBecauseJobIsBeingDeleted() { - Job job1 = createDatafeedJob().setDeleted(true).build(new Date()); + Job job1 = createDatafeedJob().setDeleting(true).build(new Date()); DatafeedConfig datafeedConfig1 = createDatafeedConfig("datafeed1", job1.getId()).build(); MlMetadata.Builder builder = new MlMetadata.Builder(); builder.putJob(job1, false); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java index 2e00ad71251db..6d4b008570c72 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java @@ -28,7 +28,7 @@ public class TransportGetJobsStatsActionTests extends ESTestCase { public void testDetermineJobIds() { MlMetadata mlMetadata = mock(MlMetadata.class); - when(mlMetadata.isJobDeleted(eq("id4"))).thenReturn(true); + when(mlMetadata.isJobDeleting(eq("id4"))).thenReturn(true); List result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Collections.singletonList("id1"), Collections.emptyList()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 58b60273b0e6d..4dd41363b73fe 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -79,14 +79,14 @@ public void testValidate_jobMissing() { expectThrows(ResourceNotFoundException.class, () -> TransportOpenJobAction.validate("job_id2", mlBuilder.build())); } - public void testValidate_jobMarkedAsDeleted() { + public void testValidate_jobMarkedAsDeleting() { MlMetadata.Builder mlBuilder = new MlMetadata.Builder(); Job.Builder jobBuilder = buildJobBuilder("job_id"); - jobBuilder.setDeleted(true); + jobBuilder.setDeleting(true); mlBuilder.putJob(jobBuilder.build(), false); Exception e = expectThrows(ElasticsearchStatusException.class, () -> TransportOpenJobAction.validate("job_id", mlBuilder.build())); - assertEquals("Cannot open job [job_id] because it has been marked as deleted", e.getMessage()); + assertEquals("Cannot open job [job_id] because it is being deleted", e.getMessage()); } public void testValidate_jobWithoutVersion() { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java deleted file mode 100644 index ed23a5328aec1..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteJobIT.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.integration; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.xpack.core.ml.MlMetadata; -import org.elasticsearch.xpack.core.ml.action.DeleteJobAction; -import org.elasticsearch.xpack.core.ml.action.PutJobAction; -import org.elasticsearch.xpack.core.ml.job.config.Job; -import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -public class DeleteJobIT extends BaseMlIntegTestCase { - - public void testWaitForDelete() throws ExecutionException, InterruptedException { - final String jobId = "wait-for-delete-job"; - Job.Builder job = createJob(jobId); - PutJobAction.Request putJobRequest = new PutJobAction.Request(job); - client().execute(PutJobAction.INSTANCE, putJobRequest).get(); - - AtomicReference exceptionHolder = new AtomicReference<>(); - CountDownLatch markAsDeletedLatch = new CountDownLatch(1); - clusterService().submitStateUpdateTask("mark-job-as-deleted", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - return markJobAsDeleted(jobId, currentState); - } - - @Override - public void onFailure(String source, Exception e) { - markAsDeletedLatch.countDown(); - exceptionHolder.set(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - markAsDeletedLatch.countDown(); - } - }); - - assertTrue("Timed out waiting for state update", markAsDeletedLatch.await(5, TimeUnit.SECONDS)); - assertNull("mark-job-as-deleted task failed: " + exceptionHolder.get(), exceptionHolder.get()); - - // Job is marked as deleting so now a delete request should wait for it. - AtomicBoolean isDeleted = new AtomicBoolean(false); - AtomicReference deleteFailure = new AtomicReference<>(); - ActionListener deleteListener = new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse response) { - isDeleted.compareAndSet(false, response.isAcknowledged()); - } - - @Override - public void onFailure(Exception e) { - deleteFailure.set(e); - } - }; - - client().execute(DeleteJobAction.INSTANCE, new DeleteJobAction.Request(jobId), deleteListener); - awaitBusy(isDeleted::get, 1, TimeUnit.SECONDS); - // still waiting - assertFalse(isDeleted.get()); - - CountDownLatch removeJobLatch = new CountDownLatch(1); - clusterService().submitStateUpdateTask("remove-job-from-state", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - assertFalse(isDeleted.get()); - return removeJobFromClusterState(jobId, currentState); - } - - @Override - public void onFailure(String source, Exception e) { - removeJobLatch.countDown(); - exceptionHolder.set(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - removeJobLatch.countDown(); - } - }); - - assertTrue("Timed out waiting for remove job from state response", removeJobLatch.await(5, TimeUnit.SECONDS)); - assertNull("remove-job-from-state task failed: " + exceptionHolder.get(), exceptionHolder.get()); - - assertNull("Job deletion failed: " + deleteFailure.get(), deleteFailure.get()); - assertTrue("Job was not deleted", isDeleted.get()); - } - - private ClusterState markJobAsDeleted(String jobId, ClusterState currentState) { - MlMetadata mlMetadata = MlMetadata.getMlMetadata(currentState); - assertNotNull(mlMetadata); - - MlMetadata.Builder builder = new MlMetadata.Builder(mlMetadata); - PersistentTasksCustomMetaData tasks = currentState.metaData().custom(PersistentTasksCustomMetaData.TYPE); - builder.markJobAsDeleted(jobId, tasks, true); - - ClusterState.Builder newState = ClusterState.builder(currentState); - return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()) - .build(); - } - - private ClusterState removeJobFromClusterState(String jobId, ClusterState currentState) { - MlMetadata.Builder builder = new MlMetadata.Builder(MlMetadata.getMlMetadata(currentState)); - builder.deleteJob(jobId, currentState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE)); - - ClusterState.Builder newState = ClusterState.builder(currentState); - return newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, builder.build()).build()) - .build(); - } -} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json index 77eb89c00f92d..f93fff6eaab4e 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/xpack.ml.delete_job.json @@ -15,8 +15,13 @@ "params": { "force": { "type": "boolean", - "required": false, - "description": "True if the job should be forcefully deleted" + "description": "True if the job should be forcefully deleted", + "default": false + }, + "wait_for_completion": { + "type": "boolean", + "description": "Should this request wait until the operation has completed before returning", + "default": true } } },