Deeper chunking of node stats response #95060

Merged
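This change converts the node stats response and several of its sub-objects (NodeStats, HttpStats) from single-pass ToXContentFragment serialization to ChunkedToXContent, so the REST layer can emit the response as a sequence of small chunks instead of building one large XContentBuilder. As a rough orientation only — the ExampleStats class, its field, and the object/array names below are invented for illustration, while ChunkedToXContent, ToXContent, and the Iterators helpers are the same ones used in the diff — here is a minimal sketch of the pattern being applied:

```java
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;

import java.util.Iterator;
import java.util.List;

// Hypothetical class, not part of this PR: the header, each list element, and the
// footer each become their own chunk, so the response can be serialized incrementally.
class ExampleStats implements ChunkedToXContent {

    private final List<String> names = List.of("node-1", "node-2");

    @Override
    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
        return Iterators.<ToXContent>concat(
            // one chunk that opens the enclosing object and array
            Iterators.single((builder, params) -> builder.startObject("example").startArray("names")),
            // one chunk per element
            Iterators.flatMap(names.iterator(), name -> Iterators.single((builder, params) -> builder.value(name))),
            // one chunk that closes the array and object
            Iterators.single((builder, params) -> builder.endArray().endObject())
        );
    }
}
```

Each Iterators.single(...) call contributes one chunk, and Iterators.flatMap(...) turns every element of a collection into its own chunk, which appears to be the "deeper" chunking the title refers to: NodeStats below delegates its indices, transport and HTTP sections to their own chunked iterators instead of rendering everything in one piece.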
@@ -122,4 +122,5 @@ default boolean isFragment() {
return true;
}

ToXContent EMPTY = (b, p) -> b;
}

@@ -456,7 +456,7 @@ public void waitForTaskCompletion(Task task) {}
// Need to run the task in a separate thread because node client's .execute() is blocked by our task listener
index = new Thread(() -> {
IndexResponse indexResponse = client().prepareIndex("test").setSource("test", "test").get();
assertArrayEquals(ReplicationResponse.EMPTY, indexResponse.getShardInfo().getFailures());
assertArrayEquals(ReplicationResponse.NO_FAILURES, indexResponse.getShardInfo().getFailures());
});
index.start();
assertTrue(taskRegistered.await(10, TimeUnit.SECONDS)); // waiting for at least one task to be registered

@@ -11,8 +11,10 @@
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.http.HttpStats;
@@ -29,16 +31,17 @@
import org.elasticsearch.script.ScriptStats;
import org.elasticsearch.threadpool.ThreadPoolStats;
import org.elasticsearch.transport.TransportStats;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/**
* Node statistics (dynamic, changes depending on when created).
*/
public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
public class NodeStats extends BaseNodeResponse implements ChunkedToXContent {

private final long timestamp;

@@ -275,72 +278,63 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {

builder.field("name", getNode().getName());
builder.field("transport_address", getNode().getAddress().toString());
builder.field("host", getNode().getHostName());
builder.field("ip", getNode().getAddress());
return Iterators.concat(Iterators.single((builder, params) -> {
builder.field("name", getNode().getName());
builder.field("transport_address", getNode().getAddress().toString());
builder.field("host", getNode().getHostName());
builder.field("ip", getNode().getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : getNode().getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (getNode().getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : getNode().getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
builder.startArray("roles");
for (DiscoveryNodeRole role : getNode().getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (getNode().getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : getNode().getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
builder.endObject();
}

if (getIndices() != null) {
getIndices().toXContent(builder, params);
}
if (getOs() != null) {
getOs().toXContent(builder, params);
}
if (getProcess() != null) {
getProcess().toXContent(builder, params);
}
if (getJvm() != null) {
getJvm().toXContent(builder, params);
}
if (getThreadPool() != null) {
getThreadPool().toXContent(builder, params);
}
if (getFs() != null) {
getFs().toXContent(builder, params);
}
if (getTransport() != null) {
getTransport().toXContent(builder, params);
}
if (getHttp() != null) {
getHttp().toXContent(builder, params);
}
if (getBreaker() != null) {
getBreaker().toXContent(builder, params);
}
if (getScriptStats() != null) {
getScriptStats().toXContent(builder, params);
}
if (getDiscoveryStats() != null) {
getDiscoveryStats().toXContent(builder, params);
}
if (getIngestStats() != null) {
getIngestStats().toXContent(builder, params);
}
if (getAdaptiveSelectionStats() != null) {
getAdaptiveSelectionStats().toXContent(builder, params);
}
if (getScriptCacheStats() != null) {
getScriptCacheStats().toXContent(builder, params);
}
if (getIndexingPressureStats() != null) {
getIndexingPressureStats().toXContent(builder, params);
}
return builder;
return builder;
}),

ifPresent(getIndices()).toXContentChunked(outerParams),

Iterators.single((builder, params) -> {
ifPresent(getOs()).toXContent(builder, params);
ifPresent(getProcess()).toXContent(builder, params);
ifPresent(getJvm()).toXContent(builder, params);
ifPresent(getThreadPool()).toXContent(builder, params);
ifPresent(getFs()).toXContent(builder, params);
return builder;
}),

ifPresent(getTransport()).toXContentChunked(outerParams),
ifPresent(getHttp()).toXContentChunked(outerParams),

Iterators.single((builder, params) -> {
ifPresent(getBreaker()).toXContent(builder, params);
ifPresent(getScriptStats()).toXContent(builder, params);
ifPresent(getDiscoveryStats()).toXContent(builder, params);
ifPresent(getIngestStats()).toXContent(builder, params);
ifPresent(getAdaptiveSelectionStats()).toXContent(builder, params);
ifPresent(getScriptCacheStats()).toXContent(builder, params);
ifPresent(getIndexingPressureStats()).toXContent(builder, params);
return builder;
})
);
}

private static ChunkedToXContent ifPresent(@Nullable ChunkedToXContent chunkedToXContent) {
return Objects.requireNonNullElse(chunkedToXContent, ChunkedToXContent.EMPTY);
}

private static ToXContent ifPresent(@Nullable ToXContent toXContent) {
return Objects.requireNonNullElse(toXContent, ToXContent.EMPTY);
}
}

@@ -15,6 +15,7 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
@@ -42,16 +43,15 @@ protected void writeNodesTo(StreamOutput out, List<NodeStats> nodes) throws IOEx
}

@Override
protected Iterator<? extends ToXContent> xContentChunks() {
protected Iterator<? extends ToXContent> xContentChunks(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single((b, p) -> b.startObject("nodes")),
getNodes().stream().map(nodeStats -> (ToXContent) (b, p) -> {
b.startObject(nodeStats.getNode().getId());
b.field("timestamp", nodeStats.getTimestamp());
nodeStats.toXContent(b, p);
return b.endObject();
}).iterator(),
Iterators.single((b, p) -> b.endObject())
ChunkedToXContentHelper.startObject("nodes"),
Iterators.flatMap(getNodes().iterator(), nodeStats -> Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject(nodeStats.getNode().getId());
builder.field("timestamp", nodeStats.getTimestamp());
return builder;
}), nodeStats.toXContentChunked(outerParams), ChunkedToXContentHelper.endObject())),
ChunkedToXContentHelper.endObject()
);
}


@@ -13,6 +13,7 @@
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xcontent.ToXContent;

@@ -38,8 +39,8 @@ public final Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params
b.startObject();
RestActions.buildNodesHeader(b, p, this);
return b.field("cluster_name", getClusterName().value());
}), xContentChunks(), Iterators.single((ToXContent) (b, p) -> b.endObject()));
}), xContentChunks(params), ChunkedToXContentHelper.endObject());
}

protected abstract Iterator<? extends ToXContent> xContentChunks();
protected abstract Iterator<? extends ToXContent> xContentChunks(ToXContent.Params outerParams);
}
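ChunkedToXContentHelper itself is not shown in this diff; judging from how startObject("nodes") and endObject() are used above, each helper presumably contributes a single chunk that opens or closes an object, roughly like the sketch below (the method bodies are an assumption for illustration, not the actual Elasticsearch implementation):

```java
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.xcontent.ToXContent;

import java.util.Iterator;

// Assumed shape of the ChunkedToXContentHelper methods used above: each returns a
// one-element iterator whose chunk opens or closes an object on the builder.
final class ChunkedHelperSketch {

    static Iterator<ToXContent> startObject(String name) {
        return Iterators.<ToXContent>single((builder, params) -> builder.startObject(name));
    }

    static Iterator<ToXContent> endObject() {
        return Iterators.<ToXContent>single((builder, params) -> builder.endObject());
    }
}
```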

@@ -465,7 +465,7 @@ private void finish() {
if (finished.compareAndSet(false, true)) {
final ReplicationResponse.ShardInfo.Failure[] failuresArray;
if (shardReplicaFailures.isEmpty()) {
failuresArray = ReplicationResponse.EMPTY;
failuresArray = ReplicationResponse.NO_FAILURES;
} else {
failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
shardReplicaFailures.toArray(failuresArray);

@@ -35,7 +35,7 @@
*/
public class ReplicationResponse extends ActionResponse {

public static final ReplicationResponse.ShardInfo.Failure[] EMPTY = new ReplicationResponse.ShardInfo.Failure[0];
public static final ReplicationResponse.ShardInfo.Failure[] NO_FAILURES = new ReplicationResponse.ShardInfo.Failure[0];

private ShardInfo shardInfo;

@@ -68,7 +68,7 @@ public static class ShardInfo implements Writeable, ToXContentObject {

private int total;
private int successful;
private Failure[] failures = EMPTY;
private Failure[] failures = ReplicationResponse.NO_FAILURES;

public ShardInfo() {}

@@ -186,7 +186,7 @@ public static ShardInfo fromXContent(XContentParser parser) throws IOException {
parser.skipChildren(); // skip potential inner arrays for forward compatibility
}
}
Failure[] failures = EMPTY;
Failure[] failures = ReplicationResponse.NO_FAILURES;
if (failuresList != null) {
failures = failuresList.toArray(new Failure[failuresList.size()]);
}

@@ -14,6 +14,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

/**
@@ -84,4 +85,9 @@ public boolean isFragment() {
default boolean isFragment() {
return true;
}

/**
* A {@link ChunkedToXContent} that yields no chunks
*/
ChunkedToXContent EMPTY = params -> Collections.emptyIterator();
}
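The NodeStats change earlier in this diff relies on this constant through its ifPresent(...) helpers, so a null sub-stats object simply contributes no chunks. As an illustration of why that works — the writeAllChunks helper below is invented for this example and is not an Elasticsearch API; the real REST layer consumes the iterator incrementally rather than draining it in one loop:

```java
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Iterator;

final class ChunkDrainSketch {

    // Serialize every chunk eagerly; for ChunkedToXContent.EMPTY the iterator is
    // empty, so nothing is written and the builder is returned unchanged.
    static XContentBuilder writeAllChunks(ChunkedToXContent chunked, XContentBuilder builder, ToXContent.Params params)
        throws IOException {
        Iterator<? extends ToXContent> chunks = chunked.toXContentChunked(params);
        while (chunks.hasNext()) {
            chunks.next().toXContent(builder, params);
        }
        return builder;
    }
}
```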

server/src/main/java/org/elasticsearch/http/HttpStats.java (28 changes: 16 additions & 12 deletions)
@@ -8,16 +8,20 @@

package org.elasticsearch.http;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class HttpStats implements Writeable, ToXContentFragment {
public class HttpStats implements Writeable, ChunkedToXContent {

private final long serverOpen;
private final long totalOpen;
@@ -78,17 +82,17 @@ static final class Fields {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.HTTP);
builder.field(Fields.CURRENT_OPEN, serverOpen);
builder.field(Fields.TOTAL_OPENED, totalOpen);
builder.startArray(Fields.CLIENTS);
for (ClientStats clientStats : this.clientStats) {
clientStats.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.<ToXContent>concat(
Iterators.single(
(builder, params) -> builder.startObject(Fields.HTTP)
.field(Fields.CURRENT_OPEN, serverOpen)
.field(Fields.TOTAL_OPENED, totalOpen)
.startArray(Fields.CLIENTS)
),
Iterators.flatMap(clientStats.iterator(), Iterators::<ToXContent>single),
Iterators.single((builder, params) -> builder.endArray().endObject())
);
}

public static class ClientStats implements Writeable, ToXContentFragment {
Expand Down