Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrent search index delete 6 8 #42977

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,19 +594,35 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
}

SearchContext context = createContext(request);
onNewContext(context);
boolean success = false;
try {
putContext(context);
if (request.scroll() != null) {
success = true;
return context;
} finally {
if (success == false) {
freeContext(context.id());
}
}
}

private void onNewContext(SearchContext context) {
boolean success = false;
try {
if (context.scrollContext() != null) {
openScrollContexts.incrementAndGet();
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
context.indexShard().getSearchOperationListener().onNewContext(context);
success = true;
return context;
} finally {
if (!success) {
freeContext(context.id());
// currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the
// right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future.
if (success == false) {
try (SearchContext dummy = context) {
onFreeContext(context);
}
}
}
}
Expand Down Expand Up @@ -693,13 +709,8 @@ private void freeAllContextForIndex(Index index) {
public boolean freeContext(long id) {
final SearchContext context = removeContext(id);
if (context != null) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
try {
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
onFreeContext(context);
} finally {
context.close();
}
Expand All @@ -708,6 +719,16 @@ public boolean freeContext(long id) {
return false;
}

private void onFreeContext(SearchContext context) {
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
assert activeContexts.containsKey(context.id()) == false;
context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
openScrollContexts.decrementAndGet();
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
}

public void freeAllScrollContexts() {
for (SearchContext searchContext : activeContexts.values()) {
if (searchContext.scrollContext() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.elasticsearch.search;

import com.carrotsearch.hppc.IntArrayList;

import org.apache.lucene.search.Query;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -229,6 +229,7 @@ public void testSearchWhileIndexDeleted() throws InterruptedException {
AtomicBoolean running = new AtomicBoolean(true);
CountDownLatch startGun = new CountDownLatch(1);
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);

final Thread thread = new Thread() {
@Override
public void run() {
Expand Down Expand Up @@ -267,10 +268,17 @@ public void onFailure(Exception e) {
try {
try {
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
service.executeQueryPhase(
new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
final boolean useScroll = randomBoolean();
ShardSearchLocalRequest shardRequest;
if (useScroll) {
shardRequest = new ShardScrollRequestTest(indexShard.shardId());
} else {
shardRequest = new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT,
new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f,
true, null, null),
true, null, null);
}
service.executeQueryPhase(
shardRequest,
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
SearchPhaseResult searchPhaseResult = result.get();
IntArrayList intCursors = new IntArrayList(1);
Expand All @@ -279,6 +287,9 @@ public void onFailure(Exception e) {
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
listener.get();
if (useScroll) {
service.freeContext(searchPhaseResult.getRequestId());
}
} catch (ExecutionException ex) {
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
throw ((RuntimeException)ex.getCause());
Expand All @@ -296,6 +307,13 @@ public void onFailure(Exception e) {
thread.join();
semaphore.acquire(Integer.MAX_VALUE);
}

assertEquals(0, service.getActiveContexts());

SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
assertEquals(0, totalStats.getQueryCurrent());
assertEquals(0, totalStats.getScrollCurrent());
assertEquals(0, totalStats.getFetchCurrent());
}

public void testTimeout() throws IOException {
Expand Down