Skip to content

Commit

Permalink
Chunked encoding for tasks APIs (#91935)
Browse files Browse the repository at this point in the history
This response can reach many MiB in size in a large and busy cluster,
let's use chunking here.

Relates #89838
  • Loading branch information
DaveCTurner authored Nov 28, 2022
1 parent c1a4cde commit 4095d65
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
Expand Down Expand Up @@ -43,17 +41,7 @@ public CancelTasksResponse(
super(tasks, taskFailures, nodeFailures);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return super.toXContent(builder, params);
}

public static CancelTasksResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Returns the list of tasks currently running on the nodes
*/
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
public class ListTasksResponse extends BaseTasksResponse {
private static final String TASKS = "tasks";

private final List<TaskInfo> tasks;
Expand Down Expand Up @@ -142,7 +145,7 @@ private void buildTaskGroups() {
topLevelTasks.add(taskGroup);
}
}
this.groups = Collections.unmodifiableList(topLevelTasks.stream().map(TaskGroup.Builder::build).toList());
this.groups = topLevelTasks.stream().map(TaskGroup.Builder::build).toList();
}

/**
Expand All @@ -155,82 +158,98 @@ public List<TaskInfo> getTasks() {
/**
* Convert this task response to XContent grouping by executing nodes.
*/
public XContentBuilder toXContentGroupedByNode(XContentBuilder builder, Params params, DiscoveryNodes discoveryNodes)
throws IOException {
toXContentCommon(builder, params);
builder.startObject("nodes");
for (Map.Entry<String, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = discoveryNodes.get(entry.getKey());
builder.startObject(entry.getKey());
if (node != null) {
// If the node is no longer part of the cluster, oh well, we'll just skip it's useful information.
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : node.getRoles()) {
builder.value(role.roleName());
}
builder.endArray();
public ChunkedToXContent groupedByNode(Supplier<DiscoveryNodes> nodesInCluster) {
return ignored -> {
final var discoveryNodes = nodesInCluster.get();
return Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject();
toXContentCommon(builder, params);
builder.startObject("nodes");
return builder;
}), getPerNodeTasks().entrySet().stream().flatMap(entry -> {
DiscoveryNode node = discoveryNodes.get(entry.getKey());
return Stream.<Stream<ToXContent>>of(Stream.of((builder, params) -> {
builder.startObject(entry.getKey());
if (node != null) {
// If the node is no longer part of the cluster, oh well, we'll just skip its useful information.
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : node.getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (node.getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
if (node.getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
}
builder.startObject(TASKS);
return builder;
}), entry.getValue().stream().<ToXContent>map(task -> (builder, params) -> {
builder.startObject(task.taskId().toString());
task.toXContent(builder, params);
builder.endObject();
}
}
builder.startObject(TASKS);
for (TaskInfo task : entry.getValue()) {
builder.startObject(task.taskId().toString());
task.toXContent(builder, params);
return builder;
}), Stream.of((builder, params) -> {
builder.endObject();
builder.endObject();
return builder;
})).flatMap(Function.identity());
}).iterator(), Iterators.single((builder, params) -> {
builder.endObject();
}
builder.endObject();
builder.endObject();
}
builder.endObject();
return builder;
builder.endObject();
return builder;
}));
};
}

/**
* Convert this response to XContent grouping by parent tasks.
*/
public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startObject(TASKS);
for (TaskGroup group : getTaskGroups()) {
public ChunkedToXContent groupedByParent() {
return ignored -> Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject();
toXContentCommon(builder, params);
builder.startObject(TASKS);
return builder;
}), getTaskGroups().stream().<ToXContent>map(group -> (builder, params) -> {
builder.field(group.taskInfo().taskId().toString());
group.toXContent(builder, params);
}
builder.endObject();
return builder;
return builder;
}).iterator(), Iterators.single((builder, params) -> {
builder.endObject();
builder.endObject();
return builder;
}));
}

/**
* Presents a flat list of tasks
*/
public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startArray(TASKS);
for (TaskInfo taskInfo : getTasks()) {
public ChunkedToXContent groupedByNone() {
return ignored -> Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject();
toXContentCommon(builder, params);
builder.startArray(TASKS);
return builder;
}), getTasks().stream().<ToXContent>map(taskInfo -> (builder, params) -> {
builder.startObject();
taskInfo.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentGroupedByNone(builder, params);
builder.endObject();
return builder;
return builder;
}).iterator(), Iterators.single((builder, params) -> {
builder.endArray();
builder.endObject();
return builder;
}));
}

public static ListTasksResponse fromXContent(XContentParser parser) {
Expand All @@ -239,6 +258,7 @@ public static ListTasksResponse fromXContent(XContentParser parser) {

@Override
public String toString() {
return Strings.toString(this, true, true);
return Strings.toString(ChunkedToXContent.wrapAsXContentObject(groupedByNone()), true, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -80,33 +76,17 @@ public static ListTasksRequest generateListTasksRequest(RestRequest request) {
public static <T extends ListTasksResponse> ActionListener<T> listTasksResponseListener(
Supplier<DiscoveryNodes> nodesInCluster,
String groupBy,
final RestChannel channel
RestChannel channel
) {
if ("nodes".equals(groupBy)) {
return new RestBuilderListener<T>(channel) {
@Override
public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContentGroupedByNode(builder, channel.request(), nodesInCluster.get());
builder.endObject();
return new RestResponse(RestStatus.OK, builder);
}
};
} else if ("parents".equals(groupBy)) {
return new RestBuilderListener<T>(channel) {
@Override
public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContentGroupedByParents(builder, channel.request());
builder.endObject();
return new RestResponse(RestStatus.OK, builder);
}
};
} else if ("none".equals(groupBy)) {
return new RestToXContentListener<>(channel);
} else {
throw new IllegalArgumentException("[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]");
}
final var listener = new RestChunkedToXContentListener<>(channel);
return switch (groupBy) {
case "nodes" -> listener.map(response -> response.groupedByNode(nodesInCluster));
case "parents" -> listener.map(response -> response.groupedByParent());
case "none" -> listener.map(response -> response.groupedByNone());
default -> throw new IllegalArgumentException(
"[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]"
);
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -889,14 +890,12 @@ public void testTasksToXContentGrouping() throws Exception {

private Map<String, Object> serialize(ListTasksResponse response, boolean byParents) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
if (byParents) {
DiscoveryNodes nodes = testNodes[0].clusterService.state().nodes();
response.toXContentGroupedByNode(builder, ToXContent.EMPTY_PARAMS, nodes);
ChunkedToXContent.wrapAsXContentObject(response.groupedByNode(() -> nodes)).toXContent(builder, ToXContent.EMPTY_PARAMS);
} else {
response.toXContentGroupedByParents(builder, ToXContent.EMPTY_PARAMS);
ChunkedToXContent.wrapAsXContentObject(response.groupedByParent()).toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endObject();
builder.flush();
logger.info(Strings.toString(builder));
return XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2();
Expand Down
Loading

0 comments on commit 4095d65

Please sign in to comment.