From e421115ca3cc03adb9faa71539125c2850922a19 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 5 Apr 2023 17:23:52 +0100 Subject: [PATCH 1/3] Deeper chunking of node stats response Pushes the chunking of `GET _nodes/stats` down to avoid creating unboundedly large chunks. With this commit we yield one chunk per shard (if `?level=shards`) or index (if `?level=indices`) and per HTTP client and per transport action. Closes #93985 --- .../elasticsearch/xcontent/ToXContent.java | 1 + .../admin/cluster/node/tasks/TasksIT.java | 2 +- .../admin/cluster/node/stats/NodeStats.java | 126 +++++++++--------- .../node/stats/NodesStatsResponse.java | 18 +-- .../nodes/BaseNodesXContentResponse.java | 5 +- .../replication/ReplicationOperation.java | 2 +- .../replication/ReplicationResponse.java | 6 +- .../common/xcontent/ChunkedToXContent.java | 6 + .../org/elasticsearch/http/HttpStats.java | 28 ++-- .../indices/NodeIndicesStats.java | 84 +++++++----- .../aggregations/AggregatorFactories.java | 2 +- .../transport/TransportStats.java | 66 +++++---- .../cluster/node/stats/NodeStatsTests.java | 44 ++++++ .../indices/NodeIndicesStatsTests.java | 2 +- .../node/NodeStatsMonitoringDoc.java | 3 +- .../exporter/FilteredMonitoringDoc.java | 5 +- 16 files changed, 236 insertions(+), 164 deletions(-) diff --git a/libs/x-content/src/main/java/org/elasticsearch/xcontent/ToXContent.java b/libs/x-content/src/main/java/org/elasticsearch/xcontent/ToXContent.java index 5c769435fb9ab..139758cd51e99 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/xcontent/ToXContent.java +++ b/libs/x-content/src/main/java/org/elasticsearch/xcontent/ToXContent.java @@ -122,4 +122,5 @@ default boolean isFragment() { return true; } + ToXContent EMPTY = (b, p) -> b; } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index b0a6b7eeada2a..a41a58e4fc46a 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -456,7 +456,7 @@ public void waitForTaskCompletion(Task task) {} // Need to run the task in a separate thread because node client's .execute() is blocked by our task listener index = new Thread(() -> { IndexResponse indexResponse = client().prepareIndex("test").setSource("test", "test").get(); - assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures()); + assertArrayEquals(ReplicationResponse.NO_FAILURES, indexResponse.getShardInfo().getFailures()); }); index.start(); assertTrue(taskRegistered.await(10, TimeUnit.SECONDS)); // waiting for at least one task to be registered diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 0e0a660b1a553..25fb5510d5f67 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -11,8 +11,10 @@ import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.Nullable; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.http.HttpStats; @@ -29,16 +31,17 @@ import org.elasticsearch.script.ScriptStats; import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.transport.TransportStats; -import org.elasticsearch.xcontent.ToXContentFragment; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; +import java.util.Iterator; import java.util.Map; +import java.util.Objects; /** * Node statistics (dynamic, changes depending on when created). */ -public class NodeStats extends BaseNodeResponse implements ToXContentFragment { +public class NodeStats extends BaseNodeResponse implements ChunkedToXContent { private final long timestamp; @@ -275,72 +278,63 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + public Iterator toXContentChunked(ToXContent.Params outerParams) { - builder.field("name", getNode().getName()); - builder.field("transport_address", getNode().getAddress().toString()); - builder.field("host", getNode().getHostName()); - builder.field("ip", getNode().getAddress()); + return Iterators.concat(Iterators.single((builder, params) -> { + builder.field("name", getNode().getName()); + builder.field("transport_address", getNode().getAddress().toString()); + builder.field("host", getNode().getHostName()); + builder.field("ip", getNode().getAddress()); - builder.startArray("roles"); - for (DiscoveryNodeRole role : getNode().getRoles()) { - builder.value(role.roleName()); - } - builder.endArray(); - - if (getNode().getAttributes().isEmpty() == false) { - builder.startObject("attributes"); - for (Map.Entry attrEntry : getNode().getAttributes().entrySet()) { - builder.field(attrEntry.getKey(), attrEntry.getValue()); + builder.startArray("roles"); + for (DiscoveryNodeRole role : getNode().getRoles()) { + builder.value(role.roleName()); + } + builder.endArray(); + + if (getNode().getAttributes().isEmpty() == false) { + builder.startObject("attributes"); + for (Map.Entry attrEntry : getNode().getAttributes().entrySet()) { + builder.field(attrEntry.getKey(), attrEntry.getValue()); + } + builder.endObject(); } - builder.endObject(); - } - if (getIndices() != null) { - getIndices().toXContent(builder, params); - } - if (getOs() != null) { - getOs().toXContent(builder, params); - } - if (getProcess() != null) { - getProcess().toXContent(builder, params); - } - if (getJvm() != null) { - getJvm().toXContent(builder, params); - } - if (getThreadPool() != null) { - getThreadPool().toXContent(builder, params); - } - if (getFs() != null) { - getFs().toXContent(builder, params); - } - if (getTransport() != null) { - getTransport().toXContent(builder, params); - } - if (getHttp() != null) { - getHttp().toXContent(builder, params); - } - if (getBreaker() != null) { - getBreaker().toXContent(builder, params); - } - if (getScriptStats() != null) { - getScriptStats().toXContent(builder, params); - } - if (getDiscoveryStats() != null) { - getDiscoveryStats().toXContent(builder, params); - } - if (getIngestStats() != null) { - getIngestStats().toXContent(builder, params); - } - if (getAdaptiveSelectionStats() != null) { - getAdaptiveSelectionStats().toXContent(builder, params); - } - if (getScriptCacheStats() != null) { - getScriptCacheStats().toXContent(builder, params); - } - if (getIndexingPressureStats() != null) { - getIndexingPressureStats().toXContent(builder, params); - } - return builder; + return builder; + }), + + ifPresent(getIndices()).toXContentChunked(outerParams), + + Iterators.single((builder, params) -> { + ifPresent(getOs()).toXContent(builder, params); + ifPresent(getProcess()).toXContent(builder, params); + ifPresent(getJvm()).toXContent(builder, params); + ifPresent(getThreadPool()).toXContent(builder, params); + ifPresent(getFs()).toXContent(builder, params); + return builder; + }), + + ifPresent(getTransport()).toXContentChunked(outerParams), + ifPresent(getHttp()).toXContentChunked(outerParams), + + Iterators.single((builder, params) -> { + ifPresent(getBreaker()).toXContent(builder, params); + ifPresent(getScriptStats()).toXContent(builder, params); + ifPresent(getDiscoveryStats()).toXContent(builder, params); + ifPresent(getIngestStats()).toXContent(builder, params); + ifPresent(getAdaptiveSelectionStats()).toXContent(builder, params); + ifPresent(getScriptCacheStats()).toXContent(builder, params); + ifPresent(getIndexingPressureStats()).toXContent(builder, params); + return builder; + }) + ); + } + + private static ChunkedToXContent ifPresent(@Nullable ChunkedToXContent chunkedToXContent) { + return Objects.requireNonNullElse(chunkedToXContent, ChunkedToXContent.EMPTY); + } + + private static ToXContent ifPresent(@Nullable ToXContent toXContent) { + return Objects.requireNonNullElse(toXContent, ToXContent.EMPTY); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java index ac5bcd4a3da8b..29c62c638d929 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; @@ -42,16 +43,15 @@ protected void writeNodesTo(StreamOutput out, List nodes) throws IOEx } @Override - protected Iterator xContentChunks() { + protected Iterator xContentChunks(ToXContent.Params outerParams) { return Iterators.concat( - Iterators.single((b, p) -> b.startObject("nodes")), - getNodes().stream().map(nodeStats -> (ToXContent) (b, p) -> { - b.startObject(nodeStats.getNode().getId()); - b.field("timestamp", nodeStats.getTimestamp()); - nodeStats.toXContent(b, p); - return b.endObject(); - }).iterator(), - Iterators.single((b, p) -> b.endObject()) + ChunkedToXContentHelper.startObject("nodes"), + Iterators.flatMap(getNodes().iterator(), nodeStats -> Iterators.concat(Iterators.single((builder, params) -> { + builder.startObject(nodeStats.getNode().getId()); + builder.field("timestamp", nodeStats.getTimestamp()); + return builder; + }), nodeStats.toXContentChunked(outerParams), ChunkedToXContentHelper.endObject())), + ChunkedToXContentHelper.endObject() ); } diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java index 4113788aafca8..69400bd62b9bd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.xcontent.ToXContent; @@ -38,8 +39,8 @@ public final Iterator toXContentChunked(ToXContent.Params b.startObject(); RestActions.buildNodesHeader(b, p, this); return b.field("cluster_name", getClusterName().value()); - }), xContentChunks(), Iterators.single((ToXContent) (b, p) -> b.endObject())); + }), xContentChunks(params), ChunkedToXContentHelper.endObject()); } - protected abstract Iterator xContentChunks(); + protected abstract Iterator xContentChunks(ToXContent.Params outerParams); } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 1566b1806e3e8..f660ee40f21e2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -465,7 +465,7 @@ private void finish() { if (finished.compareAndSet(false, true)) { final ReplicationResponse.ShardInfo.Failure[] failuresArray; if (shardReplicaFailures.isEmpty()) { - failuresArray = ReplicationResponse.EMPTY; + failuresArray = ReplicationResponse.NO_FAILURES; } else { failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()]; shardReplicaFailures.toArray(failuresArray); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java index 3644b35e187a4..209809a4294d9 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationResponse.java @@ -35,7 +35,7 @@ */ public class ReplicationResponse extends ActionResponse { - public static final ReplicationResponse.ShardInfo.Failure[] EMPTY = new ReplicationResponse.ShardInfo.Failure[0]; + public static final ReplicationResponse.ShardInfo.Failure[] NO_FAILURES = new ReplicationResponse.ShardInfo.Failure[0]; private ShardInfo shardInfo; @@ -68,7 +68,7 @@ public static class ShardInfo implements Writeable, ToXContentObject { private int total; private int successful; - private Failure[] failures = EMPTY; + private Failure[] failures = ReplicationResponse.NO_FAILURES; public ShardInfo() {} @@ -186,7 +186,7 @@ public static ShardInfo fromXContent(XContentParser parser) throws IOException { parser.skipChildren(); // skip potential inner arrays for forward compatibility } } - Failure[] failures = EMPTY; + Failure[] failures = ReplicationResponse.NO_FAILURES; if (failuresList != null) { failures = failuresList.toArray(new Failure[failuresList.size()]); } diff --git a/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java b/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java index a519b518e4c8f..44d521581dfab 100644 --- a/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java +++ b/server/src/main/java/org/elasticsearch/common/xcontent/ChunkedToXContent.java @@ -14,6 +14,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Collections; import java.util.Iterator; /** @@ -84,4 +85,9 @@ public boolean isFragment() { default boolean isFragment() { return true; } + + /** + * A {@link ChunkedToXContent} that yields no chunks + */ + ChunkedToXContent EMPTY = params -> Collections.emptyIterator(); } diff --git a/server/src/main/java/org/elasticsearch/http/HttpStats.java b/server/src/main/java/org/elasticsearch/http/HttpStats.java index 28dcd0f8cecca..de86480fea9b4 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpStats.java +++ b/server/src/main/java/org/elasticsearch/http/HttpStats.java @@ -8,16 +8,20 @@ package org.elasticsearch.http; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; +import java.util.Iterator; import java.util.List; -public class HttpStats implements Writeable, ToXContentFragment { +public class HttpStats implements Writeable, ChunkedToXContent { private final long serverOpen; private final long totalOpen; @@ -78,17 +82,17 @@ static final class Fields { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.HTTP); - builder.field(Fields.CURRENT_OPEN, serverOpen); - builder.field(Fields.TOTAL_OPENED, totalOpen); - builder.startArray(Fields.CLIENTS); - for (ClientStats clientStats : this.clientStats) { - clientStats.toXContent(builder, params); - } - builder.endArray(); - builder.endObject(); - return builder; + public Iterator toXContentChunked(ToXContent.Params outerParams) { + return Iterators.concat( + Iterators.single( + (builder, params) -> builder.startObject(Fields.HTTP) + .field(Fields.CURRENT_OPEN, serverOpen) + .field(Fields.TOTAL_OPENED, totalOpen) + .startArray(Fields.CLIENTS) + ), + Iterators.flatMap(clientStats.iterator(), Iterators::single), + Iterators.single((builder, params) -> builder.endArray().endObject()) + ); } public static class ClientStats implements Writeable, ToXContentFragment { diff --git a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index 9e7f9d433ac09..517754f3e5578 100644 --- a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -13,9 +13,12 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.common.xcontent.ChunkedToXContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; import org.elasticsearch.index.bulk.stats.BulkStats; @@ -37,12 +40,13 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.search.suggest.completion.CompletionStats; -import org.elasticsearch.xcontent.ToXContentFragment; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -50,7 +54,7 @@ /** * Global information on indices stats running on a specific node. */ -public class NodeIndicesStats implements Writeable, ToXContentFragment { +public class NodeIndicesStats implements Writeable, ChunkedToXContent { private static final TransportVersion VERSION_SUPPORTING_STATS_BY_INDEX = TransportVersion.V_8_5_0; @@ -216,40 +220,50 @@ public int hashCode() { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - final NodeStatsLevel level = NodeStatsLevel.of(params, NodeStatsLevel.NODE); + public Iterator toXContentChunked(ToXContent.Params outerParams) { - // "node" level - builder.startObject(Fields.INDICES); - stats.toXContent(builder, params); - - if (level == NodeStatsLevel.INDICES) { - Map indexStats = createCommonStatsByIndex(); + return Iterators.concat(Iterators.single((builder, params) -> { builder.startObject(Fields.INDICES); - for (Map.Entry entry : indexStats.entrySet()) { - builder.startObject(entry.getKey().getName()); - entry.getValue().toXContent(builder, params); - builder.endObject(); - } - builder.endObject(); - } else if (level == NodeStatsLevel.SHARDS) { - builder.startObject(Fields.SHARDS); - for (Map.Entry> entry : statsByShard.entrySet()) { - builder.startArray(entry.getKey().getName()); - for (IndexShardStats indexShardStats : entry.getValue()) { - builder.startObject().startObject(String.valueOf(indexShardStats.getShardId().getId())); - for (ShardStats shardStats : indexShardStats.getShards()) { - shardStats.toXContent(builder, params); - } - builder.endObject().endObject(); - } - builder.endArray(); - } - builder.endObject(); - } - - builder.endObject(); - return builder; + return stats.toXContent(builder, params); + }), switch (NodeStatsLevel.of(outerParams, NodeStatsLevel.NODE)) { + + case NODE -> Collections.emptyIterator(); + + case INDICES -> Iterators.concat( + ChunkedToXContentHelper.startObject(Fields.INDICES), + Iterators.flatMap( + createCommonStatsByIndex().entrySet().iterator(), + entry -> Iterators.single((builder, params) -> { + builder.startObject(entry.getKey().getName()); + entry.getValue().toXContent(builder, params); + return builder.endObject(); + }) + ), + ChunkedToXContentHelper.endObject() + ); + + case SHARDS -> Iterators.concat( + ChunkedToXContentHelper.startObject(Fields.SHARDS), + Iterators.flatMap( + statsByShard.entrySet().iterator(), + entry -> Iterators.concat( + ChunkedToXContentHelper.startArray(entry.getKey().getName()), + Iterators.flatMap( + entry.getValue().iterator(), + indexShardStats -> Iterators.concat( + Iterators.single( + (b, p) -> b.startObject().startObject(String.valueOf(indexShardStats.getShardId().getId())) + ), + Iterators.flatMap(Iterators.forArray(indexShardStats.getShards()), Iterators::single), + Iterators.single((b, p) -> b.endObject().endObject()) + ) + ), + ChunkedToXContentHelper.endArray() + ) + ), + ChunkedToXContentHelper.endObject() + ); + }, ChunkedToXContentHelper.endObject()); } private Map createCommonStatsByIndex() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java index b92a2b2307609..417eec5a237fe 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorFactories.java @@ -389,7 +389,7 @@ private ActionRequestValidationException validateChildren(ActionRequestValidatio public AggregatorFactories build(AggregationContext context, AggregatorFactory parent) throws IOException { if (aggregationBuilders.isEmpty() && pipelineAggregatorBuilders.isEmpty()) { - return EMPTY; + return AggregatorFactories.EMPTY; } AggregatorFactory[] aggFactories = new AggregatorFactory[aggregationBuilders.size()]; int i = 0; diff --git a/server/src/main/java/org/elasticsearch/transport/TransportStats.java b/server/src/main/java/org/elasticsearch/transport/TransportStats.java index 2cfd9f1663d35..b27183be66924 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportStats.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportStats.java @@ -10,21 +10,24 @@ import org.elasticsearch.TransportVersion; import org.elasticsearch.Version; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.Map; -public class TransportStats implements Writeable, ToXContentFragment { +public class TransportStats implements Writeable, ChunkedToXContent { private final long serverOpen; private final long totalOutboundConnections; @@ -182,34 +185,39 @@ private boolean assertHistogramsConsistent() { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.TRANSPORT); - builder.field(Fields.SERVER_OPEN, serverOpen); - builder.field(Fields.TOTAL_OUTBOUND_CONNECTIONS, totalOutboundConnections); - builder.field(Fields.RX_COUNT, rxCount); - builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, ByteSizeValue.ofBytes(rxSize)); - builder.field(Fields.TX_COUNT, txCount); - builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, ByteSizeValue.ofBytes(txSize)); - if (inboundHandlingTimeBucketFrequencies.length > 0) { - histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM); - histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM); - } else { - // Stats came from before v8.1 - assert Version.CURRENT.major == Version.V_7_0_0.major + 1; - } - if (transportActionStats.isEmpty() == false) { - builder.startObject(Fields.ACTIONS); - for (final var entry : transportActionStats.entrySet()) { - builder.field(entry.getKey()); - entry.getValue().toXContent(builder, params); + public Iterator toXContentChunked(ToXContent.Params outerParams) { + return Iterators.concat(Iterators.single((builder, params) -> { + builder.startObject(Fields.TRANSPORT); + builder.field(Fields.SERVER_OPEN, serverOpen); + builder.field(Fields.TOTAL_OUTBOUND_CONNECTIONS, totalOutboundConnections); + builder.field(Fields.RX_COUNT, rxCount); + builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, ByteSizeValue.ofBytes(rxSize)); + builder.field(Fields.TX_COUNT, txCount); + builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, ByteSizeValue.ofBytes(txSize)); + if (inboundHandlingTimeBucketFrequencies.length > 0) { + histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM); + histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM); + } else { + // Stats came from before v8.1 + assert Version.CURRENT.major == Version.V_7_0_0.major + 1; } - builder.endObject(); - } else { - // Stats came from before v8.8 - assert Version.CURRENT.major == Version.V_7_0_0.major + 1; - } - builder.endObject(); - return builder; + if (transportActionStats.isEmpty() == false) { + builder.startObject(Fields.ACTIONS); + } else { + // Stats came from before v8.8 + assert Version.CURRENT.major == Version.V_7_0_0.major + 1; + } + return builder; + }), Iterators.flatMap(transportActionStats.entrySet().iterator(), entry -> Iterators.single((builder, params) -> { + builder.field(entry.getKey()); + entry.getValue().toXContent(builder, params); + return builder; + })), Iterators.single((builder, params) -> { + if (transportActionStats.isEmpty() == false) { + builder.endObject(); + } + return builder.endObject(); + })); } static void histogramToXContent(XContentBuilder builder, long[] bucketFrequencies, String fieldName) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index c767aaf717319..122e3161f5e6f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.action.NodeStatsLevel; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; @@ -26,6 +27,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.HandlingTimeTracker; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Tuple; import org.elasticsearch.discovery.DiscoveryStats; import org.elasticsearch.http.HttpStats; @@ -66,11 +68,13 @@ import org.elasticsearch.script.ScriptStats; import org.elasticsearch.script.TimeSeries; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.transport.TransportActionStats; import org.elasticsearch.transport.TransportStats; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; import java.nio.file.Path; @@ -551,6 +555,46 @@ public void testSerialization() throws IOException { } } + public void testChunking() { + AbstractChunkedSerializingTestCase.assertChunkCount( + createNodeStats(), + randomFrom(ToXContent.EMPTY_PARAMS, new ToXContent.MapParams(Map.of("level", "node"))), + nodeStats -> expectedChunks(nodeStats, NodeStatsLevel.NODE) + ); + AbstractChunkedSerializingTestCase.assertChunkCount( + createNodeStats(), + new ToXContent.MapParams(Map.of("level", "indices")), + nodeStats -> expectedChunks(nodeStats, NodeStatsLevel.INDICES) + ); + AbstractChunkedSerializingTestCase.assertChunkCount( + createNodeStats(), + new ToXContent.MapParams(Map.of("level", "shards")), + nodeStats -> expectedChunks(nodeStats, NodeStatsLevel.SHARDS) + ); + } + + private int expectedChunks(NodeStats nodeStats, NodeStatsLevel level) { + return 3 + expectedChunks(nodeStats.getHttp()) + expectedChunks(nodeStats.getIndices(), level) + expectedChunks( + nodeStats.getTransport() + ); + } + + private static int expectedChunks(@Nullable HttpStats httpStats) { + return httpStats == null ? 0 : 2 + httpStats.getClientStats().size(); + } + + private static int expectedChunks(@Nullable TransportStats transportStats) { + return transportStats == null ? 0 : 3; // only one transport action + } + + private static int expectedChunks(@Nullable NodeIndicesStats nodeIndicesStats, NodeStatsLevel level) { + return nodeIndicesStats == null ? 0 : switch (level) { + case NODE -> 2; + case INDICES -> 5; // only one index + case SHARDS -> 9; // only one shard + }; + } + private static CommonStats createIndexLevelCommonStats() { CommonStats stats = new CommonStats(new CommonStatsFlags().clear().set(CommonStatsFlags.Flag.Mappings, true)); stats.nodeMappings = new NodeMappingStats(randomNonNegativeLong(), randomNonNegativeLong()); diff --git a/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java index ef0d4ccca16f0..f936fe5f07a68 100644 --- a/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java @@ -22,7 +22,7 @@ public void testInvalidLevel() { final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap(), Collections.emptyMap()); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContent(null, params)); + final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContentChunked(params)); assertThat( e, hasToString(containsString("level parameter must be one of [node] or [indices] or [shards] but was [" + level + "]")) diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java index 0f29ec76ff752..36c1d083eb6f8 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDoc.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.monitoring.collector.node; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; @@ -69,7 +70,7 @@ protected void innerToXContent(XContentBuilder builder, Params params) throws IO builder.field("node_id", nodeId); builder.field("node_master", nodeMaster); builder.field("mlockall", mlockall); - nodeStats.toXContent(builder, params); + ChunkedToXContent.wrapAsToXContent(nodeStats).toXContent(builder, params); } builder.endObject(); } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java index f27afbdc97ed9..9bf6c4153d5a8 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/FilteredMonitoringDoc.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.core.Nullable; +import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; @@ -20,8 +21,6 @@ import java.io.InputStream; import java.util.Set; -import static org.elasticsearch.xcontent.NamedXContentRegistry.EMPTY; - /** * {@link FilteredMonitoringDoc} are a kind of {@link MonitoringDoc} whose XContent * is filtered when the document is printed out. @@ -66,7 +65,7 @@ public final XContentBuilder toXContent(XContentBuilder builder, Params params) } try ( InputStream stream = out.bytes().streamInput(); - XContentParser parser = xContent.createParser(EMPTY, LoggingDeprecationHandler.INSTANCE, stream) + XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream) ) { return builder.copyCurrentStructure(parser); } From cffd4adad5ae1ca0e8f83d8ac6c6d253ab595880 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 5 Apr 2023 19:50:16 +0100 Subject: [PATCH 2/3] =?UTF-8?q?Mocks=20=F0=9F=91=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../collector/node/NodeStatsMonitoringDocTests.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index f270d23161c5b..898bd68f4eab6 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -47,7 +48,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCase { @@ -63,6 +66,7 @@ public void setUp() throws Exception { nodeId = randomAlphaOfLength(5); isMaster = randomBoolean(); nodeStats = mock(NodeStats.class); + when(nodeStats.toXContentChunked(any())).thenReturn(Collections.emptyIterator()); mlockall = randomBoolean(); } From 2e6aee4edc05c8675fc5bc48acfd5df98c22f167 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 5 Apr 2023 20:32:35 +0100 Subject: [PATCH 3/3] Nicer whitespace --- .../admin/cluster/node/stats/NodeStats.java | 42 +++++---- .../indices/NodeIndicesStats.java | 91 ++++++++++--------- .../transport/TransportStats.java | 71 ++++++++------- 3 files changed, 110 insertions(+), 94 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 25fb5510d5f67..13352784a57c4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -280,28 +280,30 @@ public void writeTo(StreamOutput out) throws IOException { @Override public Iterator toXContentChunked(ToXContent.Params outerParams) { - return Iterators.concat(Iterators.single((builder, params) -> { - builder.field("name", getNode().getName()); - builder.field("transport_address", getNode().getAddress().toString()); - builder.field("host", getNode().getHostName()); - builder.field("ip", getNode().getAddress()); - - builder.startArray("roles"); - for (DiscoveryNodeRole role : getNode().getRoles()) { - builder.value(role.roleName()); - } - builder.endArray(); - - if (getNode().getAttributes().isEmpty() == false) { - builder.startObject("attributes"); - for (Map.Entry attrEntry : getNode().getAttributes().entrySet()) { - builder.field(attrEntry.getKey(), attrEntry.getValue()); + return Iterators.concat( + + Iterators.single((builder, params) -> { + builder.field("name", getNode().getName()); + builder.field("transport_address", getNode().getAddress().toString()); + builder.field("host", getNode().getHostName()); + builder.field("ip", getNode().getAddress()); + + builder.startArray("roles"); + for (DiscoveryNodeRole role : getNode().getRoles()) { + builder.value(role.roleName()); + } + builder.endArray(); + + if (getNode().getAttributes().isEmpty() == false) { + builder.startObject("attributes"); + for (Map.Entry attrEntry : getNode().getAttributes().entrySet()) { + builder.field(attrEntry.getKey(), attrEntry.getValue()); + } + builder.endObject(); } - builder.endObject(); - } - return builder; - }), + return builder; + }), ifPresent(getIndices()).toXContentChunked(outerParams), diff --git a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index 517754f3e5578..2d6ad97317446 100644 --- a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -222,48 +222,55 @@ public int hashCode() { @Override public Iterator toXContentChunked(ToXContent.Params outerParams) { - return Iterators.concat(Iterators.single((builder, params) -> { - builder.startObject(Fields.INDICES); - return stats.toXContent(builder, params); - }), switch (NodeStatsLevel.of(outerParams, NodeStatsLevel.NODE)) { - - case NODE -> Collections.emptyIterator(); - - case INDICES -> Iterators.concat( - ChunkedToXContentHelper.startObject(Fields.INDICES), - Iterators.flatMap( - createCommonStatsByIndex().entrySet().iterator(), - entry -> Iterators.single((builder, params) -> { - builder.startObject(entry.getKey().getName()); - entry.getValue().toXContent(builder, params); - return builder.endObject(); - }) - ), - ChunkedToXContentHelper.endObject() - ); - - case SHARDS -> Iterators.concat( - ChunkedToXContentHelper.startObject(Fields.SHARDS), - Iterators.flatMap( - statsByShard.entrySet().iterator(), - entry -> Iterators.concat( - ChunkedToXContentHelper.startArray(entry.getKey().getName()), - Iterators.flatMap( - entry.getValue().iterator(), - indexShardStats -> Iterators.concat( - Iterators.single( - (b, p) -> b.startObject().startObject(String.valueOf(indexShardStats.getShardId().getId())) - ), - Iterators.flatMap(Iterators.forArray(indexShardStats.getShards()), Iterators::single), - Iterators.single((b, p) -> b.endObject().endObject()) - ) - ), - ChunkedToXContentHelper.endArray() - ) - ), - ChunkedToXContentHelper.endObject() - ); - }, ChunkedToXContentHelper.endObject()); + return Iterators.concat( + + Iterators.single((builder, params) -> { + builder.startObject(Fields.INDICES); + return stats.toXContent(builder, params); + }), + + switch (NodeStatsLevel.of(outerParams, NodeStatsLevel.NODE)) { + + case NODE -> Collections.emptyIterator(); + + case INDICES -> Iterators.concat( + ChunkedToXContentHelper.startObject(Fields.INDICES), + Iterators.flatMap( + createCommonStatsByIndex().entrySet().iterator(), + entry -> Iterators.single((builder, params) -> { + builder.startObject(entry.getKey().getName()); + entry.getValue().toXContent(builder, params); + return builder.endObject(); + }) + ), + ChunkedToXContentHelper.endObject() + ); + + case SHARDS -> Iterators.concat( + ChunkedToXContentHelper.startObject(Fields.SHARDS), + Iterators.flatMap( + statsByShard.entrySet().iterator(), + entry -> Iterators.concat( + ChunkedToXContentHelper.startArray(entry.getKey().getName()), + Iterators.flatMap( + entry.getValue().iterator(), + indexShardStats -> Iterators.concat( + Iterators.single( + (b, p) -> b.startObject().startObject(String.valueOf(indexShardStats.getShardId().getId())) + ), + Iterators.flatMap(Iterators.forArray(indexShardStats.getShards()), Iterators::single), + Iterators.single((b, p) -> b.endObject().endObject()) + ) + ), + ChunkedToXContentHelper.endArray() + ) + ), + ChunkedToXContentHelper.endObject() + ); + }, + + ChunkedToXContentHelper.endObject() + ); } private Map createCommonStatsByIndex() { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportStats.java b/server/src/main/java/org/elasticsearch/transport/TransportStats.java index b27183be66924..561b5b5faf005 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportStats.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportStats.java @@ -186,38 +186,45 @@ private boolean assertHistogramsConsistent() { @Override public Iterator toXContentChunked(ToXContent.Params outerParams) { - return Iterators.concat(Iterators.single((builder, params) -> { - builder.startObject(Fields.TRANSPORT); - builder.field(Fields.SERVER_OPEN, serverOpen); - builder.field(Fields.TOTAL_OUTBOUND_CONNECTIONS, totalOutboundConnections); - builder.field(Fields.RX_COUNT, rxCount); - builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, ByteSizeValue.ofBytes(rxSize)); - builder.field(Fields.TX_COUNT, txCount); - builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, ByteSizeValue.ofBytes(txSize)); - if (inboundHandlingTimeBucketFrequencies.length > 0) { - histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM); - histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM); - } else { - // Stats came from before v8.1 - assert Version.CURRENT.major == Version.V_7_0_0.major + 1; - } - if (transportActionStats.isEmpty() == false) { - builder.startObject(Fields.ACTIONS); - } else { - // Stats came from before v8.8 - assert Version.CURRENT.major == Version.V_7_0_0.major + 1; - } - return builder; - }), Iterators.flatMap(transportActionStats.entrySet().iterator(), entry -> Iterators.single((builder, params) -> { - builder.field(entry.getKey()); - entry.getValue().toXContent(builder, params); - return builder; - })), Iterators.single((builder, params) -> { - if (transportActionStats.isEmpty() == false) { - builder.endObject(); - } - return builder.endObject(); - })); + return Iterators.concat( + + Iterators.single((builder, params) -> { + builder.startObject(Fields.TRANSPORT); + builder.field(Fields.SERVER_OPEN, serverOpen); + builder.field(Fields.TOTAL_OUTBOUND_CONNECTIONS, totalOutboundConnections); + builder.field(Fields.RX_COUNT, rxCount); + builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, ByteSizeValue.ofBytes(rxSize)); + builder.field(Fields.TX_COUNT, txCount); + builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, ByteSizeValue.ofBytes(txSize)); + if (inboundHandlingTimeBucketFrequencies.length > 0) { + histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM); + histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM); + } else { + // Stats came from before v8.1 + assert Version.CURRENT.major == Version.V_7_0_0.major + 1; + } + if (transportActionStats.isEmpty() == false) { + builder.startObject(Fields.ACTIONS); + } else { + // Stats came from before v8.8 + assert Version.CURRENT.major == Version.V_7_0_0.major + 1; + } + return builder; + }), + + Iterators.flatMap(transportActionStats.entrySet().iterator(), entry -> Iterators.single((builder, params) -> { + builder.field(entry.getKey()); + entry.getValue().toXContent(builder, params); + return builder; + })), + + Iterators.single((builder, params) -> { + if (transportActionStats.isEmpty() == false) { + builder.endObject(); + } + return builder.endObject(); + }) + ); } static void histogramToXContent(XContentBuilder builder, long[] bucketFrequencies, String fieldName) throws IOException {