From dafea3cc23cdc1694843ee1be310a149ad027c10 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 11 Dec 2018 20:24:29 +0100 Subject: [PATCH] Minor search controller changes (#36479) This commit contains a few minor changes to our search code: - adjust the visibility of a couple of methods in our search code to package private from public or protected. - make some of the `SearchPhaseController` methods static where possible - rename one of the `SearchPhaseController#reducedQueryPhase` methods (used only for scroll requests) to `reducedScrollQueryPhase` without the `isScrollRequest` argument which was always set to `true` - replace leniency in `SearchPhaseController#setShardIndex` with an assert to make sure that we never set the shard index twice - remove two null checks where the checked field can never be null - resolve an unchecked warning - replace `List#toArray` invocation that creates an array providing the true size with array creation of length 0 - correct a couple of typos in comments --- .../search/AbstractSearchAsyncAction.java | 2 +- .../action/search/InitialSearchPhase.java | 5 +-- .../action/search/SearchPhaseController.java | 41 +++++++------------ .../SearchScrollQueryAndFetchAsyncAction.java | 2 +- ...SearchScrollQueryThenFetchAsyncAction.java | 4 +- .../search/SearchPhaseResult.java | 2 +- .../aggregations/metrics/InternalTopHits.java | 1 - .../search/SearchPhaseControllerTests.java | 12 +++--- 8 files changed, 27 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 8aa847f753682..27293e8e50f8d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -72,7 +72,7 @@ abstract class AbstractSearchAsyncAction exten private final TransportSearchAction.SearchTimeProvider timeProvider; private final SearchResponse.Clusters clusters; - protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, + AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, Map> indexRoutings, diff --git a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java index b3b7dba9f38db..4e0db4644786e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java @@ -44,7 +44,7 @@ * and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until * the shards replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later * referred to as the {@code shardIndex}. - * The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection + * The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of * distributed frequencies */ abstract class InitialSearchPhase extends SearchPhase { @@ -327,7 +327,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) { } } - /** * Executed once all shard results have been received and processed * @see #onShardFailure(int, SearchShardTarget, Exception) @@ -367,7 +366,7 @@ protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRo abstract static class SearchPhaseResults { private final int numShards; - protected SearchPhaseResults(int numShards) { + SearchPhaseResults(int numShards) { this.numShards = numShards; } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index 28a7193646001..b2b3ee6dd2bd7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -21,7 +21,6 @@ import com.carrotsearch.hppc.IntArrayList; import com.carrotsearch.hppc.ObjectObjectHashMap; - import org.apache.lucene.index.Term; import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.FieldDoc; @@ -154,7 +153,7 @@ public AggregatedDfs aggregateDfs(Collection results) { * @param from the offset into the search results top docs * @param size the number of hits to return from the merged top docs */ - public SortedTopDocs sortDocs(boolean ignoreFrom, Collection results, + static SortedTopDocs sortDocs(boolean ignoreFrom, Collection results, final Collection bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) { if (results.isEmpty()) { return SortedTopDocs.EMPTY; @@ -214,7 +213,7 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection results, int topN, int from) { + static TopDocs mergeTopDocs(Collection results, int topN, int from) { if (results.isEmpty()) { return null; } - assert results.isEmpty() == false; final boolean setShardIndex = false; final TopDocs topDocs = results.stream().findFirst().get(); final TopDocs mergedTopDocs; @@ -259,12 +257,8 @@ TopDocs mergeTopDocs(Collection results, int topN, int from) { } private static void setShardIndex(TopDocs topDocs, int shardIndex) { + assert topDocs.scoreDocs.length == 0 || topDocs.scoreDocs[0].shardIndex == -1 : "shardIndex is already set"; for (ScoreDoc doc : topDocs.scoreDocs) { - if (doc.shardIndex != -1) { - // once there is a single shard index initialized all others will be initialized too - // there are many asserts down in lucene land that this is actually true. we can shortcut it here. - return; - } doc.shardIndex = shardIndex; } } @@ -283,7 +277,6 @@ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase, } } return lastEmittedDocPerShard; - } /** @@ -402,15 +395,15 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr hits.add(searchHit); } } - return new SearchHits(hits.toArray(new SearchHit[hits.size()]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore); + return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore); } /** * Reduces the given query results and consumes all aggregations and profile results. * @param queryResults a list of non-null query shard results */ - public ReducedQueryPhase reducedQueryPhase(Collection queryResults, boolean isScrollRequest) { - return reducedQueryPhase(queryResults, isScrollRequest, true); + public ReducedQueryPhase reducedScrollQueryPhase(Collection queryResults) { + return reducedQueryPhase(queryResults, true, true); } /** @@ -422,7 +415,6 @@ public ReducedQueryPhase reducedQueryPhase(Collection(), new TopDocsStats(trackTotalHits), 0, isScrollRequest); } - /** * Reduces the given query results and consumes all aggregations and profile results. * @param queryResults a list of non-null query shard results @@ -507,15 +499,13 @@ private ReducedQueryPhase reducedQueryPhase(Collection null, reduceContext); } - private InternalAggregations reduceAggs(List aggregationsList, + private static InternalAggregations reduceAggs(List aggregationsList, List pipelineAggregators, ReduceContext reduceContext) { InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); if (pipelineAggregators != null) { @@ -657,7 +647,6 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR this.hasTopDocs = hasTopDocs; this.hasAggs = hasAggs; this.bufferSize = bufferSize; - } @Override @@ -675,10 +664,9 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { aggsBuffer[0] = reducedAggs; } if (hasTopDocs) { - TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer), + TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer), // we have to merge here in the same way we collect on a shard - querySearchResult.from() + querySearchResult.size() - , 0); + querySearchResult.from() + querySearchResult.size(), 0); Arrays.fill(topDocsBuffer, null); topDocsBuffer[0] = reducedTopDocs; } @@ -692,7 +680,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) { if (hasTopDocs) { final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null topDocsStats.add(topDocs); - SearchPhaseController.setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); + setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex()); topDocsBuffer[i] = topDocs.topDocs; } } @@ -705,7 +693,6 @@ private synchronized List getRemainingTopDocs() { return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null; } - @Override public ReducedQueryPhase reduce() { return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats, @@ -739,7 +726,7 @@ InitialSearchPhase.ArraySearchPhaseResults newSearchPhaseResu return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs); } } - return new InitialSearchPhase.ArraySearchPhaseResults(numShards) { + return new InitialSearchPhase.ArraySearchPhaseResults(numShards) { @Override public ReducedQueryPhase reduce() { return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index 7f36d71ae256b..8195f9626a656 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -52,7 +52,7 @@ protected void executeInitialPhase(Transport.Connection connection, InternalScro @Override protected SearchPhase moveToNextPhase(BiFunction clusterNodeLookup) { - return sendResponsePhase(searchPhaseController.reducedQueryPhase(queryFetchResults.asList(), true), queryFetchResults); + return sendResponsePhase(searchPhaseController.reducedScrollQueryPhase(queryFetchResults.asList()), queryFetchResults); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index a964d1904edbb..794e3c84f1363 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -69,8 +69,8 @@ protected SearchPhase moveToNextPhase(BiFunction return new SearchPhase("fetch") { @Override public void run() throws IOException { - final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase( - queryResults.asList(), true); + final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase( + queryResults.asList()); if (reducedQueryPhase.scoreDocs.length == 0) { sendResponse(reducedQueryPhase, fetchResults); return; diff --git a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java index ede9f525a5a14..f242cbd8eb901 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java +++ b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java @@ -25,7 +25,7 @@ import org.elasticsearch.transport.TransportResponse; /** - * This class is a base class for all search releated results. It contains the shard target it + * This class is a base class for all search related results. It contains the shard target it * was executed against, a shard index used to reference the result on the coordinating node * and a request ID that is used to reference the request context on the executing node. The * request ID is particularly important since it is used to reference and maintain a context diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java index ff6e10baee534..60ae4c78edc7e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java @@ -65,7 +65,6 @@ public InternalTopHits(StreamInput in) throws IOException { from = in.readVInt(); size = in.readVInt(); topDocs = Lucene.readTopDocs(in); - assert topDocs != null; searchHits = SearchHits.readSearchHits(in); } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java index d87ddc84831e4..c1a170c69ede6 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java @@ -30,6 +30,8 @@ import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.index.Index; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -38,8 +40,6 @@ import org.elasticsearch.search.aggregations.metrics.InternalMax; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.suggest.Suggest; @@ -73,7 +73,7 @@ public void setup() { (b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b)); } - public void testSort() throws Exception { + public void testSort() { List suggestions = new ArrayList<>(); for (int i = 0; i < randomIntBetween(1, 5); i++) { suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20), false)); @@ -88,7 +88,7 @@ public void testSort() throws Exception { size = first.get().queryResult().size(); } int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results)); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(), + ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(), from, size) .scoreDocs; for (Suggest.Suggestion suggestion : reducedSuggest(results)) { @@ -113,12 +113,12 @@ public void testSortIsIdempotent() throws Exception { size = first.get().queryResult().size(); } SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats(); - ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs; + ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs; results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize, useConstantScore); SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats(); - ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs; + ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs; assertEquals(sortedDocs.length, sortedDocs2.length); for (int i = 0; i < sortedDocs.length; i++) { assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);