Skip to content

Commit

Permalink
Chunked encoding for cluster state API
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Dec 12, 2022
1 parent 296eacd commit 5718287
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -75,7 +76,8 @@ protected void addCustomFields(XContentBuilder builder, Params params) throws IO
deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
}
builder.startObject("state");
state.toXContent(builder, params);
// TODO this should be chunked, see #89838
ChunkedToXContent.wrapAsXContentObject(state).toXContent(builder, params);
builder.endObject();
}

Expand Down
231 changes: 122 additions & 109 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
Expand All @@ -35,6 +32,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -44,13 +42,13 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -59,6 +57,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

Expand Down Expand Up @@ -95,7 +94,7 @@
* <p>
* Cluster state updates can be used to trigger various actions via a {@link ClusterStateListener} rather than using a timer.
* <p>
* Implements {@link ToXContentFragment} to be exposed in REST APIs (e.g. {@code GET _cluster/state} and {@code POST _cluster/reroute}) and
* Implements {@link ChunkedToXContent} to be exposed in REST APIs (e.g. {@code GET _cluster/state} and {@code POST _cluster/reroute}) and
* to be indexed by monitoring, mostly just for diagnostics purposes. The {@link XContent} representation does not need to be 100% faithful
* since we never reconstruct a cluster state from its XContent representation, but the more faithful it is the more useful it is for
* diagnostics. Note that the {@link XContent} representation of the {@link Metadata} portion does have to be faithful (in {@link
Expand All @@ -104,7 +103,7 @@
* Security-sensitive data such as passwords or private keys should not be stored in the cluster state, since the contents of the cluster
* state are exposed in various APIs.
*/
public class ClusterState implements ToXContentFragment, Diffable<ClusterState> {
public class ClusterState implements ChunkedToXContent, Diffable<ClusterState> {

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

Expand All @@ -124,7 +123,7 @@ default boolean isPrivate() {
* the more faithful it is the more useful it is for diagnostics.
*/
@Override
Iterator<? extends ToXContent> toXContentChunked(Params params);
Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params);
}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
Expand Down Expand Up @@ -520,114 +519,128 @@ public String toString() {
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
EnumSet<Metric> metrics = Metric.parseString(params.param("metric", "_all"), true);

// always provide the cluster_uuid as part of the top-level response (also part of the metadata response)
builder.field("cluster_uuid", metadata().clusterUUID());

if (metrics.contains(Metric.VERSION)) {
builder.field("version", version);
builder.field("state_uuid", stateUUID);
}

if (metrics.contains(Metric.MASTER_NODE)) {
builder.field("master_node", nodes().getMasterNodeId());
}

if (metrics.contains(Metric.BLOCKS)) {
builder.startObject("blocks");

if (blocks().global().isEmpty() == false) {
builder.startObject("global");
for (ClusterBlock block : blocks().global()) {
block.toXContent(builder, params);
}
builder.endObject();
}
private static <T> Iterator<ToXContent> chunkedSection(
boolean condition,
ToXContent before,
Iterator<T> items,
Function<T, Iterator<ToXContent>> fn,
ToXContent after
) {
return condition
? Iterators.concat(Iterators.single(before), Iterators.flatMap(items, fn::apply), Iterators.single(after))
: Collections.emptyIterator();
}

if (blocks().indices().isEmpty() == false) {
builder.startObject("indices");
for (Map.Entry<String, Set<ClusterBlock>> entry : blocks().indices().entrySet()) {
builder.startObject(entry.getKey());
for (ClusterBlock block : entry.getValue()) {
@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
final var metrics = Metric.parseString(outerParams.param("metric", "_all"), true);

return Iterators.concat(
// always provide the cluster_uuid as part of the top-level response (also part of the metadata response)
ChunkedToXContentHelper.field("cluster_uuid", metadata().clusterUUID()),

// state version info
metrics.contains(Metric.VERSION)
? Iterators.single((builder2, params2) -> builder2.field("version", version).field("state_uuid", stateUUID))
: Collections.emptyIterator(),

// master node
metrics.contains(Metric.MASTER_NODE)
? Iterators.single((builder1, params1) -> builder1.field("master_node", nodes().getMasterNodeId()))
: Collections.emptyIterator(),

// blocks
chunkedSection(metrics.contains(Metric.BLOCKS), (builder, params) -> {
builder.startObject("blocks");
if (blocks().global().isEmpty() == false) {
builder.startObject("global");
for (ClusterBlock block : blocks().global()) {
block.toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
}

builder.endObject();
}

// nodes
if (metrics.contains(Metric.NODES)) {
builder.startObject("nodes");
for (DiscoveryNode node : nodes) {
node.toXContent(builder, params);
}
builder.endObject();
}

// meta data
if (metrics.contains(Metric.METADATA)) {
ChunkedToXContent.wrapAsXContentObject(metadata).toXContent(builder, params);
}

// routing table
if (metrics.contains(Metric.ROUTING_TABLE)) {
builder.startObject("routing_table");
builder.startObject("indices");
for (IndexRoutingTable indexRoutingTable : routingTable()) {
builder.startObject(indexRoutingTable.getIndex().getName());
builder.startObject("shards");
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
builder.startArray(Integer.toString(indexShardRoutingTable.shardId().id()));
for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
indexShardRoutingTable.shard(copy).toXContent(builder, params);
}
builder.endArray();
if (blocks().indices().isEmpty() == false) {
builder.startObject("indices");
}
builder.endObject();
builder.endObject();
}
builder.endObject();
builder.endObject();
}

// routing nodes
if (metrics.contains(Metric.ROUTING_NODES)) {
builder.startObject("routing_nodes");
builder.startArray("unassigned");
for (ShardRouting shardRouting : getRoutingNodes().unassigned()) {
shardRouting.toXContent(builder, params);
}
builder.endArray();

builder.startObject("nodes");
for (RoutingNode routingNode : getRoutingNodes()) {
builder.startArray(routingNode.nodeId() == null ? "null" : routingNode.nodeId());
for (ShardRouting shardRouting : routingNode) {
shardRouting.toXContent(builder, params);
return builder;
}, blocks.indices().entrySet().iterator(), entry -> Iterators.single((builder, params) -> {
builder.startObject(entry.getKey());
for (ClusterBlock block : entry.getValue()) {
block.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();

builder.endObject();
}
if (metrics.contains(Metric.CUSTOMS)) {
for (Map.Entry<String, Custom> cursor : customs.entrySet()) {
builder.startObject(cursor.getKey());
ChunkedToXContent.wrapAsXContentObject(cursor.getValue()).toXContent(builder, params);
builder.endObject();
}
}

return builder;
return builder.endObject();
}), (builder, params) -> {
if (blocks().indices().isEmpty() == false) {
builder.endObject();
}
return builder.endObject();
}),

// nodes
chunkedSection(
metrics.contains(Metric.NODES),
(builder, params) -> builder.startObject("nodes"),
nodes.iterator(),
Iterators::single,
(builder, params) -> builder.endObject()
),

// metadata
metrics.contains(Metric.METADATA) ? metadata.toXContentChunked(outerParams) : Collections.emptyIterator(),

// routing table
chunkedSection(
metrics.contains(Metric.ROUTING_TABLE),
(builder, params) -> builder.startObject("routing_table").startObject("indices"),
routingTable().iterator(),
indexRoutingTable -> Iterators.single((builder, params) -> {
builder.startObject(indexRoutingTable.getIndex().getName());
builder.startObject("shards");
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
builder.startArray(Integer.toString(indexShardRoutingTable.shardId().id()));
for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
indexShardRoutingTable.shard(copy).toXContent(builder, params);
}
builder.endArray();
}
return builder.endObject().endObject();
}),
(builder, params) -> builder.endObject().endObject()
),

// routing nodes
chunkedSection(
metrics.contains(Metric.ROUTING_NODES),
(builder, params) -> builder.startObject("routing_nodes").startArray("unassigned"),
getRoutingNodes().unassigned().iterator(),
Iterators::single,
(builder, params) -> builder.endArray() // no endObject() here, continued in next chunkedSection()
),
chunkedSection(
metrics.contains(Metric.ROUTING_NODES),
(builder, params) -> builder.startObject("nodes"),
getRoutingNodes().iterator(),
routingNode -> Iterators.concat(
ChunkedToXContentHelper.startArray(routingNode.nodeId() == null ? "null" : routingNode.nodeId()),
routingNode.iterator(),
ChunkedToXContentHelper.endArray()
),
(builder, params) -> builder.endObject().endObject()
),

// customs
metrics.contains(Metric.CUSTOMS)
? Iterators.flatMap(
customs.entrySet().iterator(),
cursor -> Iterators.concat(
ChunkedToXContentHelper.startObject(cursor.getKey()),
cursor.getValue().toXContentChunked(outerParams),
ChunkedToXContentHelper.endObject()
)
)
: Collections.emptyIterator()
);
}

public static Builder builder(ClusterName clusterName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ public static Iterator<ToXContent> field(String name, boolean value) {
return Iterators.single(((builder, params) -> builder.field(name, value)));
}

public static Iterator<ToXContent> field(String name, String value) {
return Iterators.single(((builder, params) -> builder.field(name, value)));
}

public static Iterator<ToXContent> array(String name, Iterator<? extends ToXContent> contents) {
return Iterators.concat(ChunkedToXContentHelper.startArray(name), contents, ChunkedToXContentHelper.endArray());
}
Expand Down

This file was deleted.

Loading

0 comments on commit 5718287

Please sign in to comment.