Skip to content

Commit

Permalink
Set max allowed size for stored async response (#74455)
Browse files Browse the repository at this point in the history
Add a dynamic transient cluster setting search.max_async_search_response_size
that controls the maximum allowed size for a stored async search
response. The default max size is 10Mb. An attempt to store
an async search response larger than this size will result in error.

Relates to #67594
  • Loading branch information
mayya-sharipova authored Jun 30, 2021
1 parent 174f65e commit aa76ebb
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 38 deletions.
5 changes: 5 additions & 0 deletions docs/reference/search/async-search.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ nor search requests that only include the <<search-suggesters,suggest section>>.
{ccs-cap} is supported only with
<<ccs-min-roundtrips,`ccs_minimize_roundtrips`>> set to `false`.

WARNING: By default, {es} doesn't allow to store an async search response
larger than 10Mb, and an attempt to do this results in an error. The maximum
allowed size for a stored async search response can be set by changing the
`search.max_async_search_response_size` cluster level setting.

[[get-async-search]]
==== Get async search

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public long ramBytesUsed() {
return bytes.ramBytesUsed();
}

void ensureCapacity(long offset) {
protected void ensureCapacity(long offset) {
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER,
SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_ATTRIBUTES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -172,6 +174,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

public static final Setting<ByteSizeValue> MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING = Setting.byteSizeSetting(
"search.max_async_search_response_size",
new ByteSizeValue(10, ByteSizeUnit.MB),
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
Expand All @@ -21,8 +23,10 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;

import java.util.ArrayList;
Expand All @@ -34,6 +38,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.search.SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
Expand All @@ -60,7 +66,7 @@ public void setupSuiteScopeCluster() throws InterruptedException {
createIndex(indexName, Settings.builder()
.put("index.number_of_shards", numShards)
.build());
numKeywords = randomIntBetween(1, 100);
numKeywords = randomIntBetween(50, 100);
keywordFreqs = new HashMap<>();
Set<String> keywordSet = new HashSet<>();
for (int i = 0; i < numKeywords; i++) {
Expand Down Expand Up @@ -457,4 +463,44 @@ public void testSearchPhaseFailure() throws Exception {
assertNotNull(response.getFailure());
ensureTaskNotRunning(response.getId());
}

public void testFinalResponseLargerMaxSize() throws Exception {
SearchSourceBuilder source = new SearchSourceBuilder()
.query(new MatchAllQueryBuilder())
.aggregation(AggregationBuilders.terms("terms").field("terms.keyword").size(numKeywords));

int limit = 1000; // should be enough to store initial response, but not enough for final response
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", limit + "b"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

final SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName);
request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(0));

// initial response – ok
final AsyncSearchResponse initialResponse = submitAsyncSearch(request);
assertTrue(initialResponse.isRunning());
assertNull(initialResponse.getFailure());

// final response – with failure; test that stored async search response is updated with this failure
assertBusy(() -> {
final AsyncSearchResponse finalResponse = client().execute(GetAsyncSearchAction.INSTANCE,
new GetAsyncResultRequest(initialResponse.getId())
.setWaitForCompletionTimeout(TimeValue.timeValueMillis(300))).get();
assertNotNull(finalResponse.getFailure());
assertFalse(finalResponse.isRunning());
if (finalResponse.getFailure() != null) {
assertEquals("Can't store an async search response larger than [" + limit + "] bytes. " +
"This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting.",
finalResponse.getFailure().getMessage());
}
});

updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", (String) null));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

deleteAsyncSearch(initialResponse.getId());
ensureTaskRemoval(initialResponse.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@
*/
package org.elasticsearch.xpack.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
Expand All @@ -27,8 +22,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.tasks.Task;
Expand All @@ -49,8 +42,6 @@
import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;

public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
private static final Logger logger = LogManager.getLogger(TransportSubmitAsyncSearchAction.class);

private final NodeClient nodeClient;
private final Function<SearchRequest, InternalAggregation.ReduceContext> requestToAggReduceContextBuilder;
private final TransportSearchAction searchAction;
Expand Down Expand Up @@ -177,21 +168,14 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
private void onFinalResponse(AsyncSearchTask searchTask,
AsyncSearchResponse response,
Runnable nextAction) {
store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
if (cause instanceof DocumentMissingException == false &&
cause instanceof VersionConflictEngineException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getExecutionId().getEncoded()), exc);
}
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
store.updateResponse(searchTask.getExecutionId().getDocId(),
threadContext.getResponseHeaders(),
response,
ActionListener.wrap(() -> {
taskManager.unregister(searchTask);
nextAction.run();
})
);
}

private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) {
taskManager.unregister(searchTask);
nextAction.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,11 @@ public interface AsyncResponse<T extends AsyncResponse<?>> extends Writeable {
*/
T withExpirationTime(long expirationTimeMillis);

/**
* Convert this AsyncResponse to a new AsyncResponse with a given failure
* @return a new AsyncResponse that stores a failure with a provided exception
*/
default T convertToFailure(Exception exc) {
return null;
}
}
Loading

0 comments on commit aa76ebb

Please sign in to comment.