Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support point in time in async search #61560

Merged
merged 1 commit into from
Aug 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public String getName() {
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
// Creates the search request with all required params
SearchRequest searchRequest = new SearchRequest();
RestSearchAction.parseSearchRequest(searchRequest, request, null, size -> searchRequest.source().size(size));
RestSearchAction.parseSearchRequest(
searchRequest, request, null, client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size));

// Creates the search template request
SearchTemplateRequest searchTemplateRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
Expand All @@ -49,7 +50,7 @@ protected AbstractBaseReindexRestHandler(A action) {
protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient client,
boolean includeCreated, boolean includeUpdated) throws IOException {
// Build the internal request
Request internal = setCommonOptions(request, buildRequest(request));
Request internal = setCommonOptions(request, buildRequest(request, client.getNamedWriteableRegistry()));

// Executes the request and waits for completion
if (request.paramAsBoolean("wait_for_completion", true)) {
Expand Down Expand Up @@ -77,7 +78,7 @@ protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient c
/**
* Build the Request based on the RestRequest.
*/
protected abstract Request buildRequest(RestRequest request) throws IOException;
protected abstract Request buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException;

/**
* Sets common options of {@link AbstractBulkByScrollRequest} requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -43,15 +44,15 @@ protected AbstractBulkByQueryRestHandler(A action) {
super(action);
}

protected void parseInternalRequest(Request internal, RestRequest restRequest,
protected void parseInternalRequest(Request internal, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry,
Map<String, Consumer<Object>> bodyConsumers) throws IOException {
assert internal != null : "Request should not be null";
assert restRequest != null : "RestRequest should not be null";

SearchRequest searchRequest = internal.getSearchRequest();

try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> failOnSizeSpecified());
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, namedWriteableRegistry, size -> failOnSizeSpecified());
}

searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestRequest;

import java.io.IOException;
Expand Down Expand Up @@ -52,7 +53,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
}

@Override
protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOException {
protected DeleteByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
/*
* Passing the search request through DeleteByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's
Expand All @@ -64,7 +65,7 @@ protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOExcept
consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);
parseInternalRequest(internal, request, namedWriteableRegistry, consumers);

return internal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestRequest;

Expand Down Expand Up @@ -55,7 +56,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
}

@Override
protected ReindexRequest buildRequest(RestRequest request) throws IOException {
protected ReindexRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
if (request.hasParam("pipeline")) {
throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. "
+ "Specify it in the [dest] object instead.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.script.Script;

Expand Down Expand Up @@ -53,7 +54,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
}

@Override
protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOException {
protected UpdateByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
/*
* Passing the search request through UpdateByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's
Expand All @@ -66,7 +67,7 @@ protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOExcept
consumers.put("script", o -> internal.setScript(Script.parse(o)));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);
parseInternalRequest(internal, request, namedWriteableRegistry, consumers);

internal.setPipeline(request.param("pipeline"));
return internal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -29,6 +30,7 @@
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;

import static java.util.Collections.singletonMap;

Expand Down Expand Up @@ -59,7 +61,8 @@ public void testPipelineQueryParameterIsError() throws IOException {
request.withContent(BytesReference.bytes(body), body.contentType());
}
request.withParams(singletonMap("pipeline", "doesn't matter"));
Exception e = expectThrows(IllegalArgumentException.class, () -> action.buildRequest(request.build()));
Exception e = expectThrows(IllegalArgumentException.class, () ->
action.buildRequest(request.build(), new NamedWriteableRegistry(Collections.emptyList())));

assertEquals("_reindex doesn't support [pipeline] as a query parameter. Specify it in the [dest] object instead.", e.getMessage());
}
Expand All @@ -68,14 +71,14 @@ public void testSetScrollTimeout() throws IOException {
{
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
ReindexRequest request = action.buildRequest(requestBuilder.build());
ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime());
}
{
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
requestBuilder.withParams(singletonMap("scroll", "10m"));
requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
ReindexRequest request = action.buildRequest(requestBuilder.build());
ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
assertEquals("10m", request.getScrollTime().toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,6 @@ public ActionRequestValidationException validate() {
if (scroll) {
validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException);
}
if (routing() != null) {
validationException = addValidationError("[routing] cannot be used with point in time", validationException);
}
if (preference() != null) {
validationException = addValidationError("[preference] cannot be used with point in time", validationException);
}
}
return validationException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.rest.action.search;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchContextId;
import org.elasticsearch.action.search.SearchRequest;
Expand Down Expand Up @@ -51,6 +53,7 @@
import java.util.Set;
import java.util.function.IntConsumer;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import static org.elasticsearch.rest.RestRequest.Method.POST;
Expand Down Expand Up @@ -100,12 +103,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
* company.
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(parser -> {
parseSearchRequest(searchRequest, request, parser, setSize);
if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, client.getNamedWriteableRegistry());
}
});
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));

return channel -> {
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
Expand All @@ -122,6 +121,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
*/
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
XContentParser requestContentParser,
NamedWriteableRegistry namedWriteableRegistry,
IntConsumer setSize) throws IOException {

if (searchRequest.source() == null) {
Expand Down Expand Up @@ -175,6 +175,10 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.setCcsMinimizeRoundtrips(request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));

checkRestTotalHits(request, searchRequest);

if (searchRequest.pointInTimeBuilder() != null) {
preparePointInTime(searchRequest, namedWriteableRegistry);
}
}

/**
Expand Down Expand Up @@ -291,6 +295,21 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil

static void preparePointInTime(SearchRequest request, NamedWriteableRegistry namedWriteableRegistry) {
assert request.pointInTimeBuilder() != null;
ActionRequestValidationException validationException = null;
if (request.indices().length > 0) {
validationException = addValidationError("[indices] cannot be used with point in time", validationException);
}
if (request.indicesOptions() != SearchRequest.DEFAULT_INDICES_OPTIONS) {
validationException = addValidationError("[indicesOptions] cannot be used with point in time", validationException);
}
if (request.routing() != null) {
validationException = addValidationError("[routing] cannot be used with point in time", validationException);
}
if (request.preference() != null) {
validationException = addValidationError("[preference] cannot be used with point in time", validationException);
}
ExceptionsHelper.reThrowIfNotNull(validationException);

final IndicesOptions indicesOptions = request.indicesOptions();
final IndicesOptions stricterIndicesOptions = IndicesOptions.fromOptions(
indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,28 +175,6 @@ public void testValidate() throws IOException {
assertEquals(1, validationErrors.validationErrors().size());
assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0));
}
{
// Reader context with preference
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder().
pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10)))))
.preference("test");
ActionRequestValidationException validationErrors = searchRequest.validate();
assertNotNull(validationErrors);
assertEquals(1, validationErrors.validationErrors().size());
assertEquals("[preference] cannot be used with point in time", validationErrors.validationErrors().get(0));
}
{
// Reader context with routing
SearchRequest searchRequest = new SearchRequest()
.source(new SearchSourceBuilder()
.pointInTimeBuilder(new SearchSourceBuilder.PointInTimeBuilder("id", TimeValue.timeValueMillis(between(1, 10)))))
.routing("test");
ActionRequestValidationException validationErrors = searchRequest.validate();
assertNotNull(validationErrors);
assertEquals(1, validationErrors.validationErrors().size());
assertEquals("[routing] cannot be used with point in time", validationErrors.validationErrors().get(0));
}
}

public void testCopyConstructor() throws IOException {
Expand Down
Loading