diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 90b290038e18f..f795a8cf69b17 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -592,14 +592,14 @@ protected final GetResult getFromSearcher(Get get, BiFunction wrapper) throws EngineException { - return acquireReader(wrapper, SearcherScope.EXTERNAL); + public final SearcherSupplier acquireSearcherSupplier(Function wrapper) throws EngineException { + return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL); } /** * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ - public Reader acquireReader(Function wrapper, SearcherScope scope) throws EngineException { + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { /* Acquire order here is store -> manager since we need * to make sure that the store is not closed before * the searcher is acquired. */ @@ -611,7 +611,7 @@ public Reader acquireReader(Function wrapper, SearcherScope ReferenceManager referenceManager = getReferenceManager(scope); ElasticsearchDirectoryReader acquire = referenceManager.acquire(); AtomicBoolean released = new AtomicBoolean(false); - Reader reader = new Reader(wrapper) { + SearcherSupplier reader = new SearcherSupplier(wrapper) { @Override public Searcher acquireSearcherInternal(String source) { assert assertSearcherIsWarmedUp(source, scope); @@ -659,9 +659,9 @@ public final Searcher acquireSearcher(String source) throws EngineException { } public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { - Reader releasable = null; + SearcherSupplier releasable = null; try { - Reader reader = releasable = acquireReader(Function.identity(), scope); + SearcherSupplier reader = releasable = acquireSearcherSupplier(Function.identity(), scope); Searcher searcher = reader.acquireSearcher(source); releasable = null; return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(), @@ -1174,10 +1174,10 @@ default void onFailedEngine(String reason, @Nullable Exception e) { } } - public abstract static class Reader implements Releasable { + public abstract static class SearcherSupplier implements Releasable { private final Function wrapper; - public Reader(Function wrapper) { + public SearcherSupplier(Function wrapper) { this.wrapper = wrapper; } diff --git a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java index 664b631249f68..e72a5182287e1 100644 --- a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import java.util.HashMap; @@ -158,15 +157,15 @@ public void onFreeReaderContext(ReaderContext readerContext) { } @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(ReaderContext readerContext) { totalStats.scrollCurrent.inc(); } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { + public void onFreeScrollContext(ReaderContext readerContext) { totalStats.scrollCurrent.dec(); assert totalStats.scrollCurrent.count() >= 0; - totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano())); + totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano())); } static final class StatsHolder { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 44c8f2fd26293..b6b9884a15096 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1199,18 +1199,18 @@ public void failShard(String reason, @Nullable Exception e) { /** * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ - public Engine.Reader acquireReader() { - return acquireReader(Engine.SearcherScope.EXTERNAL); + public Engine.SearcherSupplier acquireSearcherSupplier() { + return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL); } /** * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ - public Engine.Reader acquireReader(Engine.SearcherScope scope) { + public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) { readAllowed(); markSearcherAccessed(); final Engine engine = getEngine(); - return engine.acquireReader(this::wrapSearcher, scope); + return engine.acquireSearcherSupplier(this::wrapSearcher, scope); } public Engine.Searcher acquireSearcher(String source) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index 77e83edcdd18f..fff7e88e505cb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.transport.TransportRequest; @@ -93,28 +92,29 @@ default void onNewReaderContext(ReaderContext readerContext) {} default void onFreeReaderContext(ReaderContext readerContext) {} /** - * Executed when a new scroll search {@link SearchContext} was created - * @param scrollContext the created search context + * Executed when a new scroll search {@link ReaderContext} was created + * @param readerContext the created reader context */ - default void onNewScrollContext(ScrollContext scrollContext) {} + default void onNewScrollContext(ReaderContext readerContext) {} /** * Executed when a scroll search {@link SearchContext} is freed. * This happens either when the scroll search execution finishes, if the * execution failed or if the search context as idle for and needs to be * cleaned up. - * @param scrollContext the freed search context + * @param readerContext the freed search context */ - default void onFreeScrollContext(ScrollContext scrollContext) {} + default void onFreeScrollContext(ReaderContext readerContext) {} /** * Executed prior to using a {@link SearchContext} that has been retrieved * from the active contexts. If the context is deemed invalid a runtime * exception can be thrown, which will prevent the context from being used. - * @param context the context retrieved from the active contexts + * @param readerContext The reader context used by this request. + * @param searchContext The newly created {@link SearchContext}. * @param transportRequest the request that is going to use the search context */ - default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {} + default void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest transportRequest) {} /** * Executed when a search context was freed. The implementor can implement @@ -225,10 +225,10 @@ public void onFreeReaderContext(ReaderContext readerContext) { } @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { try { - listener.onNewScrollContext(scrollContext); + listener.onNewScrollContext(readerContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e); } @@ -236,10 +236,10 @@ public void onNewScrollContext(ScrollContext scrollContext) { } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { + public void onFreeScrollContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { try { - listener.onFreeScrollContext(scrollContext); + listener.onFreeScrollContext(readerContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e); } @@ -247,11 +247,11 @@ public void onFreeScrollContext(ScrollContext scrollContext) { } @Override - public void validateSearchContext(SearchContext context, TransportRequest request) { + public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) { Exception exception = null; for (SearchOperationListener listener : listeners) { try { - listener.validateSearchContext(context, request); + listener.validateSearchContext(readerContext, searchContext, request); } catch (Exception e) { exception = ExceptionsHelper.useOrSuppress(exception, e); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 0129a9aec88f8..6fe286d915be5 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -483,26 +483,26 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final LegacyReaderContext reader = (LegacyReaderContext) findReaderContext(request.contextId()); - try (SearchContext context = createContext(reader, reader.getShardSearchRequest(null), task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + try (SearchContext searchContext = createContext(readerContext, readerContext.getShardSearchRequest(null), task, false); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { if (request.scroll() != null && request.scroll().keepAlive() != null) { final long keepAlive = request.scroll().keepAlive().millis(); checkKeepAliveLimit(keepAlive); - reader.keepAlive(keepAlive); + readerContext.keepAlive(keepAlive); } - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - context.searcher().setAggregatedDfs(reader.getAggregatedDfs(null)); - processScroll(request, reader, context); - queryPhase.execute(context); + readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + processScroll(request, readerContext, searchContext); + queryPhase.execute(searchContext); executor.success(); - final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); - context.queryResult().setRescoreDocIds(rescoreDocIds); - reader.setRescoreDocIds(rescoreDocIds); - return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); + final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds(); + searchContext.queryResult().setRescoreDocIds(rescoreDocIds); + readerContext.setRescoreDocIds(rescoreDocIds); + return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(reader, e); + processFailure(readerContext, e); throw e; } }, listener); @@ -510,25 +510,25 @@ public void executeQueryPhase(InternalScrollSearchRequest request, public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final ReaderContext reader = findReaderContext(request.contextId()); - reader.setAggregatedDfs(request.dfs()); - try (SearchContext context = createContext(reader, reader.getShardSearchRequest(request.shardSearchRequest()), task, true); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - context.searcher().setAggregatedDfs(request.dfs()); - queryPhase.execute(context); - if (context.queryResult().hasSearchContext() == false && reader.singleSession()) { + final ReaderContext readerContext = findReaderContext(request.contextId()); + readerContext.setAggregatedDfs(request.dfs()); + try (SearchContext searchContext = createContext(readerContext, readerContext.getShardSearchRequest(request.shardSearchRequest()), task, true); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { + readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); + searchContext.searcher().setAggregatedDfs(request.dfs()); + queryPhase.execute(searchContext); + if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) { // no hits, we can release the context since there will be no fetch phase - freeReaderContext(reader.id()); + freeReaderContext(readerContext.id()); } executor.success(); - final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); - context.queryResult().setRescoreDocIds(rescoreDocIds); - reader.setRescoreDocIds(rescoreDocIds); - return context.queryResult(); + final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds(); + searchContext.queryResult().setRescoreDocIds(rescoreDocIds); + readerContext.setRescoreDocIds(rescoreDocIds); + return searchContext.queryResult(); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(reader, e); + processFailure(readerContext, e); throw e; } }, listener); @@ -546,24 +546,24 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final LegacyReaderContext reader = (LegacyReaderContext) findReaderContext(request.contextId()); - try (SearchContext context = createContext(reader, reader.getShardSearchRequest(null), task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + try (SearchContext searchContext = createContext(readerContext, readerContext.getShardSearchRequest(null), task, false); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { if (request.scroll() != null && request.scroll().keepAlive() != null) { checkKeepAliveLimit(request.scroll().keepAlive().millis()); - reader.keepAlive(request.scroll().keepAlive().millis()); + readerContext.keepAlive(request.scroll().keepAlive().millis()); } - context.assignRescoreDocIds(reader.getRescoreDocIds(null)); - context.searcher().setAggregatedDfs(reader.getAggregatedDfs(null)); - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - processScroll(request, reader, context); - queryPhase.execute(context); + searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); + processScroll(request, readerContext, searchContext); + queryPhase.execute(searchContext); final long afterQueryTime = executor.success(); - QueryFetchSearchResult fetchSearchResult = executeFetchPhase(reader, context, afterQueryTime); - return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget()); + QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime); + return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget()); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(reader, e); + processFailure(readerContext, e); throw e; } }, listener); @@ -571,27 +571,27 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final ReaderContext reader = findReaderContext(request.contextId()); - final ShardSearchRequest shardSearchRequest = reader.getShardSearchRequest(request.getShardSearchRequest()); - try (SearchContext context = createContext(reader, shardSearchRequest, task, false)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); + final ReaderContext readerContext = findReaderContext(request.contextId()); + final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { + readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); if (request.lastEmittedDoc() != null) { - context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); + searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } - context.assignRescoreDocIds(reader.getRescoreDocIds(request.getRescoreDocIds())); - context.searcher().setAggregatedDfs(reader.getAggregatedDfs(request.getAggregatedDfs())); - context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) { - fetchPhase.execute(context); - if (reader.singleSession()) { + searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds())); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs())); + searchContext.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); + try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext, true, System.nanoTime())) { + fetchPhase.execute(searchContext); + if (readerContext.singleSession()) { freeReaderContext(request.contextId()); } executor.success(); } - return context.fetchResult(); + return searchContext.fetchResult(); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(reader, e); + processFailure(readerContext, e); throw e; } }, listener); @@ -628,12 +628,12 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean } IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard shard = indexService.getShard(request.shardId().id()); - Engine.Reader reader = shard.acquireReader(); + Engine.SearcherSupplier reader = shard.acquireSearcherSupplier(); return createAndPutReaderContext(request, shard, reader, keepStatesInContext); } final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexShard shard, - Engine.Reader reader, boolean keepStatesInContext) { + Engine.SearcherSupplier reader, boolean keepStatesInContext) { assert request.readerId() == null; assert request.keepAlive() == null; ReaderContext readerContext = null; @@ -665,12 +665,12 @@ final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexS final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); searchOperationListener.onNewReaderContext(finalReaderContext); if (finalReaderContext.scrollContext() != null) { - searchOperationListener.onNewScrollContext(finalReaderContext.scrollContext()); + searchOperationListener.onNewScrollContext(finalReaderContext); } readerContext.addOnClose(() -> { try { if (finalReaderContext.scrollContext() != null) { - searchOperationListener.onFreeScrollContext(finalReaderContext.scrollContext()); + searchOperationListener.onFreeScrollContext(finalReaderContext); } } finally { searchOperationListener.onFreeReaderContext(finalReaderContext); @@ -697,7 +697,7 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen runAsync(shard, () -> { Releasable releasable = null; try { - final Engine.Reader reader = shard.acquireReader(); + final Engine.SearcherSupplier reader = shard.acquireSearcherSupplier(); releasable = reader; final ReaderContext readerContext = new ReaderContext( idGenerator.incrementAndGet(), shard, reader, keepAlive.millis(), false); @@ -751,7 +751,7 @@ final SearchContext createContext(ReaderContext reader, public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId()); - Engine.Reader reader = indexShard.acquireReader(); + Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, reader, -1L, true)) { reader = null; // transfer ownership to readerContext DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout); diff --git a/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java b/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java index 155de2c277995..477920a928554 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java +++ b/server/src/main/java/org/elasticsearch/search/SearchShardTarget.java @@ -19,6 +19,7 @@ package org.elasticsearch.search; +import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -38,9 +39,7 @@ public final class SearchShardTarget implements Writeable, Comparable onCloses = new CopyOnWriteArrayList<>(); - public ReaderContext(long id, IndexShard indexShard, Engine.Reader reader, long keepAliveInMillis, boolean singleSession) { + private final long startTimeInNano = System.nanoTime(); + + private Map context; + + public ReaderContext(long id, IndexShard indexShard, Engine.SearcherSupplier reader, long keepAliveInMillis, boolean singleSession) { super("reader_context"); this.id = new SearchContextId(UUIDs.base64UUID(), id); this.indexShard = indexShard; @@ -158,4 +164,27 @@ public void setRescoreDocIds(RescoreDocIds rescoreDocIds) { public boolean singleSession() { return singleSession; } + + /** + * Returns the object or null if the given key does not have a + * value in the context + */ + @SuppressWarnings("unchecked") // (T)object + public T getFromContext(String key) { + return context != null ? (T) context.get(key) : null; + } + + /** + * Puts the object into the context + */ + public void putInContext(String key, Object value) { + if (context == null) { + context = new HashMap<>(); + } + context.put(key, value); + } + + public long getStartTimeInNano() { + return startTimeInNano; + } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java index 440408646d127..5b9c632d4e522 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java @@ -23,42 +23,10 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.search.Scroll; -import java.util.HashMap; -import java.util.Map; - /** Wrapper around information that needs to stay around when scrolling. */ public final class ScrollContext { - - private Map context = null; - public TotalHits totalHits = null; public float maxScore = Float.NaN; public ScoreDoc lastEmittedDoc; public Scroll scroll; - - private final long startTimeInNano = System.nanoTime(); - - /** - * Returns the object or null if the given key does not have a - * value in the context - */ - @SuppressWarnings("unchecked") // (T)object - public T getFromContext(String key) { - return context != null ? (T) context.get(key) : null; - } - - /** - * Puts the object into the context - */ - public void putInContext(String key, Object value) { - if (context == null) { - context = new HashMap<>(); - } - context.put(key, value); - } - - - public long getStartTimeInNano() { - return startTimeInNano; - } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java index 34c528dfeda6a..5aca710963c69 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java @@ -105,20 +105,21 @@ public void onFreeReaderContext(ReaderContext readerContext) { } @Override - public void onNewScrollContext(ScrollContext scrollContext) { - assertNotNull(scrollContext); + public void onNewScrollContext(ReaderContext readerContext) { + assertNotNull(readerContext); newScrollContext.incrementAndGet(); } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { - assertNotNull(scrollContext); + public void onFreeScrollContext(ReaderContext readerContext) { + assertNotNull(readerContext); freeScrollContext.incrementAndGet(); } @Override - public void validateSearchContext(SearchContext context, TransportRequest request) { - assertNotNull(context); + public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) { + assertNotNull(readerContext); + assertNotNull(searchContext); validateSearchContext.incrementAndGet(); } }; @@ -232,7 +233,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onNewScrollContext(new ScrollContext()); + compositeListener.onNewScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -258,7 +259,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onFreeScrollContext(new ScrollContext()); + compositeListener.onFreeScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -272,10 +273,10 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, validateSearchContext.get()); if (throwingListeners == 0) { - compositeListener.validateSearchContext(ctx, Empty.INSTANCE); + compositeListener.validateSearchContext(mock(ReaderContext.class), ctx, Empty.INSTANCE); } else { RuntimeException expected = - expectThrows(RuntimeException.class, () -> compositeListener.validateSearchContext(ctx, Empty.INSTANCE)); + expectThrows(RuntimeException.class, () -> compositeListener.validateSearchContext(mock(ReaderContext.class), ctx, Empty.INSTANCE)); assertNull(expected.getMessage()); assertEquals(throwingListeners - 1, expected.getSuppressed().length); if (throwingListeners > 1) { diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 04d9ae7e657d7..de96b084f3e6c 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -119,7 +119,7 @@ public void testPreProcess() throws Exception { try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { - final Engine.Reader engineReader = new Engine.Reader(Function.identity()) { + final Engine.SearcherSupplier engineReader = new Engine.SearcherSupplier(Function.identity()) { @Override public void close() {} diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index cbbc876e7d953..0f9a7f7b55b4b 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -513,7 +513,7 @@ public void testMaxOpenScrollContexts() throws Exception { final ShardScrollRequestTest request = new ShardScrollRequestTest(indexShard.shardId()); ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutReaderContext(request, indexShard, indexShard.acquireReader(), randomBoolean())); + () -> service.createAndPutReaderContext(request, indexShard, indexShard.acquireSearcherSupplier(), randomBoolean())); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -536,7 +536,7 @@ public void testOpenScrollContextsConcurrently() throws Exception { try { latch.await(); for (; ; ) { - Engine.Reader reader = indexShard.acquireReader(); + Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); try { searchService.createAndPutReaderContext( new ShardScrollRequestTest(indexShard.shardId()), indexShard, reader, true); @@ -984,7 +984,7 @@ public void testLookUpSearchContext() throws Exception { OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); final ReaderContext context = searchService.createAndPutReaderContext(request, indexShard, - indexShard.acquireReader(), randomBoolean()); + indexShard.acquireSearcherSupplier(), randomBoolean()); assertThat(context.id().getId(), equalTo((long) (i + 1))); contextIds.add(context.id()); } @@ -1022,6 +1022,6 @@ public void testOpenReaderContext() { } private ReaderContext createReaderContext(IndexShard shard) { - return new ReaderContext(randomNonNegativeLong(), shard, shard.acquireReader(), randomNonNegativeLong(), false); + return new ReaderContext(randomNonNegativeLong(), shard, shard.acquireSearcherSupplier(), randomNonNegativeLong(), false); } } diff --git a/server/src/test/java/org/elasticsearch/search/internal/ScrollContextTests.java b/server/src/test/java/org/elasticsearch/search/internal/ScrollContextTests.java deleted file mode 100644 index de4863dd92a08..0000000000000 --- a/server/src/test/java/org/elasticsearch/search/internal/ScrollContextTests.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.internal; - -import org.elasticsearch.test.ESTestCase; - -public class ScrollContextTests extends ESTestCase { - - public void testStoringObjectsInScrollContext() { - final ScrollContext scrollContext = new ScrollContext(); - final String key = randomAlphaOfLengthBetween(1, 16); - assertNull(scrollContext.getFromContext(key)); - - final String value = randomAlphaOfLength(6); - scrollContext.putInContext(key, value); - - assertEquals(value, scrollContext.getFromContext(key)); - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index 9ecb69dbf3b29..84bb36188cf90 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -84,9 +84,9 @@ public Engine.Searcher acquireSearcher(String source, SearcherScope scope) { } @Override - public Reader acquireReader(Function wrapper, SearcherScope scope) throws EngineException { - final Engine.Reader reader = super.acquireReader(wrapper, scope); - return new Reader(searcher -> support().wrapSearcher(searcher)) { + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { + final SearcherSupplier reader = super.acquireSearcherSupplier(wrapper, scope); + return new SearcherSupplier(searcher -> support().wrapSearcher(searcher)) { @Override protected Searcher acquireSearcherInternal(String source) { return reader.acquireSearcher(source); diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 3fb57c8f318eb..e99767b896db5 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -30,7 +30,7 @@ /** * This is a stand-alone read-only engine that maintains an index reader that is opened lazily on calls to - * {@link Engine.Reader#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore + * {@link SearcherSupplier#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore * and then releases itself from the engine. * This is necessary to for instance release all SegmentReaders after a search phase finishes and reopen them before the next search * phase starts. @@ -175,10 +175,10 @@ private synchronized ElasticsearchDirectoryReader getReader() { } @Override - public Reader acquireReader(Function wrapper, SearcherScope scope) throws EngineException { + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { final Store store = this.store; store.incRef(); - return new Reader(wrapper) { + return new SearcherSupplier(wrapper) { @Override @SuppressForbidden(reason = "we manage references explicitly here") public Searcher acquireSearcherInternal(String source) { diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index c79372971dab3..0d505880575a9 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -50,7 +50,7 @@ public void testAcquireReleaseReset() throws IOException { listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { assertFalse(frozenEngine.isReaderOpen()); - try (Engine.Reader reader = frozenEngine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { assertFalse(frozenEngine.isReaderOpen()); try (Engine.Searcher searcher = reader.acquireSearcher("frozen")) { assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher @@ -87,7 +87,7 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { assertFalse(frozenEngine.isReaderOpen()); - Engine.Reader reader1 = frozenEngine.acquireReader(Function.identity()); + Engine.SearcherSupplier reader1 = frozenEngine.acquireSearcherSupplier(Function.identity()); try (Engine.Searcher searcher1 = reader1.acquireSearcher("test")) { assertTrue(frozenEngine.isReaderOpen()); TopDocs search = searcher1.search(new MatchAllDocsQuery(), numDocs); @@ -95,7 +95,7 @@ public void testAcquireReleaseResetTwoSearchers() throws IOException { assertEquals(1, listener.afterRefresh.get()); } assertFalse(frozenEngine.isReaderOpen()); - Engine.Reader reader2 = frozenEngine.acquireReader(Function.identity()); + Engine.SearcherSupplier reader2 = frozenEngine.acquireSearcherSupplier(Function.identity()); try (Engine.Searcher searcher2 = reader2.acquireSearcher("test")) { TopDocs search = searcher2.search(new MatchAllDocsQuery(), numDocs); assertEquals(search.scoreDocs.length, numDocs); @@ -129,7 +129,7 @@ public void testSegmentStats() throws IOException { engine.flushAndClose(); listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { - try (Engine.Reader reader = frozenEngine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); try (Engine.Searcher searcher = reader.acquireSearcher("test")) { segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); @@ -178,7 +178,7 @@ public void testCircuitBreakerAccounting() throws IOException { assertEquals(0, breaker.getUsed()); listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) { - try (Engine.Reader reader = frozenEngine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { try (Engine.Searcher searcher = reader.acquireSearcher("test")) { assertEquals(expectedUse, breaker.getUsed()); } @@ -231,7 +231,7 @@ public void testSearchConcurrently() throws IOException, InterruptedException { CountDownLatch latch = new CountDownLatch(numThreads); for (int i = 0; i < numThreads; i++) { threads[i] = new Thread(() -> { - try (Engine.Reader reader = frozenEngine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { barrier.await(); for (int j = 0; j < numIters; j++) { try (Engine.Searcher searcher = reader.acquireSearcher("test")) { @@ -315,7 +315,7 @@ public void testCanMatch() throws IOException { listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { DirectoryReader dirReader; - try (Engine.Reader reader = frozenEngine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { try (Engine.Searcher searcher = reader.acquireSearcher("can_match")) { dirReader = searcher.getDirectoryReader(); assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader())); @@ -328,7 +328,7 @@ public void testCanMatch() throws IOException { } } - try (Engine.Reader reader = frozenEngine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { try (Engine.Searcher searcher = reader.acquireSearcher("can_match")) { assertSame(dirReader, searcher.getDirectoryReader()); assertEquals(0, listener.afterRefresh.get()); @@ -356,14 +356,14 @@ public void testSearchers() throws Exception { // See TransportVerifyShardBeforeCloseAction#executeShardOperation engine.flush(true, true); engine.refresh("test"); - try (Engine.Reader reader = engine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = engine.acquireSearcherSupplier(Function.identity())) { try (Engine.Searcher searcher = reader.acquireSearcher("test")) { totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length; } } } try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) { - try (Engine.Reader reader = frozenEngine.acquireReader(Function.identity())) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { try (Engine.Searcher searcher = reader.acquireSearcher("test")) { TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); assertThat(topDocs.scoreDocs.length, equalTo(totalDocs)); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java index 1cf5098b8bdd8..1f251b000f03e 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java @@ -9,6 +9,7 @@ import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.search.SearchContextMissingException; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContextId; @@ -44,13 +45,21 @@ public SecuritySearchOperationListener(SecurityContext securityContext, XPackLic this.auditTrailService = auditTrail; } + @Override + public void onNewReaderContext(ReaderContext readerContext) { + if (licenseState.isSecurityEnabled()) { + readerContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, securityContext.getAuthentication()); + } + } + /** * Adds the {@link Authentication} to the {@link ScrollContext} + * @param readerContext */ @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(ReaderContext readerContext) { if (licenseState.isSecurityEnabled()) { - scrollContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, securityContext.getAuthentication()); + readerContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, securityContext.getAuthentication()); } } @@ -59,10 +68,10 @@ public void onNewScrollContext(ScrollContext scrollContext) { * object from the scroll context with the current authentication context */ @Override - public void validateSearchContext(SearchContext searchContext, TransportRequest request) { + public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) { if (licenseState.isSecurityEnabled()) { if (searchContext.scrollContext() != null) { - final Authentication originalAuth = searchContext.scrollContext().getFromContext(AuthenticationField.AUTHENTICATION_KEY); + final Authentication originalAuth = readerContext.getFromContext(AuthenticationField.AUTHENTICATION_KEY); final Authentication current = securityContext.getAuthentication(); final ThreadContext threadContext = securityContext.getThreadContext(); final String action = threadContext.getTransient(ORIGINATING_ACTION_KEY); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java index fc7afcdd1f970..5e11fba3b0861 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java @@ -65,6 +65,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.integration.FieldLevelSecurityTests.openReaders; import static org.elasticsearch.join.query.JoinQueryBuilders.hasChildQuery; import static org.elasticsearch.join.query.JoinQueryBuilders.hasParentQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -779,31 +780,23 @@ public void testReaderId() throws Exception { } refresh(); + String readerId = openReaders("user1", TimeValue.timeValueMinutes(1), "test"); SearchResponse response = null; try { - response = client() - .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD))) - .prepareSearch("test") - .setSize(1) - .setReader(null, TimeValue.timeValueMinutes(1)) - .setQuery(termQuery("field1", "value1")) - .get(); - int from = 0; - do { - assertNoFailures(response); - assertThat(response.getHits().getTotalHits().value, is((long) numVisible)); - assertThat(response.getHits().getAt(0).getSourceAsMap().size(), is(1)); - assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1")); - ++ from; + for (int from = 0; from < numVisible; from++) { response = client() .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD))) .prepareSearch() .setSize(1) .setFrom(from) - .setReader(response.getReaderId(), TimeValue.timeValueMinutes(1)) + .setReader(readerId, TimeValue.timeValueMinutes(1)) .setQuery(termQuery("field1", "value1")) .get(); - } while (response.getHits().getHits().length > 0); + assertNoFailures(response); + assertThat(response.getHits().getTotalHits().value, is((long) numVisible)); + assertThat(response.getHits().getAt(0).getSourceAsMap().size(), is(1)); + assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1")); + } } finally { client().execute(ClearReaderAction.INSTANCE, new ClearReaderRequest(response.getReaderId())).actionGet(); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java index 3e69c32282a59..c23bf2359e964 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java @@ -14,7 +14,10 @@ import org.elasticsearch.action.search.ClearReaderAction; import org.elasticsearch.action.search.ClearReaderRequest; import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.OpenReaderRequest; +import org.elasticsearch.action.search.OpenReaderResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.TransportOpenReaderAction; import org.elasticsearch.action.termvectors.MultiTermVectorsResponse; import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsResponse; @@ -725,6 +728,14 @@ public void testScroll() throws Exception { } } + static String openReaders(String userName, TimeValue keepAlive, String... indices) { + OpenReaderRequest request = new OpenReaderRequest(indices, OpenReaderRequest.DEFAULT_INDICES_OPTIONS, keepAlive, null, null); + final OpenReaderResponse response = client() + .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue(userName, USERS_PASSWD))) + .execute(TransportOpenReaderAction.INSTANCE, request).actionGet(); + return response.getReaderId(); + } + public void testReaderId() throws Exception { assertAcked(client().admin().indices().prepareCreate("test") .setSettings(Settings.builder().put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)) @@ -739,36 +750,27 @@ public void testReaderId() throws Exception { } refresh("test"); + String readerId = openReaders("user1", TimeValue.timeValueMinutes(1), "test"); SearchResponse response = null; try { - response = client() - .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD))) - .prepareSearch("test") - .setReader(null, TimeValue.timeValueMinutes(1L)) - .setSize(1) - .setQuery(constantScoreQuery(termQuery("field1", "value1"))) - .setFetchSource(true) - .get(); - int from = 0; - do { - assertThat(response.getHits().getTotalHits().value, is((long) numDocs)); - assertThat(response.getHits().getHits().length, is(1)); - assertThat(response.getHits().getAt(0).getSourceAsMap().size(), is(1)); - assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1")); - ++ from; + for (int from = 0; from < numDocs; from++) { response = client() .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD))) .prepareSearch() - .setReader(response.getReaderId(), TimeValue.timeValueMinutes(1L)) + .setReader(readerId, TimeValue.timeValueMinutes(1L)) .setSize(1) .setFrom(from) .setQuery(constantScoreQuery(termQuery("field1", "value1"))) .setFetchSource(true) .get(); - } while (response.getHits().getHits().length > 0); - assertEquals(numDocs, from); + assertThat(response.getHits().getTotalHits().value, is((long) numDocs)); + assertThat(response.getHits().getHits().length, is(1)); + assertThat(response.getHits().getAt(0).getSourceAsMap().size(), is(1)); + assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1")); + ++from; + } } finally { - client().execute(ClearReaderAction.INSTANCE, new ClearReaderRequest(response.getReaderId())).actionGet(); + client().execute(ClearReaderAction.INSTANCE, new ClearReaderRequest(readerId)).actionGet(); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java index 9b53be6e4ccfd..e312014b5084d 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContextId; @@ -55,21 +56,17 @@ public void testUnlicensed() { final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); AuditTrailService auditTrailService = mock(AuditTrailService.class); SearchContext searchContext = mock(SearchContext.class); - ScrollContext scrollContext = new ScrollContext(); - when(searchContext.scrollContext()).thenReturn(scrollContext); + ReaderContext readerContext = new ReaderContext(0L, null, null, Long.MAX_VALUE, false); SecuritySearchOperationListener listener = new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); - listener.onNewScrollContext(scrollContext); - listener.validateSearchContext(searchContext, Empty.INSTANCE); + listener.onNewScrollContext(readerContext); + listener.validateSearchContext(readerContext, searchContext, Empty.INSTANCE); verify(licenseState, times(2)).isSecurityEnabled(); verifyZeroInteractions(auditTrailService, searchContext); } public void testOnNewContextSetsAuthentication() throws Exception { - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); - final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(2L)); - testSearchContext.scrollContext().scroll = scroll; + ReaderContext readerContext = new ReaderContext(0L, null, null, Long.MAX_VALUE, false); XPackLicenseState licenseState = mock(XPackLicenseState.class); when(licenseState.isSecurityEnabled()).thenReturn(true); ThreadContext threadContext = new ThreadContext(Settings.EMPTY); @@ -79,20 +76,20 @@ public void testOnNewContextSetsAuthentication() throws Exception { authentication.writeToContext(threadContext); SecuritySearchOperationListener listener = new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); - listener.onNewScrollContext(scrollContext); + listener.onNewScrollContext(readerContext); - Authentication contextAuth = testSearchContext.scrollContext().getFromContext(AuthenticationField.AUTHENTICATION_KEY); + Authentication contextAuth = readerContext.getFromContext(AuthenticationField.AUTHENTICATION_KEY); assertEquals(authentication, contextAuth); - assertEquals(scroll, testSearchContext.scrollContext().scroll); verify(licenseState).isSecurityEnabled(); verifyZeroInteractions(auditTrailService); } public void testValidateSearchContext() throws Exception { + ReaderContext readerContext = new ReaderContext(0L, null, null, Long.MAX_VALUE, false); ScrollContext scrollContext = new ScrollContext(); TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); - testSearchContext.scrollContext().putInContext(AuthenticationField.AUTHENTICATION_KEY, + readerContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null)); testSearchContext.scrollContext().scroll = new Scroll(TimeValue.timeValueSeconds(2L)); XPackLicenseState licenseState = mock(XPackLicenseState.class); @@ -107,7 +104,7 @@ public void testValidateSearchContext() throws Exception { try (StoredContext ignore = threadContext.newStoredContext(false)) { Authentication authentication = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null); authentication.writeToContext(threadContext); - listener.validateSearchContext(testSearchContext, Empty.INSTANCE); + listener.validateSearchContext(readerContext, testSearchContext, Empty.INSTANCE); verify(licenseState).isSecurityEnabled(); verifyZeroInteractions(auditTrail); } @@ -117,7 +114,7 @@ public void testValidateSearchContext() throws Exception { final String realmName = randomAlphaOfLengthBetween(1, 16); Authentication authentication = new Authentication(new User("test", "role"), new RealmRef(realmName, "file", nodeName), null); authentication.writeToContext(threadContext); - listener.validateSearchContext(testSearchContext, Empty.INSTANCE); + listener.validateSearchContext(readerContext, testSearchContext, Empty.INSTANCE); verify(licenseState, times(2)).isSecurityEnabled(); verifyZeroInteractions(auditTrail); } @@ -133,7 +130,7 @@ public void testValidateSearchContext() throws Exception { (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); SearchContextMissingException expected = - expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(testSearchContext, request)); + expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(readerContext, testSearchContext, request)); assertEquals(testSearchContext.id(), expected.contextId()); verify(licenseState, Mockito.atLeast(3)).isSecurityEnabled(); verify(auditTrail).accessDenied(eq(null), eq(authentication), eq("action"), eq(request), @@ -151,7 +148,7 @@ public void testValidateSearchContext() throws Exception { authentication.writeToContext(threadContext); threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); - listener.validateSearchContext(testSearchContext, request); + listener.validateSearchContext(readerContext, testSearchContext, request); verify(licenseState, Mockito.atLeast(4)).isSecurityEnabled(); verifyNoMoreInteractions(auditTrail); } @@ -169,7 +166,7 @@ public void testValidateSearchContext() throws Exception { (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); SearchContextMissingException expected = - expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(testSearchContext, request)); + expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(readerContext, testSearchContext, request)); assertEquals(testSearchContext.id(), expected.contextId()); verify(licenseState, Mockito.atLeast(5)).isSecurityEnabled(); verify(auditTrail).accessDenied(eq(null), eq(authentication), eq("action"), eq(request),