Skip to content

Commit

Permalink
Ensure search contexts are removed on index delete (elastic#56335)
Browse files Browse the repository at this point in the history
In a race condition, a search context could remain enlisted in
SearchService when an index is deleted, potentially causing the index
folder to not be cleaned up (for either lengthy searches or scrolls with
timeouts > 30 minutes or if the scroll is kept active).
  • Loading branch information
henningandersen authored May 12, 2020
1 parent 3ebbf89 commit a52c85a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
Expand Down Expand Up @@ -634,6 +634,9 @@ final SearchContext createAndPutContext(ShardSearchRequest request, SearchShardT
boolean success = false;
try {
putContext(context);
// ensure that if we race against afterIndexRemoved, we free the context here.
// this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout.
indicesService.indexServiceSafe(request.shardId().getIndex());
success = true;
return context;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
Expand Down Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.AbstractQueryBuilder;
Expand Down Expand Up @@ -122,7 +123,7 @@ protected boolean resetNodeAfterTest() {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class,
ReaderWrapperCountPlugin.class, InternalOrPrivateSettingsPlugin.class);
ReaderWrapperCountPlugin.class, InternalOrPrivateSettingsPlugin.class, MockSearchService.TestPlugin.class);
}

public static class ReaderWrapperCountPlugin extends Plugin {
Expand Down Expand Up @@ -329,6 +330,7 @@ public void onFailure(Exception e) {
service.executeFetchPhase(req, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
// have to free context since this test does not remove the index from IndicesService.
service.freeContext(searchPhaseResult.getContextId());
}
} catch (ExecutionException ex) {
Expand Down Expand Up @@ -357,6 +359,53 @@ public void onFailure(Exception e) {
assertEquals(0, totalStats.getFetchCurrent());
}

public void testSearchWhileIndexDeletedDoesNotLeakSearchContext() throws ExecutionException, InterruptedException {
createIndex("index");
client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();

IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
IndexShard indexShard = indexService.getShard(0);

MockSearchService service = (MockSearchService) getInstanceFromNode(SearchService.class);
service.setOnPutContext(
context -> {
if (context.indexShard() == indexShard) {
assertAcked(client().admin().indices().prepareDelete("index"));
}
}
);

SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true)
.scroll(new Scroll(TimeValue.timeValueMinutes(1)));

// the scrolls are not explicitly freed, but should all be gone when the test finished.
// for completeness, we also randomly test the regular search path.
final boolean useScroll = randomBoolean();
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
service.executeQueryPhase(
new ShardSearchRequest(OriginalIndices.NONE, useScroll ? scrollSearchRequest : searchRequest,
new ShardId(resolveIndex("index"), 0), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result);

try {
result.get();
} catch (Exception e) {
// ok
}

expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices("index").get());

assertEquals(0, service.getActiveContexts());

SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
assertEquals(0, totalStats.getQueryCurrent());
assertEquals(0, totalStats.getScrollCurrent());
assertEquals(0, totalStats.getFetchCurrent());
}

public void testTimeout() throws IOException {
createIndex("index");
final SearchService service = getInstanceFromNode(SearchService.class);
Expand Down Expand Up @@ -527,6 +576,8 @@ public void testMaxOpenScrollContexts() throws RuntimeException {
SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " +
"This limit can be set by changing the [search.max_open_scroll_context] setting.",
ex.getMessage());

service.freeAllScrollContexts();
}

public void testOpenScrollContextsConcurrently() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
Expand All @@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class MockSearchService extends SearchService {
/**
Expand All @@ -42,6 +43,8 @@ public static class TestPlugin extends Plugin {}

private static final Map<SearchContext, Throwable> ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>();

private Consumer<SearchContext> onPutContext = context -> {};

/** Throw an {@link AssertionError} if there are still in-flight contexts. */
public static void assertNoInFlightContext() {
final Map<SearchContext, Throwable> copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS);
Expand Down Expand Up @@ -75,6 +78,7 @@ public MockSearchService(ClusterService clusterService,

@Override
protected void putContext(SearchContext context) {
onPutContext.accept(context);
addActiveContext(context);
super.putContext(context);
}
Expand All @@ -87,4 +91,8 @@ protected SearchContext removeContext(long id) {
}
return removed;
}

public void setOnPutContext(Consumer<SearchContext> onPutContext) {
this.onPutContext = onPutContext;
}
}

0 comments on commit a52c85a

Please sign in to comment.