diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java index 76281e3f0b867..c3e93b418d86c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestCcrStatsAction.java @@ -7,10 +7,14 @@ package org.elasticsearch.xpack.ccr.rest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import java.util.List; @@ -19,6 +23,8 @@ public class RestCcrStatsAction extends BaseRestHandler { + private static final Logger logger = LogManager.getLogger(RestCcrStatsAction.class); + @Override public List routes() { return List.of(new Route(GET, "/_ccr/stats")); @@ -32,7 +38,17 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { final CcrStatsAction.Request request = new CcrStatsAction.Request(); - return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute( + CcrStatsAction.INSTANCE, + request, + new ThreadedActionListener<>( + logger, + client.threadPool(), + Ccr.CCR_THREAD_POOL_NAME, + new RestChunkedToXContentListener<>(channel), + false + ) + ); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java index 555b05edd54e9..6607b532c56e0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java @@ -11,7 +11,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; import java.util.List; @@ -34,7 +34,7 @@ public String getName() { protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { final FollowInfoAction.Request request = new FollowInfoAction.Request(); request.setFollowerIndices(Strings.splitStringByCommaToArray(restRequest.param("index"))); - return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestChunkedToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java index 38fbb0a1c060c..61c93bf6ac42f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowStatsAction.java @@ -7,11 +7,15 @@ package org.elasticsearch.xpack.ccr.rest; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import java.util.List; @@ -20,6 +24,8 @@ public class RestFollowStatsAction extends BaseRestHandler { + private static final Logger logger = LogManager.getLogger(RestFollowStatsAction.class); + @Override public List routes() { return List.of(new Route(GET, "/{index}/_ccr/stats")); @@ -34,7 +40,17 @@ public String getName() { protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest(); request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index"))); - return channel -> client.execute(FollowStatsAction.INSTANCE, request, new RestToXContentListener<>(channel)); + return channel -> client.execute( + FollowStatsAction.INSTANCE, + request, + new ThreadedActionListener<>( + logger, + client.threadPool(), + Ccr.CCR_THREAD_POOL_NAME, + new RestChunkedToXContentListener<>(channel), + false + ) + ); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java index 31cc73cb70226..ca786a64d97d2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowStatsResponseTests.java @@ -8,10 +8,15 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import java.io.IOException; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions; import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomTrackingClusters; import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse; @@ -35,4 +40,19 @@ protected CcrStatsAction.Response createTestInstance() { FollowStatsAction.StatsResponses statsResponse = createStatsResponse(); return new CcrStatsAction.Response(autoFollowStats, statsResponse); } + + public void testChunking() throws IOException { + final var instance = createTestInstance(); + int chunkCount = 0; + try (var builder = jsonBuilder()) { + final var iterator = instance.toXContentChunked(EMPTY_PARAMS); + while (iterator.hasNext()) { + iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS); + chunkCount += 1; + } + } // closing the builder verifies that the XContent is well-formed + + var indexCount = instance.getFollowStats().getStatsResponses().stream().map(s -> s.status().followerIndex()).distinct().count(); + assertEquals(instance.getFollowStats().getStatsResponses().size() + indexCount * 2 + 4, chunkCount); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java index 0fd1b5b756289..afed72f7909d1 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java @@ -7,8 +7,9 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractXContentSerializingTestCase; +import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.xcontent.ConstructingObjectParser; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; @@ -18,20 +19,23 @@ import java.util.ArrayList; import java.util.List; +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FOLLOWER_INDICES_FIELD; import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status; -public class FollowInfoResponseTests extends AbstractXContentSerializingTestCase { +public class FollowInfoResponseTests extends AbstractChunkedSerializingTestCase { - static final ConstructingObjectParser INFO_PARSER = new ConstructingObjectParser<>("info_parser", args -> { - return new FollowerInfo( + static final ConstructingObjectParser INFO_PARSER = new ConstructingObjectParser<>( + "info_parser", + args -> new FollowerInfo( (String) args[0], (String) args[1], (String) args[2], Status.fromString((String) args[3]), (FollowParameters) args[4] - ); - }); + ) + ); static { INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.FOLLOWER_INDEX_FIELD); @@ -48,7 +52,7 @@ public class FollowInfoResponseTests extends AbstractXContentSerializingTestCase @SuppressWarnings("unchecked") static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "response", - args -> { return new FollowInfoAction.Response((List) args[0]); } + args -> new FollowInfoAction.Response((List) args[0]) ); static { @@ -56,7 +60,7 @@ public class FollowInfoResponseTests extends AbstractXContentSerializingTestCase } @Override - protected FollowInfoAction.Response doParseInstance(XContentParser parser) throws IOException { + protected FollowInfoAction.Response doParseInstance(XContentParser parser) { return PARSER.apply(parser, null); } @@ -87,4 +91,18 @@ protected FollowInfoAction.Response createTestInstance() { } return new FollowInfoAction.Response(infos); } + + public void testChunking() throws IOException { + final var instance = createTestInstance(); + int chunkCount = 0; + try (var builder = jsonBuilder()) { + final var iterator = instance.toXContentChunked(EMPTY_PARAMS); + while (iterator.hasNext()) { + iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS); + chunkCount += 1; + } + } // closing the builder verifies that the XContent is well-formed + + assertEquals(instance.getFollowInfos().size() + 2, chunkCount); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java index 10759072b6149..6cc3fb4dbe326 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java @@ -9,13 +9,18 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; + public class StatsResponsesTests extends AbstractWireSerializingTestCase { @Override @@ -67,4 +72,19 @@ static FollowStatsAction.StatsResponses createStatsResponse() { } return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses); } + + public void testChunking() throws IOException { + final var instance = createTestInstance(); + int chunkCount = 0; + try (var builder = jsonBuilder()) { + final var iterator = instance.toXContentChunked(EMPTY_PARAMS); + while (iterator.hasNext()) { + iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS); + chunkCount += 1; + } + } // closing the builder verifies that the XContent is well-formed + + var indexCount = instance.getStatsResponses().stream().map(s -> s.status().followerIndex()).distinct().count(); + assertEquals(instance.getStatsResponses().size() + indexCount * 2 + 2, chunkCount); + } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java index 1c9fb93b8f111..7bd3849a0cee7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java @@ -11,13 +11,16 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import java.io.IOException; +import java.util.Iterator; import java.util.Objects; public class CcrStatsAction extends ActionType { @@ -48,7 +51,7 @@ public void writeTo(StreamOutput out) throws IOException { } } - public static class Response extends ActionResponse implements ToXContentObject { + public static class Response extends ActionResponse implements ChunkedToXContent { private final AutoFollowStats autoFollowStats; private final FollowStatsAction.StatsResponses followStats; @@ -79,14 +82,14 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - { - builder.field("auto_follow_stats", autoFollowStats, params); - builder.field("follow_stats", followStats, params); - } - builder.endObject(); - return builder; + public Iterator toXContentChunked(ToXContent.Params outerParams) { + return Iterators.concat( + Iterators.single( + (builder, params) -> builder.startObject().field("auto_follow_stats", autoFollowStats, params).field("follow_stats") + ), + followStats.toXContentChunked(outerParams), + ChunkedToXContentHelper.endObject() + ); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java index 5c37d2868468a..60515361e2807 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java @@ -11,15 +11,19 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.MasterNodeReadRequest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -77,7 +81,7 @@ public int hashCode() { } } - public static class Response extends ActionResponse implements ToXContentObject { + public static class Response extends ActionResponse implements ChunkedToXContent { public static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices"); @@ -102,15 +106,12 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.startArray(FOLLOWER_INDICES_FIELD.getPreferredName()); - for (FollowerInfo followInfo : followInfos) { - followInfo.toXContent(builder, params); - } - builder.endArray(); - builder.endObject(); - return builder; + public Iterator toXContentChunked(ToXContent.Params outerParams) { + return Iterators.concat( + Iterators.single((builder, params) -> builder.startObject().startArray(FOLLOWER_INDICES_FIELD.getPreferredName())), + followInfos.iterator(), + Iterators.single((builder, params) -> builder.endArray().endObject()) + ); } @Override diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java index 1b0a67ee1a782..5ccf069323be4 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowStatsAction.java @@ -15,16 +15,21 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.tasks.BaseTasksRequest; import org.elasticsearch.action.support.tasks.BaseTasksResponse; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.Transports; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import java.io.IOException; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -40,7 +45,7 @@ private FollowStatsAction() { super(NAME, FollowStatsAction.StatsResponses::new); } - public static class StatsResponses extends BaseTasksResponse implements ToXContentObject { + public static class StatsResponses extends BaseTasksResponse implements ChunkedToXContent { private final List statsResponse; @@ -69,36 +74,32 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + public Iterator toXContentChunked(ToXContent.Params outerParams) { // sort by index name, then shard ID + Transports.assertNotTransportThread("collecting responses into a map may be expensive"); final Map> taskResponsesByIndex = new TreeMap<>(); for (final StatsResponse response : statsResponse) { taskResponsesByIndex.computeIfAbsent(response.status().followerIndex(), k -> new TreeMap<>()) .put(response.status().getShardId(), response); } - builder.startObject(); - { - builder.startArray("indices"); - { - for (final Map.Entry> index : taskResponsesByIndex.entrySet()) { - builder.startObject(); - { - builder.field("index", index.getKey()); - builder.startArray("shards"); - { - for (final Map.Entry shard : index.getValue().entrySet()) { - shard.getValue().status().toXContent(builder, params); - } - } - builder.endArray(); - } - builder.endObject(); - } - } - builder.endArray(); - } - builder.endObject(); - return builder; + return innerToXContentChunked(taskResponsesByIndex); + } + + private static Iterator innerToXContentChunked(Map> taskResponsesByIndex) { + return Iterators.concat( + Iterators.single((builder, params) -> builder.startObject().startArray("indices")), + Iterators.flatMap( + taskResponsesByIndex.entrySet().iterator(), + indexEntry -> Iterators.concat( + Iterators.single( + (builder, params) -> builder.startObject().field("index", indexEntry.getKey()).startArray("shards") + ), + indexEntry.getValue().values().iterator(), + Iterators.single((builder, params) -> builder.endArray().endObject()) + ) + ), + Iterators.single((builder, params) -> builder.endArray().endObject()) + ); } @Override @@ -178,7 +179,7 @@ public int hashCode() { } } - public static class StatsResponse implements Writeable { + public static class StatsResponse implements Writeable, ToXContentObject { private final ShardFollowNodeTaskStatus status; @@ -211,6 +212,11 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(status); } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + return status.toXContent(builder, params); + } } }