Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XContent chunking to SearchResponse #94736

Merged
merged 13 commits into from
May 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.util.List;

Expand All @@ -38,6 +38,6 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
SearchRequest searchRequest = new SearchRequest();
return channel -> client.execute(NoopSearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
return channel -> client.execute(NoopSearchAction.INSTANCE, searchRequest, new RestChunkedToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -133,7 +134,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

if (searchResponse != null) {
builder.field("response");
searchResponse.toXContent(builder, params);
ChunkedToXContent.wrapAsToXContent(searchResponse).toXContent(builder, params);
}
if (error != null) {
builder.startObject("error");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -101,16 +102,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
void innerToXContent(XContentBuilder builder, Params params) throws IOException {
if (hasResponse()) {
response.innerToXContent(builder, params);
ChunkedToXContent.wrapAsToXContent(p -> response.innerToXContentChunked(p)).toXContent(builder, params);
} else {
// we can assume the template is always json as we convert it before compiling it
try (InputStream stream = source.streamInput()) {
builder.rawField(TEMPLATE_OUTPUT_FIELD.getPreferredName(), stream, XContentType.JSON);
}
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,12 +570,11 @@ public static void generateThrowableXContent(XContentBuilder builder, Params par
* This method is usually used when the {@link Exception} is rendered as a full XContent object, and its output can be parsed
* by the {@link #failureFromXContent(XContentParser)} method.
*/
public static void generateFailureXContent(XContentBuilder builder, Params params, @Nullable Exception e, boolean detailed)
public static XContentBuilder generateFailureXContent(XContentBuilder builder, Params params, @Nullable Exception e, boolean detailed)
throws IOException {
// No exception to render as an error
if (e == null) {
builder.field(ERROR, "unknown");
return;
return builder.field(ERROR, "unknown");
}

// Render the exception with a simple message
Expand All @@ -589,8 +588,7 @@ public static void generateFailureXContent(XContentBuilder builder, Params param
}
t = t.getCause();
}
builder.field(ERROR, message);
return;
return builder.field(ERROR, message);
}

// Render the exception with all details
Expand All @@ -606,7 +604,7 @@ public static void generateFailureXContent(XContentBuilder builder, Params param
builder.endArray();
}
generateThrowableXContent(builder, params, e);
builder.endObject();
return builder.endObject();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParser.Token;

Expand All @@ -34,7 +36,7 @@
/**
* A multi search response.
*/
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ToXContentObject {
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ChunkedToXContentObject {

private static final ParseField RESPONSES = new ParseField(Fields.RESPONSES);
private static final ParseField TOOK_IN_MILLIS = new ParseField("took");
Expand All @@ -52,7 +54,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
/**
* A search response item, holding the actual search response, or an error message if it failed.
*/
public static class Item implements Writeable {
public static class Item implements Writeable, ChunkedToXContent {
private final SearchResponse response;
private final Exception exception;

Expand Down Expand Up @@ -82,6 +84,25 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
if (isFailure()) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Iterators.single((b, p) -> ElasticsearchException.generateFailureXContent(b, p, Item.this.getFailure(), true)),
Iterators.single((b, p) -> b.field(Fields.STATUS, ExceptionsHelper.status(Item.this.getFailure()).getStatus())),
ChunkedToXContentHelper.endObject()
cbuescher marked this conversation as resolved.
Show resolved Hide resolved
);
} else {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Item.this.getResponse().innerToXContentChunked(params),
Iterators.single((b, p) -> b.field(Fields.STATUS, Item.this.getResponse().status().getStatus())),
ChunkedToXContentHelper.endObject()
);
}
}

/**
* Is it a failed search?
*/
Expand Down Expand Up @@ -150,24 +171,14 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("took", tookInMillis);
builder.startArray(Fields.RESPONSES);
for (Item item : items) {
builder.startObject();
if (item.isFailure()) {
ElasticsearchException.generateFailureXContent(builder, params, item.getFailure(), true);
builder.field(Fields.STATUS, ExceptionsHelper.status(item.getFailure()).getStatus());
} else {
item.getResponse().innerToXContent(builder, params);
builder.field(Fields.STATUS, item.getResponse().status().getStatus());
}
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Iterators.single((b, p) -> b.field("took", tookInMillis).startArray(Fields.RESPONSES)),
Iterators.flatMap(Iterators.forArray(items), item -> item.toXContentChunked(params)),
Iterators.single((b, p) -> b.endArray()),
ChunkedToXContentHelper.endObject()
);
}

public static MultiSearchResponse fromXContext(XContentParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -29,13 +31,15 @@
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParser.Token;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -47,7 +51,7 @@
/**
* A response of a search request.
*/
public class SearchResponse extends ActionResponse implements StatusToXContentObject {
public class SearchResponse extends ActionResponse implements ChunkedToXContentObject {

private static final ParseField SCROLL_ID = new ParseField("_scroll_id");
private static final ParseField POINT_IN_TIME_ID = new ParseField("pit_id");
Expand Down Expand Up @@ -129,7 +133,6 @@ public SearchResponse(
: "SearchResponse can't have both scrollId [" + scrollId + "] and searchContextId [" + pointInTimeId + "]";
}

@Override
public RestStatus status() {
return RestStatus.status(successfulShards, totalShards, shardFailures);
}
Expand Down Expand Up @@ -264,14 +267,23 @@ public Clusters getClusters() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder, params);
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
this.innerToXContentChunked(params),
ChunkedToXContentHelper.endObject()
);
}

public Iterator<? extends ToXContent> innerToXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.singleChunk(SearchResponse.this::headerToXContent),
Iterators.single(clusters),
internalResponse.toXContentChunked(params)
);
}

public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder headerToXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (scrollId != null) {
builder.field(SCROLL_ID.getPreferredName(), scrollId);
}
Expand All @@ -295,8 +307,6 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
getFailedShards(),
getShardFailures()
);
clusters.toXContent(builder, params);
internalResponse.toXContent(builder, params);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@

package org.elasticsearch.action.search;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.profile.SearchProfileResults;
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

/**
Expand All @@ -29,7 +31,7 @@
* to parse aggregations into, which are not serializable. This is the common part that can be
* shared between core and client.
*/
public class SearchResponseSections implements ToXContentFragment {
public class SearchResponseSections implements ChunkedToXContent {

protected final SearchHits hits;
protected final Aggregations aggregations;
Expand Down Expand Up @@ -98,18 +100,33 @@ public final Map<String, SearchProfileShardResult> profile() {
}

@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
hits.toXContent(builder, params);
if (aggregations != null) {
aggregations.toXContent(builder, params);
}
if (suggest != null) {
suggest.toXContent(builder, params);
}
if (profileResults != null) {
profileResults.toXContent(builder, params);
}
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
Iterators.flatMap(Iterators.single(hits), r -> r.toXContentChunked(params)),
Iterators.single((ToXContent) (b, p) -> {
if (aggregations != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see why moving these checks inside the Iterator helps enumerating everything in one concat call, but this way we create iterators the we already know will be a noop. Don't know if its worth pulling these out. maybe readability suffers then, leave it up to you to decide.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I went back and forth on this a bit, but I think this is the most readable way of doing it, and creating no-op iterators is pretty low cost.

aggregations.toXContent(b, p);
}
return b;
}),
Iterators.single((b, p) -> {
if (suggest != null) {
suggest.toXContent(b, p);
}
return b;
}),
Iterators.single((b, p) -> {
if (profileResults != null) {
profileResults.toXContent(b, p);
}
return b;
})
);
}

@Override
public boolean isFragment() {
return true;
}

protected void writeTo(StreamOutput out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
*/
package org.elasticsearch.common.xcontent;

import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;

import java.util.Iterator;

/**
* Chunked equivalent of {@link org.elasticsearch.xcontent.ToXContentObject} that serializes as a full object.
*/
Expand All @@ -16,4 +21,20 @@ public interface ChunkedToXContentObject extends ChunkedToXContent {
default boolean isFragment() {
return false;
}

/**
* Wraps the given instance in a {@link ToXContentObject} that will fully serialize the instance when serialized.
*
* @param chunkedToXContent instance to wrap
* @return x-content instance
*/
static ToXContentObject wrapAsToXContentObject(ChunkedToXContentObject chunkedToXContent) {
return (builder, params) -> {
Iterator<? extends ToXContent> serialization = chunkedToXContent.toXContentChunked(params);
while (serialization.hasNext()) {
serialization.next().toXContent(builder, params);
}
return builder;
};
}
}
Loading