Skip to content

Commit

Permalink
Broadcast cancellation to only nodes have outstanding child tasks (el…
Browse files Browse the repository at this point in the history
…astic#54312)

Today when canceling a task we broadcast ban/unban requests to all nodes
in the cluster. This strategy does not scale well for hierarchical
cancellation. With this change, we will track outstanding child requests
and broadcast the cancellation to only nodes that have outstanding child
tasks. This change also prevents a parent task from sending child
requests once it got canceled.

Relates elastic#50990
Supersedes elastic#51157

Co-authored-by: Igor Motov <[email protected]>
Co-authored-by: Yannick Welsch <[email protected]>
  • Loading branch information
3 people committed Apr 6, 2020
1 parent b939b47 commit 98e65b1
Show file tree
Hide file tree
Showing 17 changed files with 934 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ static Request cancelTasks(CancelTasksRequest req) {
params
.withNodes(req.getNodes())
.withActions(req.getActions());
if (req.getWaitForCompletion() != null) {
params.withWaitForCompletion(req.getWaitForCompletion());
}
request.addParameters(params.asMap());
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class CancelTasksRequest implements Validatable {
private Optional<TimeValue> timeout = Optional.empty();
private Optional<TaskId> parentTaskId = Optional.empty();
private Optional<TaskId> taskId = Optional.empty();
private Boolean waitForCompletion;

CancelTasksRequest(){}

Expand Down Expand Up @@ -76,6 +77,14 @@ public Optional<TaskId> getTaskId() {
return taskId;
}

public Boolean getWaitForCompletion() {
return waitForCompletion;
}

public void setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -85,12 +94,13 @@ public boolean equals(Object o) {
Objects.equals(getActions(), that.getActions()) &&
Objects.equals(getTimeout(), that.getTimeout()) &&
Objects.equals(getParentTaskId(), that.getParentTaskId()) &&
Objects.equals(getTaskId(), that.getTaskId()) ;
Objects.equals(getTaskId(), that.getTaskId()) &&
Objects.equals(waitForCompletion, that.waitForCompletion);
}

@Override
public int hashCode() {
return Objects.hash(getNodes(), getActions(), getTimeout(), getParentTaskId(), getTaskId());
return Objects.hash(getNodes(), getActions(), getTimeout(), getParentTaskId(), getTaskId(), waitForCompletion);
}

@Override
Expand All @@ -101,6 +111,7 @@ public String toString() {
", timeout=" + timeout +
", parentTaskId=" + parentTaskId +
", taskId=" + taskId +
", waitForCompletion=" + waitForCompletion +
'}';
}

Expand All @@ -110,6 +121,7 @@ public static class Builder {
private Optional<TaskId> parentTaskId = Optional.empty();
private List<String> actionsFilter = new ArrayList<>();
private List<String> nodesFilter = new ArrayList<>();
private Boolean waitForCompletion;

public Builder withTimeout(TimeValue timeout){
this.timeout = Optional.of(timeout);
Expand Down Expand Up @@ -138,13 +150,21 @@ public Builder withNodesFiltered(List<String> nodes){
return this;
}

public Builder withWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

public CancelTasksRequest build() {
CancelTasksRequest request = new CancelTasksRequest();
timeout.ifPresent(request::setTimeout);
taskId.ifPresent(request::setTaskId);
parentTaskId.ifPresent(request::setParentTaskId);
request.setNodes(nodesFilter);
request.setActions(actionsFilter);
if (waitForCompletion != null) {
request.setWaitForCompletion(waitForCompletion);
}
return request;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.tasks.CancelTasksRequest;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;

Expand All @@ -40,14 +41,15 @@ public void testCancelTasks() {
new org.elasticsearch.client.tasks.TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
org.elasticsearch.client.tasks.TaskId parentTaskId =
new org.elasticsearch.client.tasks.TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
org.elasticsearch.client.tasks.CancelTasksRequest request =
new org.elasticsearch.client.tasks.CancelTasksRequest.Builder()
.withTaskId(taskId)
.withParentTaskId(parentTaskId)
.build();
CancelTasksRequest.Builder builder = new CancelTasksRequest.Builder().withTaskId(taskId).withParentTaskId(parentTaskId);
expectedParams.put("task_id", taskId.toString());
expectedParams.put("parent_task_id", parentTaskId.toString());
Request httpRequest = TasksRequestConverters.cancelTasks(request);
if (randomBoolean()) {
boolean waitForCompletion = randomBoolean();
builder.withWaitForCompletion(waitForCompletion);
expectedParams.put("wait_for_completion", Boolean.toString(waitForCompletion));
}
Request httpRequest = TasksRequestConverters.cancelTasks(builder.build());
assertThat(httpRequest, notNullValue());
assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME));
assertThat(httpRequest.getEntity(), nullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public void testCancelTasks() throws IOException {
// tag::cancel-tasks-request-filter
CancelTasksRequest byTaskIdRequest = new org.elasticsearch.client.tasks.CancelTasksRequest.Builder() // <1>
.withTaskId(new org.elasticsearch.client.tasks.TaskId("myNode",44L)) // <2>
.build(); // <3>
.withWaitForCompletion(true) // <3>
.build(); // <4>
// end::cancel-tasks-request-filter

}
Expand Down
4 changes: 3 additions & 1 deletion docs/java-rest/high-level/tasks/cancel_tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-request
--------------------------------------------------
<1> Cancel a task
<2> Cancel only cluster-related tasks
<3> Cancel all tasks running on nodes nodeId1 and nodeId2
<3> Should the request block until the cancellation of the task and its child tasks is completed.
Otherwise, the request can return soon after the cancellation is started. Defaults to `false`.
<4> Cancel all tasks running on nodes nodeId1 and nodeId2

==== Synchronous Execution

Expand Down
5 changes: 5 additions & 0 deletions docs/reference/cluster/tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ list tasks command, so multiple tasks can be cancelled at the same time. For
example, the following command will cancel all reindex tasks running on the
nodes `nodeId1` and `nodeId2`.

`wait_for_completion`::
(Optional, boolean) If `true`, the request blocks until the cancellation of the
task and its child tasks is completed. Otherwise, the request can return soon
after the cancellation is started. Defaults to `false`.

[source,console]
--------------------------------------------------
POST _tasks/_cancel?nodes=nodeId1,nodeId2&actions=*reindex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"parent_task_id":{
"type":"string",
"description":"Cancel tasks with specified parent task id (node_id:task_number). Set to -1 to cancel all."
},
"wait_for_completion": {
"type":"boolean",
"description":"Should the request block until the cancellation of the task and its child tasks is completed. Defaults to false"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.cluster.node.tasks.cancel;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -33,20 +34,28 @@
public class CancelTasksRequest extends BaseTasksRequest<CancelTasksRequest> {

public static final String DEFAULT_REASON = "by user request";
public static final boolean DEFAULT_WAIT_FOR_COMPLETION = false;

private String reason = DEFAULT_REASON;
private boolean waitForCompletion = DEFAULT_WAIT_FOR_COMPLETION;

public CancelTasksRequest() {}

public CancelTasksRequest(StreamInput in) throws IOException {
super(in);
this.reason = in.readString();
if (in.getVersion().onOrAfter(Version.V_7_8_0)) {
waitForCompletion = in.readBoolean();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(reason);
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
out.writeBoolean(waitForCompletion);
}
}

@Override
Expand All @@ -68,4 +77,16 @@ public CancelTasksRequest setReason(String reason) {
public String getReason() {
return reason;
}

/**
* If {@code true}, the request blocks until the cancellation of the task and its child tasks is completed.
* Otherwise, the request can return soon after the cancellation is started. Defaults to {@code false}.
*/
public void setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
}

public boolean waitForCompletion() {
return waitForCompletion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public CancelTasksRequestBuilder(ElasticsearchClient client, CancelTasksAction a
super(client, action, new CancelTasksRequest());
}

public CancelTasksRequestBuilder waitForCompletion(boolean waitForCompletion) {
request.setWaitForCompletion(waitForCompletion);
return this;
}
}
Loading

0 comments on commit 98e65b1

Please sign in to comment.