Skip to content

Commit

Permalink
Ensure search contexts are removed on index delete (elastic#56335) (e…
Browse files Browse the repository at this point in the history
…lastic#56617)

In a race condition, a search context could remain enlisted in
SearchService when an index is deleted, potentially causing the index
folder to not be cleaned up (for either lengthy searches or scrolls with
timeouts > 30 minutes or if the scroll is kept active).
  • Loading branch information
henningandersen committed May 14, 2020
1 parent dec7c21 commit 6bf110a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,9 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
boolean success = false;
try {
putContext(context);
// ensure that if we race against afterIndexRemoved, we free the context here.
// this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout.
indicesService.indexServiceSafe(request.shardId().getIndex());
success = true;
return context;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.AbstractQueryBuilder;
Expand Down Expand Up @@ -113,7 +114,8 @@ protected boolean resetNodeAfterTest() {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class, InternalOrPrivateSettingsPlugin.class);
return pluginList(FailOnRewriteQueryPlugin.class, CustomScriptPlugin.class, InternalOrPrivateSettingsPlugin.class,
MockSearchService.TestPlugin.class);
}

public static class CustomScriptPlugin extends MockScriptPlugin {
Expand Down Expand Up @@ -288,6 +290,7 @@ public void onFailure(Exception e) {
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
// have to free context since this test does not remove the index from IndicesService.
service.freeContext(searchPhaseResult.getRequestId());
}
} catch (ExecutionException ex) {
Expand Down Expand Up @@ -316,6 +319,59 @@ public void onFailure(Exception e) {
assertEquals(0, totalStats.getFetchCurrent());
}

public void testSearchWhileIndexDeletedDoesNotLeakSearchContext() throws ExecutionException, InterruptedException {
createIndex("index");
client().prepareIndex("index", "type", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();

IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index"));
IndexShard indexShard = indexService.getShard(0);

MockSearchService service = (MockSearchService) getInstanceFromNode(SearchService.class);
service.setOnPutContext(
context -> {
if (context.indexShard() == indexShard) {
assertAcked(client().admin().indices().prepareDelete("index"));
}
}
);

SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true)
.scroll(new Scroll(TimeValue.timeValueMinutes(1)));

// the scrolls are not explicitly freed, but should all be gone when the test finished.
// for completeness, we also randomly test the regular search path.
final boolean useScroll = randomBoolean();
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
ShardSearchLocalRequest shardRequest;
if (useScroll) {
shardRequest = new ShardScrollRequestTest(indexShard.shardId());
} else {
shardRequest = new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
true, null, null);
}
service.executeQueryPhase(
shardRequest,
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);

try {
result.get();
} catch (Exception e) {
// ok
}

expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices("index").get());

assertEquals(0, service.getActiveContexts());

SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
assertEquals(0, totalStats.getQueryCurrent());
assertEquals(0, totalStats.getScrollCurrent());
assertEquals(0, totalStats.getFetchCurrent());
}

public void testTimeout() throws IOException {
createIndex("index");
final SearchService service = getInstanceFromNode(SearchService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class MockSearchService extends SearchService {
/**
Expand All @@ -41,6 +42,8 @@ public static class TestPlugin extends Plugin {}

private static final Map<SearchContext, Throwable> ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>();

private Consumer<SearchContext> onPutContext = context -> {};

/** Throw an {@link AssertionError} if there are still in-flight contexts. */
public static void assertNoInFlightContext() {
final Map<SearchContext, Throwable> copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS);
Expand Down Expand Up @@ -74,8 +77,9 @@ public MockSearchService(ClusterService clusterService,

@Override
protected void putContext(SearchContext context) {
super.putContext(context);
onPutContext.accept(context);
addActiveContext(context);
super.putContext(context);
}

@Override
Expand All @@ -86,4 +90,8 @@ protected SearchContext removeContext(long id) {
}
return removed;
}

public void setOnPutContext(Consumer<SearchContext> onPutContext) {
this.onPutContext = onPutContext;
}
}

0 comments on commit 6bf110a

Please sign in to comment.