Skip to content

Commit

Permalink
[ML] Allow asynchronous job deletion (#34058)
Browse files Browse the repository at this point in the history
This changes the delete job API by adding
the choice to delete a job asynchronously.
The commit adds a `wait_for_completion` parameter
to the delete job request. When set to `false`,
the action returns immediately and the response
contains the task id.

This also changes the handling of subsequent
delete requests for a job that is already being
deleted. It now uses the task framework to check
if the job is being deleted instead of the cluster
state. This is a beneficial for it is going to also
be working once the job configs are moved out of the
cluster state and into an index. Also, force delete
requests that are waiting for the job to be deleted
will not proceed with the deletion if the first task
fails. This will prevent overloading the cluster. Instead,
the failure is communicated better via notifications
so that the user may retry.

Finally, this makes the `deleting` property of the job
visible (also it was renamed from `deleted`). This allows
a client to render a deleting job differently.

Closes #32836
  • Loading branch information
dimitris-athanasiou authored and kcm committed Oct 30, 2018
1 parent b1b278a commit c8fa5b9
Show file tree
Hide file tree
Showing 31 changed files with 611 additions and 428 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ static Request deleteJob(DeleteJobRequest deleteJobRequest) {
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

RequestConverters.Params params = new RequestConverters.Params(request);
params.putParam("force", Boolean.toString(deleteJobRequest.isForce()));
if (deleteJobRequest.getForce() != null) {
params.putParam("force", Boolean.toString(deleteJobRequest.getForce()));
}
if (deleteJobRequest.getWaitForCompletion() != null) {
params.putParam("wait_for_completion", Boolean.toString(deleteJobRequest.getWaitForCompletion()));
}

return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
Expand Down Expand Up @@ -211,14 +212,15 @@ public void getJobStatsAsync(GetJobStatsRequest request, RequestOptions options,
*
* @param request The request to delete the job
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return action acknowledgement
* @return The action response which contains the acknowledgement or the task id depending on whether the action was set to wait for
* completion
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
public DeleteJobResponse deleteJob(DeleteJobRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
MLRequestConverters::deleteJob,
options,
AcknowledgedResponse::fromXContent,
DeleteJobResponse::fromXContent,
Collections.emptySet());
}

Expand All @@ -232,11 +234,11 @@ public AcknowledgedResponse deleteJob(DeleteJobRequest request, RequestOptions o
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<AcknowledgedResponse> listener) {
public void deleteJobAsync(DeleteJobRequest request, RequestOptions options, ActionListener<DeleteJobResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
MLRequestConverters::deleteJob,
options,
AcknowledgedResponse::fromXContent,
DeleteJobResponse::fromXContent,
listener,
Collections.emptySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
public class DeleteJobRequest extends ActionRequest {

private String jobId;
private boolean force;
private Boolean force;
private Boolean waitForCompletion;

public DeleteJobRequest(String jobId) {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
Expand All @@ -47,7 +48,7 @@ public void setJobId(String jobId) {
this.jobId = Objects.requireNonNull(jobId, "[job_id] must not be null");
}

public boolean isForce() {
public Boolean getForce() {
return force;
}

Expand All @@ -57,10 +58,24 @@ public boolean isForce() {
*
* @param force When {@code true} forcefully delete an opened job. Defaults to {@code false}
*/
public void setForce(boolean force) {
public void setForce(Boolean force) {
this.force = force;
}

public Boolean getWaitForCompletion() {
return waitForCompletion;
}

/**
* Set whether this request should wait until the operation has completed before returning
* @param waitForCompletion When {@code true} the call will wait for the job deletion to complete.
* Otherwise, the deletion will be executed asynchronously and the response
* will contain the task id.
*/
public void setWaitForCompletion(Boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Objects;

/**
* Response object that contains the acknowledgement or the task id
* depending on whether the delete job action was requested to wait for completion.
*/
public class DeleteJobResponse extends ActionResponse implements ToXContentObject {

private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
private static final ParseField TASK = new ParseField("task");

public static final ConstructingObjectParser<DeleteJobResponse, Void> PARSER = new ConstructingObjectParser<>("delete_job_response",
true, a-> new DeleteJobResponse((Boolean) a[0], (TaskId) a[1]));

static {
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACKNOWLEDGED);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), TaskId.parser(), TASK, ObjectParser.ValueType.STRING);
}

public static DeleteJobResponse fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

private final Boolean acknowledged;
private final TaskId task;

DeleteJobResponse(@Nullable Boolean acknowledged, @Nullable TaskId task) {
assert acknowledged != null || task != null;
this.acknowledged = acknowledged;
this.task = task;
}

/**
* Get the action acknowledgement
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code false} or
* otherwise a {@code boolean} that indicates whether the job was deleted successfully.
*/
public Boolean getAcknowledged() {
return acknowledged;
}

/**
* Get the task id
* @return {@code null} when the request had {@link DeleteJobRequest#getWaitForCompletion()} set to {@code true} or
* otherwise the id of the job deletion task.
*/
public TaskId getTask() {
return task;
}

@Override
public int hashCode() {
return Objects.hash(acknowledged, task);
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

DeleteJobResponse that = (DeleteJobResponse) other;
return Objects.equals(acknowledged, that.acknowledged) && Objects.equals(task, that.task);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (acknowledged != null) {
builder.field(ACKNOWLEDGED.getPreferredName(), acknowledged);
}
if (task != null) {
builder.field(TASK.getPreferredName(), task.toString());
}
builder.endObject();
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class Job implements ToXContentObject {
public static final ParseField RESULTS_RETENTION_DAYS = new ParseField("results_retention_days");
public static final ParseField MODEL_SNAPSHOT_ID = new ParseField("model_snapshot_id");
public static final ParseField RESULTS_INDEX_NAME = new ParseField("results_index_name");
public static final ParseField DELETING = new ParseField("deleting");

public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("job_details", true, Builder::new);

Expand Down Expand Up @@ -94,6 +95,7 @@ public class Job implements ToXContentObject {
PARSER.declareField(Builder::setCustomSettings, (p, c) -> p.map(), CUSTOM_SETTINGS, ValueType.OBJECT);
PARSER.declareStringOrNull(Builder::setModelSnapshotId, MODEL_SNAPSHOT_ID);
PARSER.declareString(Builder::setResultsIndexName, RESULTS_INDEX_NAME);
PARSER.declareBoolean(Builder::setDeleting, DELETING);
}

private final String jobId;
Expand All @@ -115,13 +117,14 @@ public class Job implements ToXContentObject {
private final Map<String, Object> customSettings;
private final String modelSnapshotId;
private final String resultsIndexName;
private final Boolean deleting;

private Job(String jobId, String jobType, List<String> groups, String description,
Date createTime, Date finishedTime, Long establishedModelMemory,
AnalysisConfig analysisConfig, AnalysisLimits analysisLimits, DataDescription dataDescription,
ModelPlotConfig modelPlotConfig, Long renormalizationWindowDays, TimeValue backgroundPersistInterval,
Long modelSnapshotRetentionDays, Long resultsRetentionDays, Map<String, Object> customSettings,
String modelSnapshotId, String resultsIndexName) {
String modelSnapshotId, String resultsIndexName, Boolean deleting) {

this.jobId = jobId;
this.jobType = jobType;
Expand All @@ -141,6 +144,7 @@ private Job(String jobId, String jobType, List<String> groups, String descriptio
this.customSettings = customSettings == null ? null : Collections.unmodifiableMap(customSettings);
this.modelSnapshotId = modelSnapshotId;
this.resultsIndexName = resultsIndexName;
this.deleting = deleting;
}

/**
Expand Down Expand Up @@ -275,6 +279,10 @@ public String getModelSnapshotId() {
return modelSnapshotId;
}

public Boolean getDeleting() {
return deleting;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -330,6 +338,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (resultsIndexName != null) {
builder.field(RESULTS_INDEX_NAME.getPreferredName(), resultsIndexName);
}
if (deleting != null) {
builder.field(DELETING.getPreferredName(), deleting);
}
builder.endObject();
return builder;
}
Expand Down Expand Up @@ -362,15 +373,16 @@ public boolean equals(Object other) {
&& Objects.equals(this.resultsRetentionDays, that.resultsRetentionDays)
&& Objects.equals(this.customSettings, that.customSettings)
&& Objects.equals(this.modelSnapshotId, that.modelSnapshotId)
&& Objects.equals(this.resultsIndexName, that.resultsIndexName);
&& Objects.equals(this.resultsIndexName, that.resultsIndexName)
&& Objects.equals(this.deleting, that.deleting);
}

@Override
public int hashCode() {
return Objects.hash(jobId, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName);
modelSnapshotId, resultsIndexName, deleting);
}

@Override
Expand Down Expand Up @@ -402,6 +414,7 @@ public static class Builder {
private Map<String, Object> customSettings;
private String modelSnapshotId;
private String resultsIndexName;
private Boolean deleting;

private Builder() {
}
Expand Down Expand Up @@ -429,6 +442,7 @@ public Builder(Job job) {
this.customSettings = job.getCustomSettings();
this.modelSnapshotId = job.getModelSnapshotId();
this.resultsIndexName = job.getResultsIndexNameNoPrefix();
this.deleting = job.getDeleting();
}

public Builder setId(String id) {
Expand Down Expand Up @@ -525,6 +539,11 @@ public Builder setResultsIndexName(String resultsIndexName) {
return this;
}

Builder setDeleting(Boolean deleting) {
this.deleting = deleting;
return this;
}

/**
* Builds a job.
*
Expand All @@ -537,7 +556,7 @@ public Job build() {
id, jobType, groups, description, createTime, finishedTime, establishedModelMemory,
analysisConfig, analysisLimits, dataDescription, modelPlotConfig, renormalizationWindowDays,
backgroundPersistInterval, modelSnapshotRetentionDays, resultsRetentionDays, customSettings,
modelSnapshotId, resultsIndexName);
modelSnapshotId, resultsIndexName, deleting);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,18 @@ public void testDeleteJob() {
Request request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_xpack/ml/anomaly_detectors/" + jobId, request.getEndpoint());
assertEquals(Boolean.toString(false), request.getParameters().get("force"));
assertNull(request.getParameters().get("force"));
assertNull(request.getParameters().get("wait_for_completion"));

deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setForce(true);
request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(Boolean.toString(true), request.getParameters().get("force"));

deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setWaitForCompletion(false);
request = MLRequestConverters.deleteJob(deleteJobRequest);
assertEquals(Boolean.toString(false), request.getParameters().get("wait_for_completion"));
}

public void testFlushJob() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.client.ml.DeleteDatafeedRequest;
import org.elasticsearch.client.ml.DeleteForecastRequest;
import org.elasticsearch.client.ml.DeleteJobRequest;
import org.elasticsearch.client.ml.DeleteJobResponse;
import org.elasticsearch.client.ml.FlushJobRequest;
import org.elasticsearch.client.ml.FlushJobResponse;
import org.elasticsearch.client.ml.ForecastJobRequest;
Expand Down Expand Up @@ -151,17 +152,33 @@ public void testGetJob() throws Exception {
assertThat(response.jobs().stream().map(Job::getId).collect(Collectors.toList()), hasItems(jobId1, jobId2));
}

public void testDeleteJob() throws Exception {
public void testDeleteJob_GivenWaitForCompletionIsTrue() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);

AcknowledgedResponse response = execute(new DeleteJobRequest(jobId),
DeleteJobResponse response = execute(new DeleteJobRequest(jobId),
machineLearningClient::deleteJob,
machineLearningClient::deleteJobAsync);

assertTrue(response.isAcknowledged());
assertTrue(response.getAcknowledged());
assertNull(response.getTask());
}

public void testDeleteJob_GivenWaitForCompletionIsFalse() throws Exception {
String jobId = randomValidJobId();
Job job = buildJob(jobId);
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();
machineLearningClient.putJob(new PutJobRequest(job), RequestOptions.DEFAULT);

DeleteJobRequest deleteJobRequest = new DeleteJobRequest(jobId);
deleteJobRequest.setWaitForCompletion(false);

DeleteJobResponse response = execute(deleteJobRequest, machineLearningClient::deleteJob, machineLearningClient::deleteJobAsync);

assertNull(response.getAcknowledged());
assertNotNull(response.getTask());
}

public void testOpenJob() throws Exception {
Expand Down
Loading

0 comments on commit c8fa5b9

Please sign in to comment.