Skip to content

Commit

Permalink
Chunk SingleNodeShutdownStatus and ShutdownShardMigrationStatus (and …
Browse files Browse the repository at this point in the history
…related action) response (elastic#99798)

Moving SingleNodeShutdownStatus and ShutdownShardMigrationStatus to chunked response
  • Loading branch information
ldematte authored Sep 25, 2023
1 parent ccdd85f commit db985e4
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 52 deletions.
7 changes: 7 additions & 0 deletions docs/changelog/99798.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
pr: 99798
summary: Chunk `SingleNodeShutdownStatus` and `ShutdownShardMigrationStatus` (and
related action) response
area: Infra/Node Lifecycle
type: enhancement
issues:
- 99678
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,25 @@
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;

public class ShutdownShardMigrationStatus implements Writeable, ToXContentObject {
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endObject;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.singleChunk;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject;

public class ShutdownShardMigrationStatus implements Writeable, ChunkedToXContentObject {
private static final TransportVersion ALLOCATION_DECISION_ADDED_VERSION = TransportVersions.V_7_16_0;

public static final String NODE_ALLOCATION_DECISION_KEY = "node_allocation_decision";
Expand Down Expand Up @@ -79,22 +86,23 @@ public SingleNodeShutdownMetadata.Status getStatus() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
startObject(),
singleChunk((builder, p) -> buildHeader(builder)),
Objects.nonNull(allocationDecision)
? Iterators.concat(startObject(NODE_ALLOCATION_DECISION_KEY), allocationDecision.toXContentChunked(params), endObject())
: Collections.emptyIterator(),
endObject()
);
}

private XContentBuilder buildHeader(XContentBuilder builder) throws IOException {
builder.field("status", status);
builder.field("shard_migrations_remaining", shardsRemaining);
if (Objects.nonNull(explanation)) {
builder.field("explanation", explanation);
}
if (Objects.nonNull(allocationDecision)) {
builder.startObject(NODE_ALLOCATION_DECISION_KEY);
{
// This field might be huge, TODO add chunking support here
ChunkedToXContent.wrapAsToXContent(allocationDecision).toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
return builder;
}

Expand Down Expand Up @@ -126,6 +134,6 @@ public int hashCode() {

@Override
public String toString() {
return Strings.toString(this);
return Strings.toString((b, p) -> buildHeader(b), false, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ public static Iterator<ToXContent> field(String name, String value) {
return Iterators.single(((builder, params) -> builder.field(name, value)));
}

/**
* Creates an Iterator to serialize a named field where the value is represented by a chunked ToXContext.
* Chunked equivalent for {@code XContentBuilder field(String name, ToXContent value)}
* @param name name of the field
* @param value ChunkedToXContent value for this field (single value, object or array)
* @param params ToXContent params to propagate for XContent serialization
* @return Iterator composing field name and value serialization
*/
public static Iterator<ToXContent> field(String name, ChunkedToXContentObject value, ToXContent.Params params) {
return Iterators.concat(Iterators.single((builder, innerParam) -> builder.field(name)), value.toXContentChunked(params));
}

public static Iterator<ToXContent> array(String name, Iterator<? extends ToXContent> contents) {
return Iterators.concat(ChunkedToXContentHelper.startArray(name), contents, ChunkedToXContentHelper.endArray());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.xcontent;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;

import java.util.Iterator;
import java.util.function.IntFunction;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.hamcrest.Matchers.equalTo;

public class ChunkedToXContentHelperTests extends ESTestCase {

public void testFieldWithInnerChunkedObject() {

ToXContent innerXContent = (builder, p) -> {
builder.startObject();
builder.field("field1", 10);
builder.field("field2", "aaa");
builder.endObject();
return builder;
};

ToXContent outerXContent = (builder, p) -> {
builder.field("field3", 10);
builder.field("field4", innerXContent);
return builder;
};

var expectedContent = Strings.toString(outerXContent);

ChunkedToXContentObject innerChunkedContent = params -> Iterators.concat(
ChunkedToXContentHelper.startObject(),
ChunkedToXContentHelper.field("field1", 10),
ChunkedToXContentHelper.field("field2", "aaa"),
ChunkedToXContentHelper.endObject()
);

ChunkedToXContent outerChunkedContent = params -> Iterators.concat(
ChunkedToXContentHelper.field("field3", 10),
ChunkedToXContentHelper.field("field4", innerChunkedContent, EMPTY_PARAMS)
);

assertThat(Strings.toString(outerChunkedContent), equalTo(expectedContent));
}

public void testFieldWithInnerChunkedArray() {

ToXContent innerXContent = (builder, p) -> {
builder.startArray();
builder.value(10);
builder.value(20);
builder.endArray();
return builder;
};

ToXContent outerXContent = (builder, p) -> {
builder.field("field3", 10);
builder.field("field4", innerXContent);
return builder;
};

var expectedContent = Strings.toString(outerXContent);

IntFunction<Iterator<ToXContent>> value = v -> Iterators.single(((builder, p) -> builder.value(v)));

ChunkedToXContentObject innerChunkedContent = params -> Iterators.concat(
ChunkedToXContentHelper.startArray(),
value.apply(10),
value.apply(20),
ChunkedToXContentHelper.endArray()
);

ChunkedToXContent outerChunkedContent = params -> Iterators.concat(
ChunkedToXContentHelper.field("field3", 10),
ChunkedToXContentHelper.field("field4", innerChunkedContent, EMPTY_PARAMS)
);

assertThat(Strings.toString(outerChunkedContent), equalTo(expectedContent));
}

public void testFieldWithInnerChunkedField() {

ToXContent innerXContent = (builder, p) -> {
builder.value(10);
return builder;
};

ToXContent outerXContent = (builder, p) -> {
builder.field("field3", 10);
builder.field("field4", innerXContent);
return builder;
};

var expectedContent = Strings.toString(outerXContent);

IntFunction<Iterator<ToXContent>> value = v -> Iterators.single(((builder, p) -> builder.value(v)));

ChunkedToXContentObject innerChunkedContent = params -> Iterators.single((builder, p) -> builder.value(10));

ChunkedToXContent outerChunkedContent = params -> Iterators.concat(
ChunkedToXContentHelper.field("field3", 10),
ChunkedToXContentHelper.field("field4", innerChunkedContent, EMPTY_PARAMS)
);

assertThat(Strings.toString(outerChunkedContent), equalTo(expectedContent));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -80,7 +83,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
}
}

public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ChunkedToXContentObject {
final List<SingleNodeShutdownStatus> shutdownStatuses;

public Response(List<SingleNodeShutdownStatus> shutdownStatuses) {
Expand All @@ -96,17 +99,14 @@ public List<SingleNodeShutdownStatus> getShutdownStatuses() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.startArray("nodes");
for (SingleNodeShutdownStatus nodeShutdownStatus : shutdownStatuses) {
nodeShutdownStatus.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
ChunkedToXContentHelper.startArray("nodes"),
Iterators.flatMap(shutdownStatuses.iterator(), status -> status.toXContentChunked(params)),
ChunkedToXContentHelper.endArray(),
ChunkedToXContentHelper.endObject()
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.util.List;

Expand All @@ -40,7 +40,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
GetShutdownStatusAction.INSTANCE,
new GetShutdownStatusAction.Request(nodeIds),
new RestToXContentListener<>(channel)
new RestChunkedToXContentListener<>(channel)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@
import org.elasticsearch.cluster.metadata.ShutdownShardMigrationStatus;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;

public class SingleNodeShutdownStatus implements Writeable, ToXContentObject {
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endObject;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.singleChunk;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject;

public class SingleNodeShutdownStatus implements Writeable, ChunkedToXContentObject {

private final SingleNodeShutdownMetadata metadata;
private final ShutdownShardMigrationStatus shardMigrationStatus;
Expand Down Expand Up @@ -108,9 +115,8 @@ public String toString() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(startObject(), singleChunk((builder, p) -> {
builder.field(SingleNodeShutdownMetadata.NODE_ID_FIELD.getPreferredName(), metadata.getNodeId());
builder.field(SingleNodeShutdownMetadata.TYPE_FIELD.getPreferredName(), metadata.getType());
builder.field(SingleNodeShutdownMetadata.REASON_FIELD.getPreferredName(), metadata.getReason());
Expand All @@ -126,20 +132,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
metadata.getStartedAtMillis()
);
builder.field(STATUS.getPreferredName(), overallStatus());
builder.field(SHARD_MIGRATION_FIELD.getPreferredName(), shardMigrationStatus);
builder.field(PERSISTENT_TASKS_FIELD.getPreferredName(), persistentTasksStatus);
builder.field(PLUGINS_STATUS.getPreferredName(), pluginsStatus);
if (metadata.getTargetNodeName() != null) {
builder.field(TARGET_NODE_NAME_FIELD.getPreferredName(), metadata.getTargetNodeName());
}
if (metadata.getGracePeriod() != null) {
builder.timeField(
SingleNodeShutdownMetadata.GRACE_PERIOD_FIELD.getPreferredName(),
metadata.getGracePeriod().getStringRep()
);
}
}
builder.endObject();
return builder;
return builder;
}),
ChunkedToXContentHelper.field(SHARD_MIGRATION_FIELD.getPreferredName(), shardMigrationStatus, params),
singleChunk((builder, p) -> {
builder.field(PERSISTENT_TASKS_FIELD.getPreferredName(), persistentTasksStatus);
builder.field(PLUGINS_STATUS.getPreferredName(), pluginsStatus);
if (metadata.getTargetNodeName() != null) {
builder.field(TARGET_NODE_NAME_FIELD.getPreferredName(), metadata.getTargetNodeName());
}
if (metadata.getGracePeriod() != null) {
builder.timeField(
SingleNodeShutdownMetadata.GRACE_PERIOD_FIELD.getPreferredName(),
metadata.getGracePeriod().getStringRep()
);
}
return builder;
}),
endObject()
);
}
}

0 comments on commit db985e4

Please sign in to comment.