diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 5f58dec3521dc..b08a127b4a806 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -594,19 +594,35 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc } SearchContext context = createContext(request); + onNewContext(context); boolean success = false; try { putContext(context); - if (request.scroll() != null) { + success = true; + return context; + } finally { + if (success == false) { + freeContext(context.id()); + } + } + } + + private void onNewContext(SearchContext context) { + boolean success = false; + try { + if (context.scrollContext() != null) { openScrollContexts.incrementAndGet(); context.indexShard().getSearchOperationListener().onNewScrollContext(context); } context.indexShard().getSearchOperationListener().onNewContext(context); success = true; - return context; } finally { - if (!success) { - freeContext(context.id()); + // currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the + // right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future. + if (success == false) { + try (SearchContext dummy = context) { + onFreeContext(context); + } } } } @@ -693,13 +709,8 @@ private void freeAllContextForIndex(Index index) { public boolean freeContext(long id) { final SearchContext context = removeContext(id); if (context != null) { - assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); try { - context.indexShard().getSearchOperationListener().onFreeContext(context); - if (context.scrollContext() != null) { - openScrollContexts.decrementAndGet(); - context.indexShard().getSearchOperationListener().onFreeScrollContext(context); - } + onFreeContext(context); } finally { context.close(); } @@ -708,6 +719,16 @@ public boolean freeContext(long id) { return false; } + private void onFreeContext(SearchContext context) { + assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount(); + assert activeContexts.containsKey(context.id()) == false; + context.indexShard().getSearchOperationListener().onFreeContext(context); + if (context.scrollContext() != null) { + openScrollContexts.decrementAndGet(); + context.indexShard().getSearchOperationListener().onFreeScrollContext(context); + } + } + public void freeAllScrollContexts() { for (SearchContext searchContext : activeContexts.values()) { if (searchContext.scrollContext() != null) { diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 6dd9550701fe3..b8052fc621868 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.search; import com.carrotsearch.hppc.IntArrayList; - import org.apache.lucene.search.Query; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ElasticsearchException; @@ -52,6 +51,7 @@ import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.TermQueryBuilder; +import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.index.shard.ShardId; @@ -229,6 +229,7 @@ public void testSearchWhileIndexDeleted() throws InterruptedException { AtomicBoolean running = new AtomicBoolean(true); CountDownLatch startGun = new CountDownLatch(1); Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + final Thread thread = new Thread() { @Override public void run() { @@ -267,10 +268,17 @@ public void onFailure(Exception e) { try { try { PlainActionFuture result = new PlainActionFuture<>(); - service.executeQueryPhase( - new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, + final boolean useScroll = randomBoolean(); + ShardSearchLocalRequest shardRequest; + if (useScroll) { + shardRequest = new ShardScrollRequestTest(indexShard.shardId()); + } else { + shardRequest = new ShardSearchLocalRequest(indexShard.shardId(), 1, SearchType.DEFAULT, new SearchSourceBuilder(), new String[0], false, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, - true, null, null), + true, null, null); + } + service.executeQueryPhase( + shardRequest, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); IntArrayList intCursors = new IntArrayList(1); @@ -279,6 +287,9 @@ public void onFailure(Exception e) { PlainActionFuture listener = new PlainActionFuture<>(); service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener); listener.get(); + if (useScroll) { + service.freeContext(searchPhaseResult.getRequestId()); + } } catch (ExecutionException ex) { assertThat(ex.getCause(), instanceOf(RuntimeException.class)); throw ((RuntimeException)ex.getCause()); @@ -296,6 +307,13 @@ public void onFailure(Exception e) { thread.join(); semaphore.acquire(Integer.MAX_VALUE); } + + assertEquals(0, service.getActiveContexts()); + + SearchStats.Stats totalStats = indexShard.searchStats().getTotal(); + assertEquals(0, totalStats.getQueryCurrent()); + assertEquals(0, totalStats.getScrollCurrent()); + assertEquals(0, totalStats.getFetchCurrent()); } public void testTimeout() throws IOException {