From b73af3a79bfe62c7a1a2c1d644ade1df74ce3c84 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 2 Sep 2020 09:46:30 +0200 Subject: [PATCH 1/9] Ensure validation of the reader context is executed first This change makes sure that we validate the reader context (`SearchOperationListener#validateReaderContext) before any other operation. Relates #61446 --- .../index/shard/SearchOperationListener.java | 6 ++-- .../elasticsearch/search/SearchService.java | 31 +++++++------------ .../search/internal/ReaderContext.java | 5 +++ .../shard/SearchOperationListenerTests.java | 6 ++-- .../SecuritySearchOperationListener.java | 2 +- .../SecuritySearchOperationListenerTests.java | 12 +++---- 6 files changed, 30 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index c0d98b434a300..29b5e11040f34 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -113,7 +113,7 @@ default void onFreeScrollContext(ReaderContext readerContext) {} * @param readerContext The reader context used by this request. * @param transportRequest the request that is going to use the search context */ - default void validateSearchContext(ReaderContext readerContext, TransportRequest transportRequest) {} + default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {} /** * A Composite listener that multiplexes calls to each of the listeners methods. @@ -238,11 +238,11 @@ public void onFreeScrollContext(ReaderContext readerContext) { } @Override - public void validateSearchContext(ReaderContext readerContext, TransportRequest request) { + public void validateReaderContext(ReaderContext readerContext, TransportRequest request) { Exception exception = null; for (SearchOperationListener listener : listeners) { try { - listener.validateSearchContext(readerContext, request); + listener.validateReaderContext(readerContext, request); } catch (Exception e) { exception = ExceptionsHelper.useOrSuppress(exception, e); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 53e06613c444e..ed13e550351f1 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -118,6 +118,7 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.TransportRequest; import java.io.IOException; import java.util.Collections; @@ -442,7 +443,7 @@ public void onFailure(Exception exc) { private IndexShard getShard(ShardSearchRequest request) { if (request.readerId() != null) { - return findReaderContext(request.readerId()).indexShard(); + return findReaderContext(request.readerId(), request).indexShard(); } else { return indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); } @@ -501,13 +502,12 @@ private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchCon public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { - final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (Releasable ignored = readerContext.markAsUsed(); SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); if (request.scroll() != null && request.scroll().keepAlive() != null) { final long keepAlive = request.scroll().keepAlive().millis(); checkKeepAliveLimit(keepAlive); @@ -528,14 +528,13 @@ public void executeQueryPhase(InternalScrollSearchRequest request, } public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { - final ReaderContext readerContext = findReaderContext(request.contextId()); + final ReaderContext readerContext = findReaderContext(request.contextId(), request); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); readerContext.setAggregatedDfs(request.dfs()); try (Releasable ignored = readerContext.markAsUsed(); SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); searchContext.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(searchContext); if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) { @@ -573,13 +572,12 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { - final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); try (Releasable ignored = readerContext.markAsUsed(); SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); if (request.scroll() != null && request.scroll().keepAlive() != null) { checkKeepAliveLimit(request.scroll().keepAlive().millis()); readerContext.keepAlive(request.scroll().keepAlive().millis()); @@ -601,12 +599,11 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { - final ReaderContext readerContext = findReaderContext(request.contextId()); + final ReaderContext readerContext = findReaderContext(request.contextId(), request); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); try (Releasable ignored = readerContext.markAsUsed(); SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); if (request.lastEmittedDoc() != null) { searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } @@ -642,19 +639,19 @@ private ReaderContext getReaderContext(ShardSearchContextId id) { return null; } - private ReaderContext findReaderContext(ShardSearchContextId id) throws SearchContextMissingException { + private ReaderContext findReaderContext(ShardSearchContextId id, TransportRequest request) throws SearchContextMissingException { final ReaderContext reader = getReaderContext(id); if (reader == null) { throw new SearchContextMissingException(id); } + reader.validate(request); return reader; } final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean keepStatesInContext) { if (request.readerId() != null) { assert keepStatesInContext == false; - final ReaderContext readerContext = findReaderContext(request.readerId()); - readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, request); + final ReaderContext readerContext = findReaderContext(request.readerId(), request); final long keepAlive = request.keepAlive().millis(); checkKeepAliveLimit(keepAlive); readerContext.keepAlive(keepAlive); @@ -828,12 +825,9 @@ private void freeAllContextForIndex(Index index) { } public boolean freeReaderContext(ShardSearchContextId contextId) { - if (getReaderContext(contextId) != null) { - try (ReaderContext context = removeReaderContext(contextId.getId())) { - return context != null; - } + try (ReaderContext context = removeReaderContext(contextId.getId())) { + return context != null; } - return false; } public void freeAllScrollContexts() { @@ -1146,13 +1140,12 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set compositeListener.validateSearchContext(mock(ReaderContext.class), Empty.INSTANCE)); + () -> compositeListener.validateReaderContext(mock(ReaderContext.class), Empty.INSTANCE)); assertNull(expected.getMessage()); assertEquals(throwingListeners - 1, expected.getSuppressed().length); if (throwingListeners > 1) { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java index f4520c9e24724..1f126f1cac51c 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java @@ -69,7 +69,7 @@ public void onNewScrollContext(ReaderContext readerContext) { * object from the scroll context with the current authentication context */ @Override - public void validateSearchContext(ReaderContext readerContext, TransportRequest request) { + public void validateReaderContext(ReaderContext readerContext, TransportRequest request) { if (licenseState.isSecurityEnabled()) { if (readerContext.scrollContext() != null) { final Authentication originalAuth = readerContext.getFromContext(AuthenticationField.AUTHENTICATION_KEY); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java index 729c969f9c0ab..ec680509b18d7 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java @@ -79,7 +79,7 @@ public void testUnlicensed() { SecuritySearchOperationListener listener = new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); listener.onNewScrollContext(readerContext); - listener.validateSearchContext(readerContext, Empty.INSTANCE); + listener.validateReaderContext(readerContext, Empty.INSTANCE); verify(licenseState, times(2)).isSecurityEnabled(); verifyZeroInteractions(auditTrailService, searchContext); } @@ -136,7 +136,7 @@ public void testValidateSearchContext() throws Exception { try (StoredContext ignore = threadContext.newStoredContext(false)) { Authentication authentication = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null); authentication.writeToContext(threadContext); - listener.validateSearchContext(readerContext, Empty.INSTANCE); + listener.validateReaderContext(readerContext, Empty.INSTANCE); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), is(indicesAccessControl)); verify(licenseState).isSecurityEnabled(); verifyZeroInteractions(auditTrail); @@ -148,7 +148,7 @@ public void testValidateSearchContext() throws Exception { Authentication authentication = new Authentication(new User("test", "role"), new RealmRef(realmName, "file", nodeName), null); authentication.writeToContext(threadContext); - listener.validateSearchContext(readerContext, Empty.INSTANCE); + listener.validateReaderContext(readerContext, Empty.INSTANCE); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), is(indicesAccessControl)); verify(licenseState, times(2)).isSecurityEnabled(); verifyZeroInteractions(auditTrail); @@ -166,7 +166,7 @@ public void testValidateSearchContext() throws Exception { (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); SearchContextMissingException expected = expectThrows(SearchContextMissingException.class, - () -> listener.validateSearchContext(readerContext, request)); + () -> listener.validateReaderContext(readerContext, request)); assertEquals(readerContext.id(), expected.contextId()); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), nullValue()); verify(licenseState, Mockito.atLeast(3)).isSecurityEnabled(); @@ -185,7 +185,7 @@ public void testValidateSearchContext() throws Exception { authentication.writeToContext(threadContext); threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); - listener.validateSearchContext(readerContext, request); + listener.validateReaderContext(readerContext, request); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), is(indicesAccessControl)); verify(licenseState, Mockito.atLeast(4)).isSecurityEnabled(); verifyNoMoreInteractions(auditTrail); @@ -204,7 +204,7 @@ public void testValidateSearchContext() throws Exception { (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); SearchContextMissingException expected = expectThrows(SearchContextMissingException.class, - () -> listener.validateSearchContext(readerContext, request)); + () -> listener.validateReaderContext(readerContext, request)); assertEquals(readerContext.id(), expected.contextId()); assertThat(threadContext.getTransient(AuthorizationServiceField.INDICES_PERMISSIONS_KEY), nullValue()); verify(licenseState, Mockito.atLeast(5)).isSecurityEnabled(); From 159b3a5e0f87ab95ecf4f7f2941f44554ccec307 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 2 Sep 2020 15:04:51 +0200 Subject: [PATCH 2/9] ensure that reader context are released properly on failures --- .../elasticsearch/search/SearchService.java | 151 ++++++++++-------- 1 file changed, 84 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ed13e550351f1..ec7455fd20709 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -354,7 +354,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, return context.dfsResult(); } catch (Exception e) { logger.trace("Dfs phase failed", e); - processFailure(request, readerContext, e); + processFailure(readerContext, e); throw e; } } @@ -397,12 +397,12 @@ public void onResponse(ShardSearchRequest orig) { try (markAsUsed) { listener.onFailure(exc); } finally { - processFailure(request, readerContext, exc); + processFailure(readerContext, exc); } return; } if (canRewriteToMatchNone(canMatchRequest.source()) - && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) { + && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) { try (markAsUsed) { if (orig.readerId() == null) { try { @@ -421,17 +421,8 @@ public void onResponse(ShardSearchRequest orig) { } // fork the execution in the search thread pool - runAsync(getExecutor(shard), () -> { - try (markAsUsed) { - return executeQueryPhase(orig, task, readerContext); - } - }, ActionListener.wrap(listener::onResponse, exc -> { - try (markAsUsed) { - listener.onFailure(exc); - } finally { - processFailure(request, readerContext, exc); - } - })); + runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, readerContext), + wrapFailureListener(listener, readerContext, markAsUsed)); } @Override @@ -482,7 +473,7 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, (Exception) e.getCause() : new ElasticsearchException(e.getCause()); } logger.trace("Query phase failed", e); - processFailure(request, readerContext, e); + processFailure(readerContext, e); throw e; } } @@ -503,10 +494,10 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { if (request.scroll() != null && request.scroll().keepAlive() != null) { final long keepAlive = request.scroll().keepAlive().millis(); @@ -521,19 +512,19 @@ public void executeQueryPhase(InternalScrollSearchRequest request, return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { final ReaderContext readerContext = findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.shardSearchRequest()); readerContext.setAggregatedDfs(request.dfs()); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, true); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { searchContext.searcher().setAggregatedDfs(request.dfs()); queryPhase.execute(searchContext); @@ -551,10 +542,10 @@ public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Query phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } private Executor getExecutor(IndexShard indexShard) { @@ -573,10 +564,10 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false); SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { if (request.scroll() != null && request.scroll().keepAlive() != null) { checkKeepAliveLimit(request.scroll().keepAlive().millis()); @@ -592,18 +583,18 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Fetch phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { final ReaderContext readerContext = findReaderContext(request.contextId(), request); + final Releasable markAsUsed = readerContext.markAsUsed(); runAsync(getExecutor(readerContext.indexShard()), () -> { final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); - try (Releasable ignored = readerContext.markAsUsed(); - SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { if (request.lastEmittedDoc() != null) { searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } @@ -622,10 +613,10 @@ public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, A } catch (Exception e) { assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e); logger.trace("Fetch phase failed", e); - processFailure(shardSearchRequest, readerContext, e); + processFailure(readerContext, e); throw e; } - }, listener); + }, wrapFailureListener(listener, readerContext, markAsUsed)); } private ReaderContext getReaderContext(ShardSearchContextId id) { @@ -644,7 +635,12 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques if (reader == null) { throw new SearchContextMissingException(id); } - reader.validate(request); + try { + reader.validate(request); + } catch (Exception exc) { + processFailure(reader, exc); + throw exc; + } return reader; } @@ -825,9 +821,12 @@ private void freeAllContextForIndex(Index index) { } public boolean freeReaderContext(ShardSearchContextId contextId) { - try (ReaderContext context = removeReaderContext(contextId.getId())) { - return context != null; + if (getReaderContext(contextId) != null) { + try (ReaderContext context = removeReaderContext(contextId.getId())) { + return context != null; + } } + return false; } public void freeAllScrollContexts() { @@ -855,8 +854,25 @@ private void checkKeepAliveLimit(long keepAlive) { } } - private void processFailure(ShardSearchRequest request, ReaderContext context, Exception e) { - if (context.singleSession() || request.scroll() != null) { + private ActionListener wrapFailureListener(ActionListener listener, ReaderContext context, Releasable releasable) { + return new ActionListener<>() { + @Override + public void onResponse(T resp) { + releasable.close(); + listener.onResponse(resp); + } + + @Override + public void onFailure(Exception exc) { + processFailure(context, exc); + releasable.close(); + listener.onFailure(exc); + } + }; + } + + private void processFailure(ReaderContext context, Exception e) { + if (context.singleSession()) { // we release the reader on failure if the request is a normal search or a scroll freeReaderContext(context.id()); } @@ -1142,40 +1158,41 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType(); final ReaderContext readerContext = request.readerId() != null ? findReaderContext(request.readerId(), request) : null; final Releasable markAsUsed = readerContext != null ? readerContext.markAsUsed() : null; - final IndexService indexService; - final Engine.Searcher canMatchSearcher; - final boolean hasRefreshPending; - if (readerContext != null) { - checkKeepAliveLimit(request.keepAlive().millis()); - readerContext.keepAlive(request.keepAlive().millis()); - indexService = readerContext.indexService(); - canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); - hasRefreshPending = false; - } else { - indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); - IndexShard indexShard = indexService.getShard(request.shardId().getId()); - hasRefreshPending = indexShard.hasRefreshPending(); - canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); - } - - try (markAsUsed; canMatchSearcher) { - QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), canMatchSearcher, - request::nowInMillis, request.getClusterAlias()); - Rewriteable.rewrite(request.getRewriteable(), context, false); - final boolean aliasFilterCanMatch = request.getAliasFilter() - .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; - FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); - MinAndMax minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; - final boolean canMatch; - if (canRewriteToMatchNone(request.source())) { - QueryBuilder queryBuilder = request.source().query(); - canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + try (markAsUsed) { + final IndexService indexService; + final Engine.Searcher canMatchSearcher; + final boolean hasRefreshPending; + if (readerContext != null) { + checkKeepAliveLimit(request.keepAlive().millis()); + readerContext.keepAlive(request.keepAlive().millis()); + indexService = readerContext.indexService(); + canMatchSearcher = readerContext.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); + hasRefreshPending = false; } else { - // null query means match_all - canMatch = aliasFilterCanMatch; + indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); + IndexShard indexShard = indexService.getShard(request.shardId().getId()); + hasRefreshPending = indexShard.hasRefreshPending(); + canMatchSearcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE); } - return new CanMatchResponse(canMatch || hasRefreshPending, minMax); + try (canMatchSearcher) { + QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), canMatchSearcher, + request::nowInMillis, request.getClusterAlias()); + Rewriteable.rewrite(request.getRewriteable(), context, false); + final boolean aliasFilterCanMatch = request.getAliasFilter() + .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; + FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source()); + MinAndMax minMax = sortBuilder != null ? FieldSortBuilder.getMinMaxOrNull(context, sortBuilder) : null; + final boolean canMatch; + if (canRewriteToMatchNone(request.source())) { + QueryBuilder queryBuilder = request.source().query(); + canMatch = aliasFilterCanMatch && queryBuilder instanceof MatchNoneQueryBuilder == false; + } else { + // null query means match_all + canMatch = aliasFilterCanMatch; + } + return new CanMatchResponse(canMatch || hasRefreshPending, minMax); + } } } From d5a6162005e81e592ff181499cb8ca14b7eb8fdf Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 2 Sep 2020 21:15:48 +0200 Subject: [PATCH 3/9] restore the removal of scroll context on failure --- .../main/java/org/elasticsearch/search/SearchService.java | 6 +++++- .../elasticsearch/search/internal/LegacyReaderContext.java | 6 ++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index ec7455fd20709..9335983fb4429 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -871,8 +871,12 @@ public void onFailure(Exception exc) { }; } + private boolean isScrollContext(ReaderContext context) { + return context instanceof LegacyReaderContext && context.singleSession() == false; + } + private void processFailure(ReaderContext context, Exception e) { - if (context.singleSession()) { + if (context.singleSession() || isScrollContext(context)) { // we release the reader on failure if the request is a normal search or a scroll freeReaderContext(context.id()); } diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index 1c3c14ab14d38..255ae9bc10f45 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -59,10 +59,8 @@ public Engine.Searcher acquireSearcher(String source) { // This ensures that we wrap the searcher's reader with the user's permissions // when they are available. if (searcher == null) { - Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); - onClose = delegate::close; - searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(), - delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {}); + searcher = searcherSupplier.acquireSearcher(source); + onClose = searcher::close; } return searcher; } From a49d06734e5f4614a5a394176b1033eefcc9fcc1 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 2 Sep 2020 22:12:02 +0200 Subject: [PATCH 4/9] cleanup --- .../elasticsearch/search/internal/LegacyReaderContext.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index 255ae9bc10f45..98a722a6abac5 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -59,8 +59,11 @@ public Engine.Searcher acquireSearcher(String source) { // This ensures that we wrap the searcher's reader with the user's permissions // when they are available. if (searcher == null) { - searcher = searcherSupplier.acquireSearcher(source); - onClose = searcher::close; + Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); + onClose = delegate::close; + // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed + searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(), + delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {}); } return searcher; } From 00d2b1db8abd778eeef59c48eb2dc0793bd04062 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 2 Sep 2020 23:19:52 +0200 Subject: [PATCH 5/9] don't release PIT on failure --- .../search/AbstractSearchAsyncAction.java | 29 ++++++++++--------- .../action/search/DfsQueryPhase.java | 12 ++++---- .../action/search/FetchSearchPhase.java | 10 +++---- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 9a37dfc932502..31c805e0b222f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -546,24 +546,27 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) } /** - * This method should be called if a search phase failed to ensure all relevant search contexts and resources are released. - * this method will also notify the listener and sends back a failure to the user. + * This method should be called if a search phase failed to ensure all relevant reader contexts are released. + * This method will also notify the listener and sends back a failure to the user. * * @param exception the exception explaining or causing the phase failure */ private void raisePhaseFailure(SearchPhaseExecutionException exception) { - results.getSuccessfulResults().forEach((entry) -> { - if (entry.getContextId() != null) { - try { - SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); - Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); - sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices()); - } catch (Exception inner) { - inner.addSuppressed(exception); - logger.trace("failed to release context", inner); + // we don't release persistent readers (point in time). + if (request.pointInTimeBuilder() == null) { + results.getSuccessfulResults().forEach((entry) -> { + if (entry.getContextId() != null) { + try { + SearchShardTarget searchShardTarget = entry.getSearchShardTarget(); + Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); + sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices()); + } catch (Exception inner) { + inner.addSuppressed(exception); + logger.trace("failed to release context", inner); + } } - } - }); + }); + } listener.onFailure(exception); } diff --git a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java index 37f5dc0c23f9e..8c31bf060221f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java @@ -98,11 +98,13 @@ public void onFailure(Exception exception) { progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception); counter.onFailure(shardIndex, searchShardTarget, exception); } finally { - // the query might not have been executed at all (for example because thread pool rejected - // execution) and the search context that was created in dfs phase might not be released. - // release it again to be in the safe side - context.sendReleaseSearchContext( - querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices()); + if (context.getRequest().pointInTimeBuilder() == null) { + // the query might not have been executed at all (for example because thread pool rejected + // execution) and the search context that was created in dfs phase might not be released. + // release it again to be in the safe side + context.sendReleaseSearchContext( + querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices()); + } } } }); diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 613761871f4a1..55d40a023d592 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -206,11 +206,11 @@ public void onFailure(Exception e) { * Releases shard targets that are not used in the docsIdsToLoad. */ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) { - // we only release search context that we did not fetch from if we are not scrolling - // and if it has at lease one hit that didn't make it to the global topDocs - if (context.getRequest().scroll() == null && - context.getRequest().pointInTimeBuilder() == null && - queryResult.hasSearchContext()) { + // we only release search context that we did not fetch from, if we are not scrolling + // or using a PIT and if it has at least one hit that didn't make it to the global topDocs + if (queryResult.hasSearchContext() + && context.getRequest().scroll() == null + && context.getRequest().pointInTimeBuilder() == null) { try { SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget(); Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()); From f89a0a69f317feabf1745d3b54ea62f9c108bb5b Mon Sep 17 00:00:00 2001 From: jimczi Date: Thu, 3 Sep 2020 00:22:25 +0200 Subject: [PATCH 6/9] do not remove scroll context on search rejection --- .../java/org/elasticsearch/search/SearchService.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 9335983fb4429..118a9c2763f2e 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -515,7 +515,7 @@ public void executeQueryPhase(InternalScrollSearchRequest request, processFailure(readerContext, e); throw e; } - }, wrapFailureListener(listener, readerContext, markAsUsed)); + }, ActionListener.runAfter(listener, markAsUsed::close)); } public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { @@ -586,7 +586,7 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa processFailure(readerContext, e); throw e; } - }, wrapFailureListener(listener, readerContext, markAsUsed)); + }, ActionListener.runAfter(listener, markAsUsed::close)); } public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { @@ -875,17 +875,17 @@ private boolean isScrollContext(ReaderContext context) { return context instanceof LegacyReaderContext && context.singleSession() == false; } - private void processFailure(ReaderContext context, Exception e) { + private void processFailure(ReaderContext context, Exception exc) { if (context.singleSession() || isScrollContext(context)) { // we release the reader on failure if the request is a normal search or a scroll freeReaderContext(context.id()); } try { - if (Lucene.isCorruptionException(e)) { - context.indexShard().failShard("search execution corruption failure", e); + if (Lucene.isCorruptionException(exc)) { + context.indexShard().failShard("search execution corruption failure", exc); } } catch (Exception inner) { - inner.addSuppressed(e); + inner.addSuppressed(exc); logger.warn("failed to process shard failure to (potentially) send back shard failure on corruption", inner); } } From 0c42f9c0b1740cb75d511118dca90459d9bdc38c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 6 Sep 2020 19:40:42 -0400 Subject: [PATCH 7/9] Hold searcher in LegacyReaderContext --- .../search/internal/LegacyReaderContext.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index 98a722a6abac5..8f6ccb526cf34 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -35,8 +35,7 @@ public class LegacyReaderContext extends ReaderContext { private AggregatedDfs aggregatedDfs; private RescoreDocIds rescoreDocIds; - private Engine.Searcher searcher; - private Releasable onClose; + private volatile Engine.Searcher searcher; public LegacyReaderContext(long id, IndexService indexService, IndexShard indexShard, Engine.SearcherSupplier reader, ShardSearchRequest shardSearchRequest, long keepAliveInMillis) { @@ -59,8 +58,8 @@ public Engine.Searcher acquireSearcher(String source) { // This ensures that we wrap the searcher's reader with the user's permissions // when they are available. if (searcher == null) { - Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); - onClose = delegate::close; + final Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); + addOnClose(delegate); // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(), delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {}); @@ -70,12 +69,6 @@ public Engine.Searcher acquireSearcher(String source) { return super.acquireSearcher(source); } - - @Override - void doClose() { - Releasables.close(onClose, super::doClose); - } - @Override public ShardSearchRequest getShardSearchRequest(ShardSearchRequest other) { return shardSearchRequest; From a76a767a8341d91162505ad75131a4b6a769ae27 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sun, 6 Sep 2020 19:41:37 -0400 Subject: [PATCH 8/9] stylecheck --- .../org/elasticsearch/search/internal/LegacyReaderContext.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index 8f6ccb526cf34..ab188f8ddf23f 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -19,8 +19,6 @@ package org.elasticsearch.search.internal; -import org.elasticsearch.common.lease.Releasable; -import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; From 16ea240ad54ca1a76da955b934cc91066bdaee11 Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 7 Sep 2020 09:26:13 +0200 Subject: [PATCH 9/9] apply review feedback --- .../src/main/java/org/elasticsearch/search/SearchService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 118a9c2763f2e..de303b514465d 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -858,14 +858,14 @@ private ActionListener wrapFailureListener(ActionListener listener, Re return new ActionListener<>() { @Override public void onResponse(T resp) { - releasable.close(); + Releasables.close(releasable); listener.onResponse(resp); } @Override public void onFailure(Exception exc) { processFailure(context, exc); - releasable.close(); + Releasables.close(releasable); listener.onFailure(exc); } };