diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 13352784a57c4..3c3c097b605d5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -323,7 +323,12 @@ public Iterator toXContentChunked(ToXContent.Params outerP ifPresent(getBreaker()).toXContent(builder, params); ifPresent(getScriptStats()).toXContent(builder, params); ifPresent(getDiscoveryStats()).toXContent(builder, params); - ifPresent(getIngestStats()).toXContent(builder, params); + return builder; + }), + + ifPresent(getIngestStats()).toXContentChunked(outerParams), + + Iterators.single((builder, params) -> { ifPresent(getAdaptiveSelectionStats()).toXContent(builder, params); ifPresent(getScriptCacheStats()).toXContent(builder, params); ifPresent(getIndexingPressureStats()).toXContent(builder, params); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index 3b320d8c71259..1b4764da9f6c5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -8,11 +8,14 @@ package org.elasticsearch.ingest; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; @@ -20,12 +23,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.TimeUnit; -public class IngestStats implements Writeable, ToXContentFragment { +public class IngestStats implements Writeable, ChunkedToXContent { private final Stats totalStats; private final List pipelineStats; private final Map> processorStats; @@ -97,35 +101,50 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("ingest"); - builder.startObject("total"); - totalStats.toXContent(builder, params); - builder.endObject(); - builder.startObject("pipelines"); - for (PipelineStat pipelineStat : pipelineStats) { - builder.startObject(pipelineStat.getPipelineId()); - pipelineStat.getStats().toXContent(builder, params); - List processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId()); - builder.startArray("processors"); - if (processorStatsForPipeline != null) { - for (ProcessorStat processorStat : processorStatsForPipeline) { - builder.startObject(); - builder.startObject(processorStat.getName()); - builder.field("type", processorStat.getType()); - builder.startObject("stats"); - processorStat.getStats().toXContent(builder, params); - builder.endObject(); - builder.endObject(); - builder.endObject(); - } - } - builder.endArray(); - builder.endObject(); - } - builder.endObject(); - builder.endObject(); - return builder; + public Iterator toXContentChunked(ToXContent.Params outerParams) { + return Iterators.concat( + + Iterators.single((builder, params) -> { + builder.startObject("ingest"); + builder.startObject("total"); + totalStats.toXContent(builder, params); + builder.endObject(); + builder.startObject("pipelines"); + return builder; + }), + + Iterators.flatMap( + pipelineStats.iterator(), + pipelineStat -> Iterators.concat( + + Iterators.single((builder, params) -> { + builder.startObject(pipelineStat.getPipelineId()); + pipelineStat.getStats().toXContent(builder, params); + builder.startArray("processors"); + return builder; + }), + + Iterators.flatMap( + processorStats.getOrDefault(pipelineStat.getPipelineId(), List.of()).iterator(), + processorStat -> Iterators.single((builder, params) -> { + builder.startObject(); + builder.startObject(processorStat.getName()); + builder.field("type", processorStat.getType()); + builder.startObject("stats"); + processorStat.getStats().toXContent(builder, params); + builder.endObject(); + builder.endObject(); + builder.endObject(); + return builder; + }) + ), + + Iterators.single((builder, params) -> builder.endArray().endObject()) + ) + ), + + Iterators.single((builder, params) -> builder.endObject().endObject()) + ); } public Stats getTotalStats() { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 122e3161f5e6f..ca710c0ae13c9 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -573,10 +573,21 @@ public void testChunking() { ); } - private int expectedChunks(NodeStats nodeStats, NodeStatsLevel level) { - return 3 + expectedChunks(nodeStats.getHttp()) + expectedChunks(nodeStats.getIndices(), level) + expectedChunks( + private static int expectedChunks(NodeStats nodeStats, NodeStatsLevel level) { + return 4 + expectedChunks(nodeStats.getHttp()) + expectedChunks(nodeStats.getIndices(), level) + expectedChunks( nodeStats.getTransport() - ); + ) + expectedChunks(nodeStats.getIngestStats()); + } + + private static int expectedChunks(@Nullable IngestStats ingestStats) { + return ingestStats == null + ? 0 + : 2 + ingestStats.getPipelineStats() + .stream() + .mapToInt( + pipelineStats -> 2 + ingestStats.getProcessorStats().getOrDefault(pipelineStats.getPipelineId(), List.of()).size() + ) + .sum(); } private static int expectedChunks(@Nullable HttpStats httpStats) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsAction.java index bb26fdb841c30..869e1059f63ce 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.xcontent.ParseField; @@ -168,7 +169,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(PIPELINE_COUNT.getPreferredName(), pipelineCount); if (pipelineCount > 0) { // Ingest stats is a fragment - ingestStats.toXContent(builder, params); + ChunkedToXContent.wrapAsToXContent(ingestStats).toXContent(builder, params); } if (this.inferenceStats != null) { builder.field(INFERENCE_STATS.getPreferredName(), this.inferenceStats);