Skip to content

Commit

Permalink
msearch: Cap the number of searches the msearch api will concurrently…
Browse files Browse the repository at this point in the history
… execute

By default the number of searches msearch executes is capped by the number of
nodes multiplied with the default size of the search threadpool. This default can be
overwritten by using the newly added `max_concurrent_searches` parameter.

Before the msearch api would concurrently execute all searches concurrently. If many large
msearch requests would be executed this could lead to some searches being rejected
while other searches in the msearch request would succeed.

The goal of this change is to avoid this exhausting of the search TP.

Closes #17926
  • Loading branch information
martijnvg committed Jun 13, 2016
1 parent 7c8eb18 commit 3b96055
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
*/
public class MultiSearchRequest extends ActionRequest<MultiSearchRequest> implements CompositeIndicesRequest {

private int maxConcurrentSearchRequests = 0;
private List<SearchRequest> requests = new ArrayList<>();

private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
Expand All @@ -60,6 +61,25 @@ public MultiSearchRequest add(SearchRequest request) {
return this;
}

/**
* Returns the amount of search requests specified in this multi search requests are allowed to be ran concurrently.
*/
public int maxConcurrentSearchRequests() {
return maxConcurrentSearchRequests;
}

/**
* Sets how many search requests specified in this multi search requests are allowed to be ran concurrently.
*/
public MultiSearchRequest maxConcurrentSearchRequests(int maxConcurrentSearchRequests) {
if (maxConcurrentSearchRequests < 1) {
throw new IllegalArgumentException("maxConcurrentSearchRequests must be positive");
}

this.maxConcurrentSearchRequests = maxConcurrentSearchRequests;
return this;
}

public List<SearchRequest> requests() {
return this.requests;
}
Expand Down Expand Up @@ -100,6 +120,7 @@ public MultiSearchRequest indicesOptions(IndicesOptions indicesOptions) {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
maxConcurrentSearchRequests = in.readVInt();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
SearchRequest request = new SearchRequest();
Expand All @@ -111,6 +132,7 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(maxConcurrentSearchRequests);
out.writeVInt(requests.size());
for (SearchRequest request : requests) {
request.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@ public MultiSearchRequestBuilder setIndicesOptions(IndicesOptions indicesOptions
request().indicesOptions(indicesOptions);
return this;
}

/**
* Sets how many search requests specified in this multi search requests are allowed to be ran concurrently.
*/
public MultiSearchRequestBuilder setMaxConcurrentSearchRequests(int maxConcurrentSearchRequests) {
request().maxConcurrentSearchRequests(maxConcurrentSearchRequests);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,64 +22,126 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
*/
public class TransportMultiSearchAction extends HandledTransportAction<MultiSearchRequest, MultiSearchResponse> {

private final int availableProcessors;
private final ClusterService clusterService;
private final TransportSearchAction searchAction;
private final TransportAction<SearchRequest, SearchResponse> searchAction;

@Inject
public TransportMultiSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService,
ClusterService clusterService, TransportSearchAction searchAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ClusterService clusterService, TransportSearchAction searchAction,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = EsExecutors.boundedNumberOfProcessors(settings);
}

// For testing only:
TransportMultiSearchAction(ThreadPool threadPool, ActionFilters actionFilters, TransportService transportService,
ClusterService clusterService, TransportAction<SearchRequest, SearchResponse> searchAction,
IndexNameExpressionResolver indexNameExpressionResolver, int availableProcessors) {
super(Settings.EMPTY, MultiSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, MultiSearchRequest::new);
this.clusterService = clusterService;
this.searchAction = searchAction;
this.availableProcessors = availableProcessors;
}

@Override
protected void doExecute(final MultiSearchRequest request, final ActionListener<MultiSearchResponse> listener) {
protected void doExecute(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
ClusterState clusterState = clusterService.state();
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

final AtomicArray<MultiSearchResponse.Item> responses = new AtomicArray<>(request.requests().size());
final AtomicInteger counter = new AtomicInteger(responses.length());
for (int i = 0; i < responses.length(); i++) {
final int index = i;
searchAction.execute(request.requests().get(i), new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
responses.set(index, new MultiSearchResponse.Item(searchResponse, null));
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
int maxConcurrentSearches = request.maxConcurrentSearchRequests();
if (maxConcurrentSearches == 0) {
maxConcurrentSearches = defaultMaxConcurrentSearches(availableProcessors, clusterState);
}

@Override
public void onFailure(Throwable e) {
responses.set(index, new MultiSearchResponse.Item(null, e));
if (counter.decrementAndGet() == 0) {
finishHim();
}
}
Queue<SearchRequestSlot> searchRequestSlots = new ConcurrentLinkedQueue<>();
for (int i = 0; i < request.requests().size(); i++) {
SearchRequest searchRequest = request.requests().get(i);
searchRequestSlots.add(new SearchRequestSlot(searchRequest, i));
}

private void finishHim() {
int numRequests = request.requests().size();
final AtomicArray<MultiSearchResponse.Item> responses = new AtomicArray<>(numRequests);
final AtomicInteger responseCounter = new AtomicInteger(numRequests);
int numConcurrentSearches = Math.min(numRequests, maxConcurrentSearches);
for (int i = 0; i < numConcurrentSearches; i++) {
executeSearch(searchRequestSlots, responses, responseCounter, listener);
}
}

/*
* This is not perfect and makes a big assumption, that all nodes have the same thread pool size / have the number
* of processors and that shard of the indices the search requests go to are more or less evenly distributed across
* all nodes in the cluster. But I think it is a good enough default for most cases, if not then the default should be
* overwritten in the request itself.
*/
static int defaultMaxConcurrentSearches(int availableProcessors, ClusterState state) {
int numDateNodes = state.getNodes().getDataNodes().size();
// availableProcessors will never be larger than 32, so max defaultMaxConcurrentSearches will never be larger than 49,
// but we don't know about about other search requests that are being executed so lets cap at 10 per node
int defaultSearchThreadPoolSize = Math.min(ThreadPool.searchThreadPoolSize(availableProcessors), 10);
return Math.max(1, numDateNodes * defaultSearchThreadPoolSize);
}

void executeSearch(Queue<SearchRequestSlot> requests, AtomicArray<MultiSearchResponse.Item> responses,
AtomicInteger responseCounter, ActionListener<MultiSearchResponse> listener) {
SearchRequestSlot request = requests.poll();
if (request == null) {
// Ok... so there're no more requests then this is ok, we're then waiting for running requests to complete
return;
}
searchAction.execute(request.request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
responses.set(request.responseSlot, new MultiSearchResponse.Item(searchResponse, null));
handleResponse();
}

@Override
public void onFailure(Throwable e) {
responses.set(request.responseSlot, new MultiSearchResponse.Item(null, e));
handleResponse();
}

private void handleResponse() {
if (responseCounter.decrementAndGet() == 0) {
listener.onResponse(new MultiSearchResponse(responses.toArray(new MultiSearchResponse.Item[responses.length()])));
} else {
executeSearch(requests, responses, responseCounter, listener);
}
});
}
});
}

final static class SearchRequestSlot {

final SearchRequest request;
final int responseSlot;

SearchRequestSlot(SearchRequest request, int responseSlot) {
this.request = request;
this.responseSlot = responseSlot;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public RestMultiSearchAction(Settings settings, RestController controller, Clien
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
if (request.hasParam("max_concurrent_searches")) {
multiSearchRequest.maxConcurrentSearchRequests(request.paramAsInt("max_concurrent_searches", 0));
}

String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
String[] types = Strings.splitStringByCommaToArray(request.param("type"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200));
builders.put(Names.BULK, new FixedExecutorBuilder(settings, Names.BULK, availableProcessors, 50));
builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, ((availableProcessors * 3) / 2) + 1, 1000));
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
// no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
// the assumption here is that the listeners should be very lightweight on the listeners side
Expand Down Expand Up @@ -389,6 +389,10 @@ static int twiceNumberOfProcessors(int numberOfProcessors) {
return boundedBy(2 * numberOfProcessors, 2, Integer.MAX_VALUE);
}

public static int searchThreadPoolSize(int availableProcessors) {
return ((availableProcessors * 3) / 2) + 1;
}

class LoggingRunnable implements Runnable {

private final Runnable runnable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -37,7 +35,6 @@

import java.io.IOException;

import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -167,6 +164,13 @@ public void testResponseErrorToXContent() throws IOException {
builder.string());
}

public void testMaxConcurrentSearchRequests() {
MultiSearchRequest request = new MultiSearchRequest();
request.maxConcurrentSearchRequests(randomIntBetween(1, Integer.MAX_VALUE));
expectThrows(IllegalArgumentException.class, () ->
request.maxConcurrentSearchRequests(randomIntBetween(Integer.MIN_VALUE, 0)));
}

private IndicesQueriesRegistry registry() {
IndicesQueriesRegistry registry = new IndicesQueriesRegistry();
QueryParser<MatchAllQueryBuilder> parser = MatchAllQueryBuilder::fromXContent;
Expand Down
Loading

0 comments on commit 3b96055

Please sign in to comment.