Added soft limit to open scroll contexts #25244 #36009

Merged · 4 commits · Dec 3, 2018
5 changes: 5 additions & 0 deletions docs/reference/search/request/scroll.asciidoc
@@ -125,6 +125,11 @@ TIP: Keeping older segments alive means that more file handles are needed.
Ensure that you have configured your nodes to have ample free file handles.
See <<file-descriptors>>.

NOTE: To prevent issues caused by having too many scrolls open, the
user is not allowed to open scrolls past a certain limit. By default, the
maximum number of open scrolls is 500. This limit can be updated with the
`search.max_open_scroll_context` cluster setting.

You can check how many search contexts are open with the
<<cluster-nodes-stats,nodes stats API>>:
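To illustrate the behaviour described above, here is a minimal sketch against the 6.x Java transport-client API (not part of this PR): it raises the limit through a transient cluster setting and reads the per-node count of open search contexts from the nodes stats API. The class name and the value 1000 are placeholders, and the exact stats accessors are assumed from that client version.

import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class ScrollContextLimitExample {

    /** Raise the soft limit on open scroll contexts cluster-wide (transient, so it resets on full cluster restart). */
    static void raiseScrollLimit(Client client) {
        client.admin().cluster().prepareUpdateSettings()
            .setTransientSettings(Settings.builder()
                .put("search.max_open_scroll_context", 1000))
            .get();
    }

    /** Print how many search contexts each node currently holds open. */
    static void printOpenContexts(Client client) {
        NodesStatsResponse stats = client.admin().cluster().prepareNodesStats()
            .setIndices(true) // include the indices-level "search" section
            .get();
        for (NodeStats node : stats.getNodes()) {
            long open = node.getIndices().getSearch().getOpenContexts();
            System.out.println(node.getNode().getName() + ": " + open + " open contexts");
        }
    }
}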

@@ -387,6 +387,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.MAX_KEEPALIVE_SETTING,
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_DATA_SETTING,
23 changes: 23 additions & 0 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -112,6 +112,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@@ -145,6 +146,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
public static final Setting<Boolean> DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS =
Setting.boolSetting("search.default_allow_partial_results", true, Property.Dynamic, Property.NodeScope);

public static final Setting<Integer> MAX_OPEN_SCROLL_CONTEXT =
Setting.intSetting("search.max_open_scroll_context", 500, 0, Property.Dynamic, Property.NodeScope);


private final ThreadPool threadPool;

@@ -174,6 +178,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile boolean lowLevelCancellation;

private volatile int maxOpenScrollContext;

private final Cancellable keepAliveReaper;

private final AtomicLong idGenerator = new AtomicLong();
@@ -182,6 +188,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private final MultiBucketConsumerService multiBucketConsumerService;

private final AtomicInteger openScrollContexts = new AtomicInteger();

public SearchService(ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
ResponseCollectorService responseCollectorService) {
@@ -212,6 +220,8 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
clusterService.getClusterSettings().addSettingsUpdateConsumer(DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
this::setDefaultAllowPartialSearchResults);

maxOpenScrollContext = MAX_OPEN_SCROLL_CONTEXT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_SCROLL_CONTEXT, this::setMaxOpenScrollContext);

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
@@ -243,6 +253,10 @@ public boolean defaultAllowPartialSearchResults() {
return defaultAllowPartialSearchResults;
}

private void setMaxOpenScrollContext(int maxOpenScrollContext) {
this.maxOpenScrollContext = maxOpenScrollContext;
}

private void setLowLevelCancellation(Boolean lowLevelCancellation) {
this.lowLevelCancellation = lowLevelCancellation;
}
@@ -592,11 +606,19 @@ private SearchContext findContext(long id, TransportRequest request) throws Sear
}

final SearchContext createAndPutContext(ShardSearchRequest request) throws IOException {
if (request.scroll() != null && openScrollContexts.get() >= maxOpenScrollContext) {
throw new ElasticsearchException(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
maxOpenScrollContext + "]. " + "This limit can be set by changing the ["
+ MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting.");
}

SearchContext context = createContext(request);
boolean success = false;
try {
putContext(context);
if (request.scroll() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
@@ -696,6 +718,7 @@ public boolean freeContext(long id) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();

[Contributor] we should also check the scroll contexts in freeAllContextForIndex above?

[Contributor Author] Doesn't freeAllContextForIndex simply call freeContext? Am I supposed to do any extra processing here?

[Contributor] Sorry my bad, it does call freeContext.


context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
return true;
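As the review thread above concludes, freeAllContextForIndex ends up in freeContext, so the scroll counter is decremented on that path as well. Below is a paraphrased sketch of that delegation, reconstructed from the shape of SearchService at the time rather than quoted from this diff; activeContexts is assumed to be the service's map of live search contexts.

    // Paraphrased sketch, not the verbatim method body: freeing every context
    // that belongs to an index routes each one through freeContext(), which is
    // where openScrollContexts.decrementAndGet() runs for scroll contexts.
    private void freeAllContextForIndex(Index index) {
        assert index != null;
        for (SearchContext ctx : activeContexts.values()) {
            if (index.equals(ctx.indexShard().shardId().getIndex())) {
                freeContext(ctx.id());
            }
        }
    }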
@@ -22,12 +22,14 @@

import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
@@ -75,6 +77,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -416,6 +419,44 @@ searchSourceBuilder, new String[0], false, new AliasFilter(null, Strings.EMPTY_A
}
}

/**
* test that creating more than the allowed number of scroll contexts throws an exception
*/
public void testMaxOpenScrollContexts() throws RuntimeException {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();

final SearchService service = getInstanceFromNode(SearchService.class);
final IndicesService indicesService = getInstanceFromNode(IndicesService.class);
final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
final IndexShard indexShard = indexService.getShard(0);

// Open all possible scrolls, clear some of them, then open more until the limit is reached
LinkedList<String> clearScrollIds = new LinkedList<>();

for (int i = 0; i < SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY); i++) {
SearchResponse searchResponse = client().prepareSearch("index").setSize(1).setScroll("1m").get();

if (randomInt(4) == 0) clearScrollIds.addLast(searchResponse.getScrollId());
}

ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.setScrollIds(clearScrollIds);
client().clearScroll(clearScrollRequest);

for (int i = 0; i < clearScrollIds.size(); i++) {
client().prepareSearch("index").setSize(1).setScroll("1m").get();
}

ElasticsearchException ex = expectThrows(ElasticsearchException.class,
() -> service.createAndPutContext(new ShardScrollRequestTest(indexShard.shardId())));
assertEquals(
"Trying to create too many scroll contexts. Must be less than or equal to: [" +
SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " +
"This limit can be set by changing the [search.max_open_scroll_context] setting.",
ex.getMessage());
}

public static class FailOnRewriteQueryPlugin extends Plugin implements SearchPlugin {
@Override
public List<QuerySpec<?>> getQueries() {
@@ -471,6 +512,22 @@ public String getWriteableName() {
}
}

public static class ShardScrollRequestTest extends ShardSearchLocalRequest {
private Scroll scroll;

ShardScrollRequestTest(ShardId shardId) {
super(shardId, 1, SearchType.DEFAULT, new SearchSourceBuilder(),
new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, true, null, null);

this.scroll = new Scroll(TimeValue.timeValueMinutes(1));
}

@Override
public Scroll scroll() {
return this.scroll;
}
}

public void testCanMatch() throws IOException {
createIndex("index");
final SearchService service = getInstanceFromNode(SearchService.class);