Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast cancellation to only nodes have outstanding child tasks #54312

Merged
merged 25 commits into from
Apr 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ static Request cancelTasks(CancelTasksRequest req) {
params
.withNodes(req.getNodes())
.withActions(req.getActions());
if (req.getWaitForCompletion() != null) {
params.withWaitForCompletion(req.getWaitForCompletion());
}
request.addParameters(params.asMap());
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class CancelTasksRequest implements Validatable {
private Optional<TimeValue> timeout = Optional.empty();
private Optional<TaskId> parentTaskId = Optional.empty();
private Optional<TaskId> taskId = Optional.empty();
private Boolean waitForCompletion;

CancelTasksRequest(){}

Expand Down Expand Up @@ -76,6 +77,14 @@ public Optional<TaskId> getTaskId() {
return taskId;
}

public Boolean getWaitForCompletion() {
return waitForCompletion;
}

public void setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -85,12 +94,13 @@ public boolean equals(Object o) {
Objects.equals(getActions(), that.getActions()) &&
Objects.equals(getTimeout(), that.getTimeout()) &&
Objects.equals(getParentTaskId(), that.getParentTaskId()) &&
Objects.equals(getTaskId(), that.getTaskId()) ;
Objects.equals(getTaskId(), that.getTaskId()) &&
Objects.equals(waitForCompletion, that.waitForCompletion);
}

@Override
public int hashCode() {
return Objects.hash(getNodes(), getActions(), getTimeout(), getParentTaskId(), getTaskId());
return Objects.hash(getNodes(), getActions(), getTimeout(), getParentTaskId(), getTaskId(), waitForCompletion);
}

@Override
Expand All @@ -101,6 +111,7 @@ public String toString() {
", timeout=" + timeout +
", parentTaskId=" + parentTaskId +
", taskId=" + taskId +
", waitForCompletion=" + waitForCompletion +
'}';
}

Expand All @@ -110,6 +121,7 @@ public static class Builder {
private Optional<TaskId> parentTaskId = Optional.empty();
private List<String> actionsFilter = new ArrayList<>();
private List<String> nodesFilter = new ArrayList<>();
private Boolean waitForCompletion;

public Builder withTimeout(TimeValue timeout){
this.timeout = Optional.of(timeout);
Expand Down Expand Up @@ -138,13 +150,21 @@ public Builder withNodesFiltered(List<String> nodes){
return this;
}

public Builder withWaitForCompletion(boolean waitForCompletion) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
this.waitForCompletion = waitForCompletion;
return this;
}

public CancelTasksRequest build() {
CancelTasksRequest request = new CancelTasksRequest();
timeout.ifPresent(request::setTimeout);
taskId.ifPresent(request::setTaskId);
parentTaskId.ifPresent(request::setParentTaskId);
request.setNodes(nodesFilter);
request.setActions(actionsFilter);
if (waitForCompletion != null) {
request.setWaitForCompletion(waitForCompletion);
}
return request;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.tasks.CancelTasksRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -40,14 +41,15 @@ public void testCancelTasks() {
new org.elasticsearch.client.tasks.TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
org.elasticsearch.client.tasks.TaskId parentTaskId =
new org.elasticsearch.client.tasks.TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
org.elasticsearch.client.tasks.CancelTasksRequest request =
new org.elasticsearch.client.tasks.CancelTasksRequest.Builder()
.withTaskId(taskId)
.withParentTaskId(parentTaskId)
.build();
CancelTasksRequest.Builder builder = new CancelTasksRequest.Builder().withTaskId(taskId).withParentTaskId(parentTaskId);
expectedParams.put("task_id", taskId.toString());
expectedParams.put("parent_task_id", parentTaskId.toString());
Request httpRequest = TasksRequestConverters.cancelTasks(request);
if (randomBoolean()) {
boolean waitForCompletion = randomBoolean();
builder.withWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", Boolean.toString(waitForCompletion));
}
Request httpRequest = TasksRequestConverters.cancelTasks(builder.build());
assertThat(httpRequest, notNullValue());
assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(httpRequest.getEntity(), nullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public void testCancelTasks() throws IOException {
// tag::cancel-tasks-request-filter
CancelTasksRequest byTaskIdRequest = new org.elasticsearch.client.tasks.CancelTasksRequest.Builder() // <1>
.withTaskId(new org.elasticsearch.client.tasks.TaskId("myNode",44L)) // <2>
.build(); // <3>
.withWaitForCompletion(true) // <3>
.build(); // <4>
// end::cancel-tasks-request-filter

}
Expand Down
4 changes: 3 additions & 1 deletion docs/java-rest/high-level/tasks/cancel_tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-request
--------------------------------------------------
<1> Cancel a task
<2> Cancel only cluster-related tasks
<3> Cancel all tasks running on nodes nodeId1 and nodeId2
<3> Should the request block until the cancellation of the task and its child tasks is completed.
Otherwise, the request can return soon after the cancellation is started. Defaults to `false`.
<4> Cancel all tasks running on nodes nodeId1 and nodeId2

==== Synchronous Execution

Expand Down
5 changes: 5 additions & 0 deletions docs/reference/cluster/tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ list tasks command, so multiple tasks can be cancelled at the same time. For
example, the following command will cancel all reindex tasks running on the
nodes `nodeId1` and `nodeId2`.

`wait_for_completion`::
(Optional, boolean) If `true`, the request blocks until the cancellation of the
task and its child tasks is completed. Otherwise, the request can return soon
after the cancellation is started. Defaults to `false`.

[source,console]
--------------------------------------------------
POST _tasks/_cancel?nodes=nodeId1,nodeId2&actions=*reindex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"parent_task_id":{
"type":"string",
"description":"Cancel tasks with specified parent task id (node_id:task_number). Set to -1 to cancel all."
},
"wait_for_completion": {
"type":"boolean",
"description":"Should the request block until the cancellation of the task and its child tasks is completed. Defaults to false"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.cluster.node.tasks.cancel;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -33,20 +34,28 @@
public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {

public static final String DEFAULT_REASON = "by user request";
public static final boolean DEFAULT_WAIT_FOR_COMPLETION = false;

private String reason = DEFAULT_REASON;
private boolean waitForCompletion = DEFAULT_WAIT_FOR_COMPLETION;

public CancelTasksRequest() {}

public CancelTasksRequest(StreamInput in) throws IOException {
super(in);
this.reason = in.readString();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
waitForCompletion = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(reason);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(waitForCompletion);
}
}

@Override
Expand All @@ -68,4 +77,16 @@ public CancelTasksRequest setReason(String reason) {
public String getReason() {
return reason;
}

/**
* If {@code true}, the request blocks until the cancellation of the task and its child tasks is completed.
* Otherwise, the request can return soon after the cancellation is started. Defaults to {@code false}.
*/
public void setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

public boolean waitForCompletion() {
return waitForCompletion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public CancelTasksRequestBuilder(ElasticsearchClient client, CancelTasksAction a
super(client, action, new CancelTasksRequest());
}

public CancelTasksRequestBuilder waitForCompletion(boolean waitForCompletion) {
request.setWaitForCompletion(waitForCompletion);
return this;
}
}
Loading