Skip to content

Commit

Permalink
Merge branch 'master' into ccr
Browse files Browse the repository at this point in the history
* master:
  Fix lock accounting in releasable lock
  Add ability to associate an ID with tasks  (#27764)
  [DOCS] Removed differencies between text and code (#27993)
  text fixes (#28136)
  Update getting-started.asciidoc (#28145)
  [Docs] Spelling fix in painless-getting-started.asciidoc (#28187)
  Fixed the cat.health REST test to accept 4ms, not just 4.0ms (#28186)
  Do not keep 5.x commits once having 6.x commits (#28188)
  • Loading branch information
jasontedor committed Jan 12, 2018
2 parents 8f3e9d6 + a15ba75 commit 1e330f7
Show file tree
Hide file tree
Showing 93 changed files with 750 additions and 199 deletions.
2 changes: 1 addition & 1 deletion docs/painless/painless-getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ POST hockey/player/_update_by_query
----------------------------------------------------------------
// CONSOLE

Using the match operator (`==~`) you can update all the hockey players who's
Using the match operator (`==~`) you can update all the hockey players whose
names start with a consonant and end with a vowel:

[source,js]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ A `single-value` metrics aggregation that calculates an approximate count of
distinct values. Values can be extracted either from specific fields in the
document or generated by a script.

Assume you are indexing books and would like to count the unique authors that
match a query:
Assume you are indexing store sales and would like to count the unique number of sold products that match a query:

[source,js]
--------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ in the parent multi-bucket aggregation. The specified metric must be numeric and
If the script language is `expression` then a numeric return value is permitted. In this case 0.0 will be evaluated as `false`
and all other values will evaluate to true.

NOTE: The bucket_selector aggregation, like all pipeline aggregations, executions after all other sibling aggregations. This means that
NOTE: The bucket_selector aggregation, like all pipeline aggregations, executes after all other sibling aggregations. This means that
using the bucket_selector aggregation to filter the returned buckets in the response does not save on execution time running the aggregations.

==== Syntax
Expand Down
68 changes: 68 additions & 0 deletions docs/reference/cluster/tasks.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -195,3 +195,71 @@ The following command will change the grouping to parent tasks:
GET _tasks?group_by=parents
--------------------------------------------------
// CONSOLE

The grouping can be disabled by specifying `none` as a `group_by` parameter:

[source,js]
--------------------------------------------------
GET _tasks?group_by=none
--------------------------------------------------
// CONSOLE

[float]
=== Identifying running tasks

The `X-Opaque-Id` header, when provided on the HTTP request header, is going to be returned as a header in the response as well as
in the `headers` field for in the task information. This allows to track certain calls, or associate certain tasks with
a the client that started them:

[source,sh]
--------------------------------------------------
curl -i -H "X-Opaque-Id: 123456" "http://localhost:9200/_tasks?group_by=parents"
--------------------------------------------------
// NOTCONSOLE

The result will look similar to the following:

[source,js]
--------------------------------------------------
HTTP/1.1 200 OK
X-Opaque-Id: 123456 <1>
content-type: application/json; charset=UTF-8
content-length: 831
{
"tasks" : {
"u5lcZHqcQhu-rUoFaqDphA:45" : {
"node" : "u5lcZHqcQhu-rUoFaqDphA",
"id" : 45,
"type" : "transport",
"action" : "cluster:monitor/tasks/lists",
"start_time_in_millis" : 1513823752749,
"running_time_in_nanos" : 293139,
"cancellable" : false,
"headers" : {
"X-Opaque-Id" : "123456" <2>
},
"children" : [
{
"node" : "u5lcZHqcQhu-rUoFaqDphA",
"id" : 46,
"type" : "direct",
"action" : "cluster:monitor/tasks/lists[n]",
"start_time_in_millis" : 1513823752750,
"running_time_in_nanos" : 92133,
"cancellable" : false,
"parent_task_id" : "u5lcZHqcQhu-rUoFaqDphA:45",
"headers" : {
"X-Opaque-Id" : "123456" <3>
}
}
]
}
}
}
--------------------------------------------------
// NOTCONSOLE

<1> id as a part of the response header
<2> id for the tasks that was initiated by the REST request
<3> the child task of the task initiated by the REST request
4 changes: 2 additions & 2 deletions docs/reference/getting-started.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,8 @@ You can download the sample dataset (accounts.json) from https://github.com/elas

[source,sh]
--------------------------------------------------
curl -H "Content-Type: application/json" -XPOST 'localhost:9200/bank/account/_bulk?pretty&refresh' --data-binary "@accounts.json"
curl 'localhost:9200/_cat/indices?v'
curl -H "Content-Type: application/json" -XPOST "localhost:9200/bank/account/_bulk?pretty&refresh" --data-binary "@accounts.json"
curl "localhost:9200/_cat/indices?v"
--------------------------------------------------
// NOTCONSOLE

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
private SearchRequest firstSearchRequest;
private PlainActionFuture<BulkByScrollResponse> listener;
private String scrollId;
private ThreadPool threadPool;
private TaskManager taskManager;
private BulkByScrollTask testTask;
private WorkerBulkByScrollTaskState worker;
Expand All @@ -141,7 +143,8 @@ public void setupForTest() {
testRequest = new DummyAbstractBulkByScrollRequest(firstSearchRequest);
listener = new PlainActionFuture<>();
scrollId = null;
taskManager = new TaskManager(Settings.EMPTY);
threadPool = new TestThreadPool(getClass().getName());
taskManager = new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet());
testTask = (BulkByScrollTask) taskManager.register("don'tcare", "hereeither", testRequest);
testTask.setWorker(testRequest.getRequestsPerSecond(), null);
worker = testTask.getWorkerState();
Expand All @@ -159,8 +162,9 @@ private void setupClient(ThreadPool threadPool) {
}

@After
public void tearDownAndVerifyCommonStuff() {
public void tearDownAndVerifyCommonStuff() throws Exception {
client.close();
terminate(threadPool);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.mockito.ArgumentCaptor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

Expand All @@ -53,7 +54,7 @@ public class TransportRethrottleActionTests extends ESTestCase {
@Before
public void createTask() {
slices = between(2, 50);
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID);
task = new BulkByScrollTask(1, "test_type", "test_action", "test", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
task.setWorkerCount(slices);
}

Expand Down Expand Up @@ -101,7 +102,8 @@ public void testRethrottleSuccessfulResponse() {
List<BulkByScrollTask.StatusOrException> sliceStatuses = new ArrayList<>(slices);
for (int i = 0; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices,
Expand All @@ -121,7 +123,8 @@ public void testRethrottleWithSomeSucceeded() {
List<TaskInfo> tasks = new ArrayList<>();
for (int i = succeeded; i < slices; i++) {
BulkByScrollTask.Status status = believeableInProgressStatus(i);
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId())));
tasks.add(new TaskInfo(new TaskId("test", 123), "test", "test", "test", status, 0, 0, true, new TaskId("test", task.getId()),
Collections.emptyMap()));
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
}
rethrottleTestCase(slices - succeeded,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected Version getCurrentVersion() {
}
};
MockTransportService mockTransportService =
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings);
MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet());
mockTransportService.start();
return mockTransportService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,44 @@
field3: value
- match: { hits.total: 1 }
- match: { hits.hits.0._id: q3 }

---
"Create a task result record in the old cluster":
- do:
indices.create:
index: reindexed_index
body:
settings:
index:
number_of_replicas: 0
- do:
bulk:
refresh: true
body:
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "1"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "2"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "3"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "4"}'
- '{"index": {"_index": "reindexed_index", "_type": "doc"}}'
- '{"f1": "5"}'

- do:
reindex:
wait_for_completion: false
body:
source:
index: reindexed_index
size: 1
dest:
index: reindexed_index_copy
- match: {task: '/.+:\d+/'}
- set: {task: task}

- do:
tasks.get:
wait_for_completion: true
task_id: $task
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,42 @@
field3: value
- match: { hits.total: 1 }
- match: { hits.hits.0._id: q3 }

---
"Find a task result record from the old cluster":
- do:
search:
index: .tasks
body:
query:
match_all: {}
- match: { hits.total: 1 }
- match: { hits.hits.0._id: '/.+:\d+/' }
- set: {hits.hits.0._id: task_id}

- do:
tasks.get:
wait_for_completion: true
task_id: $task_id

- is_false: node_failures
- is_true: task

- do:
headers: { "X-Opaque-Id": "Reindexing Again" }
reindex:
wait_for_completion: false
body:
source:
index: reindexed_index_copy
size: 1
dest:
index: reindexed_index_another_copy
- match: { task: '/.+:\d+/' }
- set: { task: task_id }

- do:
tasks.get:
wait_for_completion: true
task_id: $task_id
- match: { task.headers.X-Opaque-Id: "Reindexing Again" }
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"group_by": {
"type" : "enum",
"description": "Group tasks by nodes or parent/child relationships",
"options" : ["nodes", "parents"],
"options" : ["nodes", "parents", "none"],
"default" : "nodes"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
\d+ \s+ # init
\d+ \s+ # unassign
\d+ \s+ # pending_tasks
(-|\d+[.]\d+ms|s) \s+ # max task waiting time
(-|\d+(?:[.]\d+)?m?s) \s+ # max task waiting time
\d+\.\d+% # active shards percent
\n
)+
Expand All @@ -72,7 +72,7 @@
\d+ \s+ # init
\d+ \s+ # unassign
\d+ \s+ # pending_tasks
(-|\d+[.]\d+ms|s) \s+ # max task waiting time
(-|\d+(?:[.]\d+)?m?s) \s+ # max task waiting time
\d+\.\d+% # active shards percent
\n
)+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,19 @@
group_by: parents

- is_true: tasks

---
"tasks_list headers":
- skip:
version: " - 6.99.99"
reason: task headers has been added in 7.0.0

- do:
headers: { "X-Opaque-Id": "That is me" }
tasks.list:
actions: "cluster:monitor/tasks/lists"
group_by: none

- is_true: tasks
- match: { tasks.0.headers.X-Opaque-Id: "That is me" }

Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;

Expand All @@ -324,6 +325,7 @@
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;

Expand Down Expand Up @@ -362,7 +364,10 @@ public ActionModule(boolean transportClient, Settings settings, IndexNameExpress
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<String> headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet());
Set<String> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of("X-Opaque-Id")
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* Transport action that can be used to cancel currently running cancellable tasks.
* <p>
* For a task to be cancellable it has to return an instance of
* {@link CancellableTask} from {@link TransportRequest#createTask(long, String, String, TaskId)}
* {@link CancellableTask} from {@link TransportRequest#createTask}
*/
public class TransportCancelTasksAction extends TransportTasksAction<CancellableTask, CancelTasksRequest, CancelTasksResponse, TaskInfo> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Param
return builder;
}

/**
* Presents a flat list of tasks
*/
public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startArray("tasks");
for (TaskInfo taskInfo : getTasks()) {
builder.startObject();
taskInfo.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Loading

0 comments on commit 1e330f7

Please sign in to comment.