Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent search and index delete #42621

40 changes: 30 additions & 10 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -548,19 +548,35 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
}

SearchContext context = createContext(request);
onNewContext(context);
boolean success = false;
try {
putContext(context);
if (request.scroll() != null) {
success = true;
return context;
} finally {
if (success == false) {
freeContext(context.id());
}
}
}

private void onNewContext(SearchContext context) {
boolean success = false;
try {
if (context.scrollContext() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
success = true;
return context;
} finally {
if (!success) {
freeContext(context.id());
// currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the
// right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future.
if (success == false) {
try (context) {
onFreeContext(context);
}
}
}
}
Expand Down Expand Up @@ -648,18 +664,22 @@ private void freeAllContextForIndex(Index index) {
public boolean freeContext(long id) {
try (SearchContext context = removeContext(id)) {
if (context != null) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
onFreeContext(context);
return true;
}
return false;
}
}

private void onFreeContext(SearchContext context) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add an assertion here that ensures that we removed the context. I think it's trappy that we have an this method and we need to make sure that we never call if if we haven't removed the context. The special case is fina

assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
}

public void freeAllScrollContexts() {
for (SearchContext searchContext : activeContexts.values()) {
if (searchContext.scrollContext() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.search;

import com.carrotsearch.hppc.IntArrayList;

import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -51,6 +50,7 @@
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -226,6 +226,7 @@ public void testSearchWhileIndexDeleted() throws InterruptedException {
AtomicBoolean running = new AtomicBoolean(true);
CountDownLatch startGun = new CountDownLatch(1);
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

final Thread thread = new Thread() {
@Override
public void run() {
Expand Down Expand Up @@ -261,12 +262,16 @@ public void onFailure(Exception e) {
try {
final int rounds = scaledRandomIntBetween(100, 10000);
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true)
.scroll(new Scroll(TimeValue.timeValueMinutes(1)));
for (int i = 0; i < rounds; i++) {
try {
try {
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
final boolean useScroll = randomBoolean();
service.executeQueryPhase(
new ShardSearchLocalRequest(searchRequest, indexShard.shardId(), 1,
new ShardSearchLocalRequest(useScroll ? scrollSearchRequest : searchRequest,
indexShard.shardId(), 1,
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
SearchPhaseResult searchPhaseResult = result.get();
Expand All @@ -276,6 +281,9 @@ public void onFailure(Exception e) {
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
service.freeContext(searchPhaseResult.getRequestId());
}
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
throw ((RuntimeException)ex.getCause());
Expand All @@ -293,6 +301,13 @@ public void onFailure(Exception e) {
thread.join();
semaphore.acquire(Integer.MAX_VALUE);
}

assertEquals(0, service.getActiveContexts());

SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
assertEquals(0, totalStats.getQueryCurrent());
assertEquals(0, totalStats.getScrollCurrent());
assertEquals(0, totalStats.getFetchCurrent());
}

public void testTimeout() throws IOException {
Expand Down