Skip to content

Commit

Permalink
HLRC: Don't send defaults for SubmitAsyncSearchRequest (#54200) (#54267)
Browse files Browse the repository at this point in the history
Currently we set the defaults for ccsMinimizeRoundtrips, preFilterShardSize and
requestCache on the HLRC SubmitAsyncSearchRequest in the constructor. This is no
longer needed since we now only send the parameters along with the rest request
that are supported (omitting e.g. ccsMinimizeRoundtrips) and the correct
defaults are set on the client side. This change removes setting and sending
these defaults where possible, leaving only the overwrite of batchedReduceSize
with a default value of 5, since the default used in the vanilla SearchRequest
is 512. However, we don't need to send this value along as a request parameter
if its the default since the correct one will be set on the receiving end if no
value is specified.
Also adding tests for RestSubmitAsyncSearchAction that check the correct
defaults are set when parameters are missing on the server side.

Backport of #54200
  • Loading branch information
Christoph Büscher authored Mar 26, 2020
1 parent e6e27ff commit 3664f10
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ static void addSearchRequestParams(Params params, SubmitAsyncSearchRequest reque
if (request.getAllowPartialSearchResults() != null) {
params.withAllowPartialResults(request.getAllowPartialSearchResults());
}
params.withBatchedReduceSize(request.getBatchedReduceSize());
if (request.getBatchedReduceSize() != null) {
params.withBatchedReduceSize(request.getBatchedReduceSize());
}
}

static Request getAsyncSearch(GetAsyncSearchRequest asyncSearchRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,22 @@
*/
public class SubmitAsyncSearchRequest implements Validatable {

public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 1;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 5;
private static final boolean DEFAULT_CCS_MINIMIZE_ROUNDTRIPS = false;
private static final boolean DEFAULT_REQUEST_CACHE_VALUE = true;

public static long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();

private TimeValue waitForCompletionTimeout;
private Boolean keepOnCompletion;
private TimeValue keepAlive;
private final SearchRequest searchRequest;
// The following is optional and will only be sent down with the request if explicitely set by the user
private Integer batchedReduceSize;

/**
* Creates a new request
*/
public SubmitAsyncSearchRequest(SearchSourceBuilder source, String... indices) {
this.searchRequest = new SearchRequest(indices, source);
searchRequest.setCcsMinimizeRoundtrips(DEFAULT_CCS_MINIMIZE_ROUNDTRIPS);
searchRequest.setPreFilterShardSize(DEFAULT_PRE_FILTER_SHARD_SIZE);
searchRequest.setBatchedReduceSize(DEFAULT_BATCHED_REDUCE_SIZE);
searchRequest.requestCache(DEFAULT_REQUEST_CACHE_VALUE);
}

/**
Expand Down Expand Up @@ -192,33 +187,34 @@ public Boolean getAllowPartialSearchResults() {
}

/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
* Optional. Sets the number of shard results that should be reduced at once on the coordinating node.
* This value should be used as a protection mechanism to reduce the memory overhead per search
* request if the potential number of shards in the request can be large. Defaults to 5.
*/
public void setBatchedReduceSize(int batchedReduceSize) {
this.searchRequest.setBatchedReduceSize(batchedReduceSize);
this.batchedReduceSize = batchedReduceSize;
}

/**
* Gets the number of shard results that should be reduced at once on the coordinating node.
* This defaults to 5 for {@link SubmitAsyncSearchRequest}.
* Returns {@code null} if unset.
*/
public int getBatchedReduceSize() {
return this.searchRequest.getBatchedReduceSize();
public Integer getBatchedReduceSize() {
return this.batchedReduceSize;
}

/**
* Sets if this request should use the request cache or not, assuming that it can (for
* example, if "now" is used, it will never be cached). By default (not set, or null,
* will default to the index level setting if request cache is enabled or not).
* example, if "now" is used, it will never be cached).
* By default (if not set) this is turned on for {@link SubmitAsyncSearchRequest}.
*/
public void setRequestCache(Boolean requestCache) {
this.searchRequest.requestCache(requestCache);
}

/**
* Gets if this request should use the request cache or not.
* Defaults to `true` for {@link SubmitAsyncSearchRequest}.
* Gets if this request should use the request cache or not, if set.
* This defaults to `true` on the server side if unset in the client.
*/
public Boolean getRequestCache() {
return this.searchRequest.requestCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public void testSubmitAsyncSearch() throws Exception {

// the following parameters might be overwritten by random ones later,
// but we need to set these since they are the default we send over http
expectedParams.put("request_cache", "true");
expectedParams.put("batched_reduce_size", "5");
setRandomSearchParams(submitRequest, expectedParams);
setRandomIndicesOptions(submitRequest::setIndicesOptions, submitRequest::getIndicesOptions, expectedParams);

Expand Down Expand Up @@ -108,8 +106,8 @@ private static void setRandomSearchParams(SubmitAsyncSearchRequest request, Map<
}
if (randomBoolean()) {
request.setBatchedReduceSize(randomIntBetween(2, Integer.MAX_VALUE));
expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize()));
}
expectedParams.put("batched_reduce_size", Integer.toString(request.getBatchedReduceSize()));
if (randomBoolean()) {
request.setMaxConcurrentShardRequests(randomIntBetween(1, Integer.MAX_VALUE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.searchType(searchType);
}
parseSearchSource(searchRequest.source(), request, setSize);
searchRequest.requestCache(request.paramAsBoolean("request_cache", null));
searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));

String scroll = request.param("scroll");
if (scroll != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RestSubmitAsyncSearchActionTests extends ESTestCase {

private RestSubmitAsyncSearchAction action;
private ActionRequest lastCapturedRequest;
private RestController controller;
private NodeClient nodeClient;

@Before
public void setUpController() {
nodeClient = new NodeClient(Settings.EMPTY, null) {

@Override
public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,
Request request, ActionListener<Response> listener) {
lastCapturedRequest = request;
return new Task(1L, "type", "action", "description", null, null);
}
};
nodeClient.initialize(new HashMap<>(), () -> "local", null);
controller = new RestController(Collections.emptySet(), null,
nodeClient,
new NoneCircuitBreakerService(),
new UsageService());
action = new RestSubmitAsyncSearchAction();
controller.registerHandler(action);
}

/**
* Check that the appropriate defaults are set on the {@link SubmitAsyncSearchRequest} if
* no parameters are specified on the rest request itself.
*/
public void testRequestParameterDefaults() throws IOException {
RestRequest submitAsyncRestRequest = new FakeRestRequest.Builder(xContentRegistry())
.withMethod(RestRequest.Method.POST)
.withPath("/test_index/_async_search")
.withContent(new BytesArray("{}"), XContentType.JSON)
.build();
dispatchRequest(submitAsyncRestRequest);
SubmitAsyncSearchRequest submitRequest = (SubmitAsyncSearchRequest) lastCapturedRequest;
assertEquals(TimeValue.timeValueSeconds(1), submitRequest.getWaitForCompletionTimeout());
assertEquals(false, submitRequest.isKeepOnCompletion());
assertEquals(TimeValue.timeValueDays(5), submitRequest.getKeepAlive());
// check parameters we implicitly set in the SubmitAsyncSearchRequest ctor
assertEquals(false, submitRequest.getSearchRequest().isCcsMinimizeRoundtrips());
assertEquals(5, submitRequest.getSearchRequest().getBatchedReduceSize());
assertEquals(true, submitRequest.getSearchRequest().requestCache());
assertEquals(1, submitRequest.getSearchRequest().getPreFilterShardSize().intValue());
}

public void testParameters() throws IOException {
String tvString = randomTimeValue(1, 100);
doTestParameter("keep_alive", tvString, TimeValue.parseTimeValue(tvString, ""), SubmitAsyncSearchRequest::getKeepAlive);
doTestParameter("wait_for_completion_timeout", tvString, TimeValue.parseTimeValue(tvString, ""),
SubmitAsyncSearchRequest::getWaitForCompletionTimeout);
boolean keepOnCompletion = randomBoolean();
doTestParameter("keep_on_completion", Boolean.toString(keepOnCompletion), keepOnCompletion,
SubmitAsyncSearchRequest::isKeepOnCompletion);
boolean requestCache = randomBoolean();
doTestParameter("request_cache", Boolean.toString(requestCache), requestCache,
r -> r.getSearchRequest().requestCache());
Integer batchReduceSize = randomIntBetween(2, 50);
doTestParameter("batched_reduce_size", Integer.toString(batchReduceSize), batchReduceSize,
r -> r.getSearchRequest().getBatchedReduceSize());
}

private <T> void doTestParameter(String paramName, String paramValue, T expectedValue,
Function<SubmitAsyncSearchRequest, T> valueAccessor) {
Map<String, String> params = new HashMap<>();
params.put(paramName, paramValue);
RestRequest submitAsyncRestRequest = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
.withPath("/test_index/_async_search")
.withParams(params)
.withContent(new BytesArray("{}"), XContentType.JSON).build();
dispatchRequest(submitAsyncRestRequest);
SubmitAsyncSearchRequest submitRequest = (SubmitAsyncSearchRequest) lastCapturedRequest;
assertEquals(expectedValue, valueAccessor.apply(submitRequest));
}

/**
* Sends the given request to the test controller
*/
protected void dispatchRequest(RestRequest request) {
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
controller.dispatchRequest(request, channel, threadContext);
}
}

0 comments on commit 3664f10

Please sign in to comment.