Skip to content

Commit

Permalink
More node stats chunking
Browse files Browse the repository at this point in the history
Ingest stats are also O(pipelines × processors) in size, and that could be quite a
large amount of data in some cases.

Relates elastic#95060
  • Loading branch information
DaveCTurner committed Apr 6, 2023
1 parent c282f50 commit fbeb543
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,12 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
ifPresent(getBreaker()).toXContent(builder, params);
ifPresent(getScriptStats()).toXContent(builder, params);
ifPresent(getDiscoveryStats()).toXContent(builder, params);
ifPresent(getIngestStats()).toXContent(builder, params);
return builder;
}),

ifPresent(getIngestStats()).toXContentChunked(outerParams),

Iterators.single((builder, params) -> {
ifPresent(getAdaptiveSelectionStats()).toXContent(builder, params);
ifPresent(getScriptCacheStats()).toXContent(builder, params);
ifPresent(getIndexingPressureStats()).toXContent(builder, params);
Expand Down
79 changes: 49 additions & 30 deletions server/src/main/java/org/elasticsearch/ingest/IngestStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,28 @@

package org.elasticsearch.ingest;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

public class IngestStats implements Writeable, ToXContentFragment {
public class IngestStats implements Writeable, ChunkedToXContent {
private final Stats totalStats;
private final List<PipelineStat> pipelineStats;
private final Map<String, List<ProcessorStat>> processorStats;
Expand Down Expand Up @@ -97,35 +101,50 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("ingest");
builder.startObject("total");
totalStats.toXContent(builder, params);
builder.endObject();
builder.startObject("pipelines");
for (PipelineStat pipelineStat : pipelineStats) {
builder.startObject(pipelineStat.getPipelineId());
pipelineStat.getStats().toXContent(builder, params);
List<ProcessorStat> processorStatsForPipeline = processorStats.get(pipelineStat.getPipelineId());
builder.startArray("processors");
if (processorStatsForPipeline != null) {
for (ProcessorStat processorStat : processorStatsForPipeline) {
builder.startObject();
builder.startObject(processorStat.getName());
builder.field("type", processorStat.getType());
builder.startObject("stats");
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.endObject();
}
}
builder.endArray();
builder.endObject();
}
builder.endObject();
builder.endObject();
return builder;
// Emits the ingest stats as a sequence of small chunks rather than one monolithic object,
// because the output is O(pipelines * processors) in size.
// Chunk layout (must stay in sync with the expectedChunks() helper in the node-stats test):
//   1 leading chunk, then per pipeline: 1 opening chunk + 1 chunk per processor + 1 closing
//   chunk, then 1 trailing chunk.
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(

// Leading chunk: opens the "ingest" and "pipelines" objects and emits the "total" stats.
Iterators.single((builder, params) -> {
builder.startObject("ingest");
builder.startObject("total");
totalStats.toXContent(builder, params);
builder.endObject();
builder.startObject("pipelines");
return builder;
}),

// One sub-sequence of chunks per pipeline.
Iterators.flatMap(
pipelineStats.iterator(),
pipelineStat -> Iterators.concat(

// Opens this pipeline's object, emits its stats, and opens its "processors" array.
Iterators.single((builder, params) -> {
builder.startObject(pipelineStat.getPipelineId());
pipelineStat.getStats().toXContent(builder, params);
builder.startArray("processors");
return builder;
}),

// One chunk per processor; pipelines with no recorded processor stats contribute none.
Iterators.flatMap(
processorStats.getOrDefault(pipelineStat.getPipelineId(), List.of()).iterator(),
processorStat -> Iterators.<ToXContent>single((builder, params) -> {
builder.startObject();
builder.startObject(processorStat.getName());
builder.field("type", processorStat.getType());
builder.startObject("stats");
processorStat.getStats().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.endObject();
return builder;
})
),

// Closes this pipeline's "processors" array and its object.
Iterators.<ToXContent>single((builder, params) -> builder.endArray().endObject())
)
),

// Trailing chunk: closes the "pipelines" and "ingest" objects.
Iterators.<ToXContent>single((builder, params) -> builder.endObject().endObject())
);
}

public Stats getTotalStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,10 +573,21 @@ public void testChunking() {
);
}

private int expectedChunks(NodeStats nodeStats, NodeStatsLevel level) {
return 3 + expectedChunks(nodeStats.getHttp()) + expectedChunks(nodeStats.getIndices(), level) + expectedChunks(
private static int expectedChunks(NodeStats nodeStats, NodeStatsLevel level) {
return 4 + expectedChunks(nodeStats.getHttp()) + expectedChunks(nodeStats.getIndices(), level) + expectedChunks(
nodeStats.getTransport()
);
) + expectedChunks(nodeStats.getIngestStats());
}

/**
 * Computes how many chunks {@code IngestStats#toXContentChunked} produces: 2 fixed chunks
 * (leading and trailing), plus, for each pipeline, 2 framing chunks and one chunk per
 * processor-stat entry recorded for that pipeline. Null stats contribute no chunks.
 */
private static int expectedChunks(@Nullable IngestStats ingestStats) {
    if (ingestStats == null) {
        return 0;
    }
    int chunks = 2; // leading + trailing chunk
    for (var pipelineStat : ingestStats.getPipelineStats()) {
        // 2 framing chunks per pipeline, plus one per processor entry (possibly none)
        chunks += 2 + ingestStats.getProcessorStats().getOrDefault(pipelineStat.getPipelineId(), List.of()).size();
    }
    return chunks;
}

private static int expectedChunks(@Nullable HttpStats httpStats) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -168,7 +169,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(PIPELINE_COUNT.getPreferredName(), pipelineCount);
if (pipelineCount > 0) {
// Ingest stats is a fragment
ingestStats.toXContent(builder, params);
ChunkedToXContent.wrapAsToXContent(ingestStats).toXContent(builder, params);
}
if (this.inferenceStats != null) {
builder.field(INFERENCE_STATS.getPreferredName(), this.inferenceStats);
Expand Down

0 comments on commit fbeb543

Please sign in to comment.