Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XContent chunking to SearchResponse #94736

Merged
merged 13 commits into from
May 12, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.util.List;

Expand All @@ -38,6 +38,6 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
SearchRequest searchRequest = new SearchRequest();
return channel -> client.execute(NoopSearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
return channel -> client.execute(NoopSearchAction.INSTANCE, searchRequest, new RestChunkedToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -133,7 +134,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

if (searchResponse != null) {
builder.field("response");
searchResponse.toXContent(builder, params);
ChunkedToXContent.wrapAsToXContent(searchResponse).toXContent(builder, params);
}
if (error != null) {
builder.startObject("error");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -101,16 +102,15 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
public void innerToXContent(XContentBuilder builder, Params params) throws IOException {
cbuescher marked this conversation as resolved.
Show resolved Hide resolved
if (hasResponse()) {
response.innerToXContent(builder, params);
ChunkedToXContent.wrapAsToXContent(p -> response.innerToXContentChunked(p)).toXContent(builder, params);
} else {
// we can assume the template is always json as we convert it before compiling it
try (InputStream stream = source.streamInput()) {
builder.rawField(TEMPLATE_OUTPUT_FIELD.getPreferredName(), stream, XContentType.JSON);
}
}
return builder;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParser.Token;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

Expand All @@ -34,7 +37,7 @@
/**
* A multi search response.
*/
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ToXContentObject {
public class MultiSearchResponse extends ActionResponse implements Iterable<MultiSearchResponse.Item>, ChunkedToXContentObject {

private static final ParseField RESPONSES = new ParseField(Fields.RESPONSES);
private static final ParseField TOOK_IN_MILLIS = new ParseField("took");
Expand All @@ -52,7 +55,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
/**
* A search response item, holding the actual search response, or an error message if it failed.
*/
public static class Item implements Writeable {
public static class Item implements Writeable, ChunkedToXContent {
private final SearchResponse response;
private final Exception exception;

Expand Down Expand Up @@ -82,6 +85,26 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
if (isFailure()) {
return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single((b, p) -> {
ElasticsearchException.generateFailureXContent(b, p, Item.this.getFailure(), true);
return b;
}),
Iterators.single((b, p) -> b.field(Fields.STATUS, ExceptionsHelper.status(Item.this.getFailure()).getStatus())),
ChunkedToXContentHelper.endObject()
cbuescher marked this conversation as resolved.
Show resolved Hide resolved
);
} else {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Item.this.getResponse().innerToXContentChunked(params),
Iterators.single((b, p) -> b.field(Fields.STATUS, Item.this.getResponse().status().getStatus())),
ChunkedToXContentHelper.endObject()
);
}
}

/**
* Is it a failed search?
*/
Expand Down Expand Up @@ -150,24 +173,14 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("took", tookInMillis);
builder.startArray(Fields.RESPONSES);
for (Item item : items) {
builder.startObject();
if (item.isFailure()) {
ElasticsearchException.generateFailureXContent(builder, params, item.getFailure(), true);
builder.field(Fields.STATUS, ExceptionsHelper.status(item.getFailure()).getStatus());
} else {
item.getResponse().innerToXContent(builder, params);
builder.field(Fields.STATUS, item.getResponse().status().getStatus());
}
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
Iterators.single((b, p) -> b.field("took", tookInMillis).startArray(Fields.RESPONSES)),
Iterators.flatMap(Arrays.stream(items).iterator(), item -> item.toXContentChunked(params)),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be a nicer way to include an array of objects which themselves implement ChunkedToXContent but I couldn't find one

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fine IMO but there's no need for the intermediate stream:

Suggested change
Iterators.flatMap(Arrays.stream(items).iterator(), item -> item.toXContentChunked(params)),
Iterators.flatMap(Iterators.forArray(items), item -> item.toXContentChunked(params)),

Iterators.single((b, p) -> b.endArray()),
ChunkedToXContentHelper.endObject()
);
}

public static MultiSearchResponse fromXContext(XContentParser parser) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
Expand All @@ -29,13 +31,15 @@
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParser.Token;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -47,7 +51,7 @@
/**
* A response of a search request.
*/
public class SearchResponse extends ActionResponse implements StatusToXContentObject {
public class SearchResponse extends ActionResponse implements ChunkedToXContentObject {

private static final ParseField SCROLL_ID = new ParseField("_scroll_id");
private static final ParseField POINT_IN_TIME_ID = new ParseField("pit_id");
Expand Down Expand Up @@ -129,7 +133,7 @@ public SearchResponse(
: "SearchResponse can't have both scrollId [" + scrollId + "] and searchContextId [" + pointInTimeId + "]";
}

@Override
// @Override
cbuescher marked this conversation as resolved.
Show resolved Hide resolved
public RestStatus status() {
return RestStatus.status(successfulShards, totalShards, shardFailures);
}
Expand Down Expand Up @@ -264,14 +268,23 @@ public Clusters getClusters() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
innerToXContent(builder, params);
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.startObject(),
this.innerToXContentChunked(params),
ChunkedToXContentHelper.endObject()
);
}

public Iterator<? extends ToXContent> innerToXContentChunked(ToXContent.Params params) {
return Iterators.concat(
ChunkedToXContentHelper.singleChunk(SearchResponse.this::headerToXContent),
Iterators.single(clusters),
Iterators.flatMap(Iterators.single(internalResponse), r -> r.toXContentChunked(params))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly I feel there ought to be a nicer way of doing this but I couldn't find one...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're looking for this:

Suggested change
Iterators.flatMap(Iterators.single(internalResponse), r -> r.toXContentChunked(params))
internalResponse.toXContentChunked(params)

);
}

public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
public XContentBuilder headerToXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (scrollId != null) {
builder.field(SCROLL_ID.getPreferredName(), scrollId);
}
Expand All @@ -295,8 +308,6 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
getFailedShards(),
getShardFailures()
);
clusters.toXContent(builder, params);
internalResponse.toXContent(builder, params);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@

package org.elasticsearch.action.search;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.profile.SearchProfileResults;
import org.elasticsearch.search.profile.SearchProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

/**
Expand All @@ -29,7 +31,7 @@
* to parse aggregations into, which are not serializable. This is the common part that can be
* shared between core and client.
*/
public class SearchResponseSections implements ToXContentFragment {
public class SearchResponseSections implements ChunkedToXContent {

protected final SearchHits hits;
protected final Aggregations aggregations;
Expand Down Expand Up @@ -98,18 +100,33 @@ public final Map<String, SearchProfileShardResult> profile() {
}

@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
hits.toXContent(builder, params);
if (aggregations != null) {
aggregations.toXContent(builder, params);
}
if (suggest != null) {
suggest.toXContent(builder, params);
}
if (profileResults != null) {
profileResults.toXContent(builder, params);
}
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(
Iterators.flatMap(Iterators.single(hits), r -> r.toXContentChunked(params)),
Iterators.single((ToXContent) (b, p) -> {
if (aggregations != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see why moving these checks inside the Iterator helps enumerating everything in one concat call, but this way we create iterators the we already know will be a noop. Don't know if its worth pulling these out. maybe readability suffers then, leave it up to you to decide.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I went back and forth on this a bit, but I think this is the most readable way of doing it, and creating no-op iterators is pretty low cost.

aggregations.toXContent(b, p);
}
return b;
}),
Iterators.single((b, p) -> {
if (suggest != null) {
suggest.toXContent(b, p);
}
return b;
}),
Iterators.single((b, p) -> {
if (profileResults != null) {
profileResults.toXContent(b, p);
}
return b;
})
);
}

@Override
public boolean isFragment() {
return true;
}

protected void writeTo(StreamOutput out) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
*/
package org.elasticsearch.common.xcontent;

import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;

import java.util.Iterator;

/**
* Chunked equivalent of {@link org.elasticsearch.xcontent.ToXContentObject} that serializes as a full object.
*/
Expand All @@ -16,4 +21,20 @@ public interface ChunkedToXContentObject extends ChunkedToXContent {
default boolean isFragment() {
return false;
}

/**
* Wraps the given instance in a {@link ToXContentObject} that will fully serialize the instance when serialized.
*
* @param chunkedToXContent instance to wrap
* @return x-content instance
*/
static ToXContentObject wrapAsToXContentObject(ChunkedToXContentObject chunkedToXContent) {
return (builder, params) -> {
Iterator<? extends ToXContent> serialization = chunkedToXContent.toXContentChunked(params);
while (serialization.hasNext()) {
serialization.next().toXContent(builder, params);
}
return builder;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,16 @@ public static BytesReference toXContent(ToXContent toXContent, XContentType xCon
return toXContent(toXContent, xContentType, ToXContent.EMPTY_PARAMS, humanReadable);
}

/**
* Returns the bytes that represent the XContent output of the provided {@link ChunkedToXContent} object, using the provided
* {@link XContentType}. Wraps the output into a new anonymous object according to the value returned
* by the {@link ToXContent#isFragment()} method returns.
*/
public static BytesReference toXContent(ChunkedToXContent toXContent, XContentType xContentType, boolean humanReadable)
throws IOException {
return toXContent(ChunkedToXContent.wrapAsToXContent(toXContent), xContentType, humanReadable);
}

/**
* Returns the bytes that represent the XContent output of the provided {@link ToXContent} object, using the provided
* {@link XContentType}. Wraps the output into a new anonymous object according to the value returned
Expand All @@ -516,6 +526,16 @@ public static BytesReference toXContent(ToXContent toXContent, XContentType xCon
}
}

/**
* Returns the bytes that represent the XContent output of the provided {@link ChunkedToXContent} object, using the provided
* {@link XContentType}. Wraps the output into a new anonymous object according to the value returned
* by the {@link ToXContent#isFragment()} method returns.
*/
public static BytesReference toXContent(ChunkedToXContent toXContent, XContentType xContentType, Params params, boolean humanReadable)
throws IOException {
return toXContent(ChunkedToXContent.wrapAsToXContent(toXContent), xContentType, params, humanReadable);
}

/**
* Guesses the content type based on the provided bytes which may be compressed.
*
Expand Down
Loading