From fee013c07c50aaa06415ec4ce4c77f398add0c1e Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Fri, 5 Aug 2016 17:49:56 -0400 Subject: [PATCH] Add support for returning documents with completion suggester This commit enables completion suggester to return documents associated with suggestions. Now the document source is returned with every suggestion, which respects source filtering options. In case of suggest queries spanning more than one shard, the suggest is executed in two phases, where the last phase fetches the relevant documents from shards, implying executing suggest requests against a single shard is more performant due to the document fetch overhead when the suggest spans multiple shards. --- .../search/AbstractSearchAsyncAction.java | 18 +- .../SearchDfsQueryAndFetchAsyncAction.java | 4 +- .../SearchDfsQueryThenFetchAsyncAction.java | 19 +- .../SearchQueryAndFetchAsyncAction.java | 11 +- .../SearchQueryThenFetchAsyncAction.java | 19 +- .../SearchScrollQueryAndFetchAsyncAction.java | 4 +- ...SearchScrollQueryThenFetchAsyncAction.java | 11 +- .../elasticsearch/search/SearchService.java | 59 ++-- .../controller/SearchPhaseController.java | 256 ++++++++++++------ .../search/fetch/ShardFetchSearchRequest.java | 3 - .../MatchedQueriesFetchSubPhase.java | 4 +- .../search/internal/InternalSearchHit.java | 15 +- .../elasticsearch/search/suggest/Suggest.java | 62 +++-- .../completion/CompletionSuggester.java | 2 +- .../completion/CompletionSuggestion.java | 131 +++++++-- .../SearchPhaseControllerTests.java | 234 ++++++++++++++++ .../suggest/CompletionSuggestSearchIT.java | 113 ++++++++ .../search/suggest/SuggestTests.java | 73 +++++ .../completion/CompletionSuggestionTests.java | 61 +++++ .../suggesters/completion-suggest.asciidoc | 25 +- 20 files changed, 918 insertions(+), 206 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/search/controller/SearchPhaseControllerTests.java create mode 100644 core/src/test/java/org/elasticsearch/search/suggest/SuggestTests.java create mode 100644 core/src/test/java/org/elasticsearch/search/suggest/completion/CompletionSuggestionTests.java diff --git a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 642748bd031e6..f9103f0cddc8e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -46,6 +46,7 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; +import org.elasticsearch.search.suggest.Suggest; import org.elasticsearch.threadpool.ThreadPool; import java.util.List; @@ -74,7 +75,7 @@ abstract class AbstractSearchAsyncAction protected final AtomicArray firstResults; private volatile AtomicArray shardFailures; private final Object shardFailuresMutex = new Object(); - protected volatile ScoreDoc[] sortedShardList; + protected volatile ScoreDoc[] sortedShardDocs; protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, @@ -321,8 +322,11 @@ protected void releaseIrrelevantSearchContexts(AtomicArray entry : queryResults.asList()) { - final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs(); - if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches + QuerySearchResult queryResult = entry.value.queryResult().queryResult(); + final TopDocs topDocs = queryResult.topDocs(); + final Suggest suggest = queryResult.suggest(); + if (((topDocs != null && topDocs.scoreDocs.length > 0) // the shard had matches + ||suggest != null && suggest.hasScoreDocs()) // or had suggest docs && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs try { DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); @@ -343,12 +347,8 @@ protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) { protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry entry, ScoreDoc[] lastEmittedDocPerShard) { - if (lastEmittedDocPerShard != null) { - ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); - } else { - return new ShardFetchSearchRequest(request, queryResult.id(), entry.value); - } + final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null; + return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc); } protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java index e19540e26d548..8614d7b118881 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java @@ -118,8 +118,8 @@ private void finishHim() { threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { - sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index cf3f971671074..9d8305cf6b15d 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -135,18 +135,17 @@ void executeFetchPhase() { } void innerExecuteFetchPhase() throws Exception { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + final boolean isScrollRequest = request.scroll() != null; + sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); if (docIdsToLoad.asList().isEmpty()) { finishHim(); return; } - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( - request, sortedShardList, firstResults.length() - ); + final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ? + searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null; final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); @@ -196,12 +195,10 @@ private void finishHim() { threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, + final boolean isScrollRequest = request.scroll() != null; + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults); - } + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); releaseIrrelevantSearchContexts(queryResults, docIdsToLoad); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java index 5d55dd468a5a1..fad4d60275de9 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java @@ -60,14 +60,11 @@ protected void moveToSecondPhase() throws Exception { threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + final boolean isScrollRequest = request.scroll() != null; + sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults, firstResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults); - } + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); } diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index a6f9aa26f596c..5f90d291dd22f 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -68,18 +68,17 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportReq @Override protected void moveToSecondPhase() throws Exception { - boolean useScroll = request.scroll() != null; - sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + final boolean isScrollRequest = request.scroll() != null; + sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); if (docIdsToLoad.asList().isEmpty()) { finishHim(); return; } - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard( - request, sortedShardList, firstResults.length() - ); + final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ? + searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null; final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResultProvider queryResult = firstResults.get(entry.index); @@ -129,12 +128,10 @@ private void finishHim() { threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable(listener) { @Override public void doRun() throws IOException { - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, + final boolean isScrollRequest = request.scroll() != null; + final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults, fetchResults); - String scrollId = null; - if (request.scroll() != null) { - scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults); - } + String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null; listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures())); releaseIrrelevantSearchContexts(firstResults, docIdsToLoad); diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java index 94ce1887c3464..72154f224d22e 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java @@ -168,8 +168,8 @@ private void finishHim() { } private void innerFinishHim() throws Exception { - ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, + ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults); + final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults, queryFetchResults); String scrollId = null; if (request.scroll() != null) { diff --git a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java index ac8715eeb9f61..d9f649a7a55fb 100644 --- a/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java +++ b/core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java @@ -53,7 +53,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction { private volatile AtomicArray shardFailures; final AtomicArray queryResults; final AtomicArray fetchResults; - private volatile ScoreDoc[] sortedShardList; + private volatile ScoreDoc[] sortedShardDocs; private final AtomicInteger successfulOps; SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService, @@ -165,9 +165,9 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina } private void executeFetchPhase() throws Exception { - sortedShardList = searchPhaseController.sortDocs(true, queryResults); + sortedShardDocs = searchPhaseController.sortDocs(true, queryResults); AtomicArray docIdsToLoad = new AtomicArray<>(queryResults.length()); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs); if (docIdsToLoad.asList().isEmpty()) { finishHim(); @@ -175,7 +175,8 @@ private void executeFetchPhase() throws Exception { } - final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length()); + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), + sortedShardDocs, queryResults.length()); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { IntArrayList docIds = entry.value; @@ -216,7 +217,7 @@ private void finishHim() { } private void innerFinishHim() { - InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults); + InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults); String scrollId = null; if (request.scroll() != null) { scrollId = request.scrollId(); diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java index bfcfcb9d4c8f2..4d618eb057a6a 100644 --- a/core/src/main/java/org/elasticsearch/search/SearchService.java +++ b/core/src/main/java/org/elasticsearch/search/SearchService.java @@ -21,6 +21,7 @@ import com.carrotsearch.hppc.ObjectFloatHashMap; import org.apache.lucene.search.FieldDoc; +import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -87,6 +88,8 @@ import org.elasticsearch.search.searchafter.SearchAfterBuilder; import org.elasticsearch.search.sort.SortAndFormats; import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Cancellable; import org.elasticsearch.threadpool.ThreadPool.Names; @@ -94,6 +97,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -265,7 +269,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t loadOrExecuteQueryPhase(request, context); - if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) { + if (hasHits(context.queryResult()) == false && context.scrollContext() == null) { freeContext(context.id()); } else { contextProcessedSuccessfully(context); @@ -320,7 +324,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) { operationListener.onPreQueryPhase(context); long time = System.nanoTime(); queryPhase.execute(context); - if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) { + if (hasHits(context.queryResult()) == false && context.scrollContext() == null) { // no hits, we can release the context since there will be no fetch phase freeContext(context.id()); } else { @@ -811,40 +815,55 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc } } - private static final int[] EMPTY_DOC_IDS = new int[0]; - /** * Shortcut ids to load, we load only "from" and up to "size". The phase controller * handles this as well since the result is always size * shards for Q_A_F */ private void shortcutDocIdsToLoad(SearchContext context) { + final int[] docIdsToLoad; + int docsOffset = 0; + final Suggest suggest = context.queryResult().suggest(); + int numSuggestDocs = 0; + final List completionSuggestions; + if (suggest != null && suggest.hasScoreDocs()) { + completionSuggestions = suggest.filter(CompletionSuggestion.class); + for (CompletionSuggestion completionSuggestion : completionSuggestions) { + numSuggestDocs += completionSuggestion.getOptions().size(); + } + } else { + completionSuggestions = Collections.emptyList(); + } if (context.request().scroll() != null) { TopDocs topDocs = context.queryResult().topDocs(); - int[] docIdsToLoad = new int[topDocs.scoreDocs.length]; + docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs]; for (int i = 0; i < topDocs.scoreDocs.length; i++) { - docIdsToLoad[i] = topDocs.scoreDocs[i].doc; + docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc; } - context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); } else { TopDocs topDocs = context.queryResult().topDocs(); if (topDocs.scoreDocs.length < context.from()) { // no more docs... - context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0); - return; - } - int totalSize = context.from() + context.size(); - int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())]; - int counter = 0; - for (int i = context.from(); i < totalSize; i++) { - if (i < topDocs.scoreDocs.length) { - docIdsToLoad[counter] = topDocs.scoreDocs[i].doc; - } else { - break; + docIdsToLoad = new int[numSuggestDocs]; + } else { + int totalSize = context.from() + context.size(); + docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) + + numSuggestDocs]; + for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) { + docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc; } - counter++; } - context.docIdsToLoad(docIdsToLoad, 0, counter); } + for (CompletionSuggestion completionSuggestion : completionSuggestions) { + for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) { + docIdsToLoad[docsOffset++] = option.getDoc().doc; + } + } + context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length); + } + + private static boolean hasHits(final QuerySearchResult searchResult) { + return searchResult.topDocs().scoreDocs.length > 0 || + (searchResult.suggest() != null && searchResult.suggest().hasScoreDocs()); } private void processScroll(InternalScrollSearchRequest request, SearchContext context) { diff --git a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java index b2ce044e4fc4d..97f3b191aa982 100644 --- a/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java +++ b/core/src/main/java/org/elasticsearch/search/controller/SearchPhaseController.java @@ -30,7 +30,6 @@ import org.apache.lucene.search.TermStatistics; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldDocs; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.HppcMaps; import org.elasticsearch.common.component.AbstractComponent; @@ -53,18 +52,22 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.profile.ProfileShardResult; import org.elasticsearch.search.profile.SearchProfileShardResults; -import org.elasticsearch.search.profile.query.QueryProfileShardResult; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.search.suggest.Suggest; +import org.elasticsearch.search.suggest.Suggest.Suggestion; +import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry; +import org.elasticsearch.search.suggest.completion.CompletionSuggestion; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -154,6 +157,10 @@ private static long optionalSum(long left, long right) { } /** + * Returns a score doc array of top N search docs across all shards, followed by top suggest docs for each + * named completion suggestion across all shards. If more than one named completion suggestion is specified in the + * request, the suggest docs for a named suggestion are ordered by the suggestion name. + * * @param ignoreFrom Whether to ignore the from and sort all hits in each shard result. * Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase. * @param resultsArr Shard result holder @@ -191,19 +198,40 @@ public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray completionSuggestions; + if (suggest != null) { + completionSuggestions = suggest.filter(CompletionSuggestion.class); + for (CompletionSuggestion suggestion : completionSuggestions) { + numSuggestDocs += suggestion.getOptions().size(); + } + } else { + completionSuggestions = Collections.emptyList(); } - - int resultDocsSize = result.size(); - if ((scoreDocs.length - offset) < resultDocsSize) { - resultDocsSize = scoreDocs.length - offset; + int docsOffset = 0; + if (scoreDocs.length == 0 || scoreDocs.length < offset) { + docs = new ScoreDoc[numSuggestDocs]; + } else { + int resultDocsSize = result.size(); + if ((scoreDocs.length - offset) < resultDocsSize) { + resultDocsSize = scoreDocs.length - offset; + } + docs = new ScoreDoc[resultDocsSize + numSuggestDocs]; + for (int i = 0; i < resultDocsSize; i++) { + ScoreDoc scoreDoc = scoreDocs[offset + i]; + scoreDoc.shardIndex = shardIndex; + docs[i] = scoreDoc; + docsOffset++; + } } - ScoreDoc[] docs = new ScoreDoc[resultDocsSize]; - for (int i = 0; i < resultDocsSize; i++) { - ScoreDoc scoreDoc = scoreDocs[offset + i]; - scoreDoc.shardIndex = shardIndex; - docs[i] = scoreDoc; + for (CompletionSuggestion suggestion: completionSuggestions) { + for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { + ScoreDoc doc = option.getDoc(); + doc.shardIndex = shardIndex; + docs[docsOffset++] = doc; + } } return docs; } @@ -213,13 +241,7 @@ public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray>> groupedCompletionSuggestions = new HashMap<>(); + // group suggestions and assign shard index + for (AtomicArray.Entry sortedResult : sortedResults) { + Suggest shardSuggest = sortedResult.value.queryResult().suggest(); + if (shardSuggest != null) { + for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) { + suggestion.setShardIndex(sortedResult.index); + List> suggestions = + groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); + suggestions.add(suggestion); + } + } + } + if (groupedCompletionSuggestions.isEmpty() == false) { + int numSuggestDocs = 0; + List>> completionSuggestions = + new ArrayList<>(groupedCompletionSuggestions.size()); + for (List> groupedSuggestions : groupedCompletionSuggestions.values()) { + final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions); + assert completionSuggestion != null; + numSuggestDocs += completionSuggestion.getOptions().size(); + completionSuggestions.add(completionSuggestion); + } + scoreDocs = new ScoreDoc[mergedTopDocs.scoreDocs.length + numSuggestDocs]; + System.arraycopy(mergedTopDocs.scoreDocs, 0, scoreDocs, 0, mergedTopDocs.scoreDocs.length); + int offset = mergedTopDocs.scoreDocs.length; + Suggest suggestions = new Suggest(completionSuggestions); + for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) { + for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) { + scoreDocs[offset++] = option.getDoc(); + } + } } + return scoreDocs; } - public ScoreDoc[] getLastEmittedDocPerShard(ScoreDoc[] sortedShardList, int numShards) { + public ScoreDoc[] getLastEmittedDocPerShard(List> queryResults, + ScoreDoc[] sortedScoreDocs, int numShards) { ScoreDoc[] lastEmittedDocPerShard = new ScoreDoc[numShards]; - for (ScoreDoc scoreDoc : sortedShardList) { - lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc; + if (queryResults.isEmpty() == false) { + long fetchHits = 0; + for (AtomicArray.Entry queryResult : queryResults) { + fetchHits += queryResult.value.queryResult().topDocs().scoreDocs.length; + } + // from is always zero as when we use scroll, we ignore from + long size = Math.min(fetchHits, topN(queryResults)); + for (int sortedDocsIndex = 0; sortedDocsIndex < size; sortedDocsIndex++) { + ScoreDoc scoreDoc = sortedScoreDocs[sortedDocsIndex]; + lastEmittedDocPerShard[scoreDoc.shardIndex] = scoreDoc; + } } return lastEmittedDocPerShard; + } /** * Builds an array, with potential null elements, with docs to load. */ - public void fillDocIdsToLoad(AtomicArray docsIdsToLoad, ScoreDoc[] shardDocs) { + public void fillDocIdsToLoad(AtomicArray docIdsToLoad, ScoreDoc[] shardDocs) { for (ScoreDoc shardDoc : shardDocs) { - IntArrayList list = docsIdsToLoad.get(shardDoc.shardIndex); - if (list == null) { - list = new IntArrayList(); // can't be shared!, uses unsafe on it later on - docsIdsToLoad.set(shardDoc.shardIndex, list); + IntArrayList shardDocIdsToLoad = docIdsToLoad.get(shardDoc.shardIndex); + if (shardDocIdsToLoad == null) { + shardDocIdsToLoad = new IntArrayList(); // can't be shared!, uses unsafe on it later on + docIdsToLoad.set(shardDoc.shardIndex, shardDocIdsToLoad); } - list.add(shardDoc.doc); + shardDocIdsToLoad.add(shardDoc.doc); } } - public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray queryResultsArr, + /** + * Enriches search hits and completion suggestion hits from sortedDocs using fetchResultsArr, + * merges suggestions, aggregations and profile results + * + * Expects sortedDocs to have top search docs across all shards, optionally followed by top suggest docs for each named + * completion suggestion ordered by suggestion name + */ + public InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs, + AtomicArray queryResultsArr, AtomicArray fetchResultsArr) { List> queryResults = queryResultsArr.asList(); @@ -317,6 +385,7 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray entry : fetchResults) { entry.value.fetchResult().initCounter(); } - + int from = ignoreFrom ? 0 : firstResult.queryResult().from(); + int numSearchHits = (int) Math.min(fetchHits - from, topN(queryResults)); // merge hits List hits = new ArrayList<>(); if (!fetchResults.isEmpty()) { - for (ScoreDoc shardDoc : sortedDocs) { + for (int i = 0; i < numSearchHits; i++) { + ScoreDoc shardDoc = sortedDocs[i]; FetchSearchResultProvider fetchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); if (fetchResultProvider == null) { continue; @@ -360,7 +432,6 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray> groupedSuggestions = new HashMap<>(); - boolean hasSuggestions = false; - for (AtomicArray.Entry entry : queryResults) { - Suggest shardResult = entry.value.queryResult().queryResult().suggest(); - - if (shardResult == null) { - continue; + if (firstResult.suggest() != null) { + final Map> groupedSuggestions = new HashMap<>(); + for (AtomicArray.Entry queryResult : queryResults) { + Suggest shardSuggest = queryResult.value.queryResult().suggest(); + if (shardSuggest != null) { + for (Suggestion> suggestion : shardSuggest) { + List suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>()); + suggestionList.add(suggestion); + } + } + } + if (groupedSuggestions.isEmpty() == false) { + suggest = new Suggest(Suggest.reduce(groupedSuggestions)); + if (!fetchResults.isEmpty()) { + int currentOffset = numSearchHits; + for (CompletionSuggestion suggestion : suggest.filter(CompletionSuggestion.class)) { + final List suggestionOptions = suggestion.getOptions(); + for (int scoreDocIndex = currentOffset; scoreDocIndex < currentOffset + suggestionOptions.size(); scoreDocIndex++) { + ScoreDoc shardDoc = sortedDocs[scoreDocIndex]; + FetchSearchResultProvider fetchSearchResultProvider = fetchResultsArr.get(shardDoc.shardIndex); + if (fetchSearchResultProvider == null) { + continue; + } + FetchSearchResult fetchResult = fetchSearchResultProvider.fetchResult(); + int fetchResultIndex = fetchResult.counterGetAndIncrement(); + if (fetchResultIndex < fetchResult.hits().internalHits().length) { + InternalSearchHit hit = fetchResult.hits().internalHits()[fetchResultIndex]; + CompletionSuggestion.Entry.Option suggestOption = + suggestionOptions.get(scoreDocIndex - currentOffset); + hit.score(shardDoc.score); + hit.shard(fetchResult.shardTarget()); + suggestOption.setHit(hit); + } + } + currentOffset += suggestionOptions.size(); + } + assert currentOffset == sortedDocs.length : "expected no more score doc slices"; } - hasSuggestions = true; - Suggest.group(groupedSuggestions, shardResult); } - - suggest = hasSuggestions ? new Suggest(Suggest.reduce(groupedSuggestions)) : null; } - // merge addAggregation + // merge Aggregation InternalAggregations aggregations = null; - if (!queryResults.isEmpty()) { - if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) { - List aggregationsList = new ArrayList<>(queryResults.size()); - for (AtomicArray.Entry entry : queryResults) { - aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) { + List aggregationsList = new ArrayList<>(queryResults.size()); + for (AtomicArray.Entry entry : queryResults) { + aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations()); + } + ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state()); + aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); + List pipelineAggregators = firstResult.pipelineAggregators(); + if (pipelineAggregators != null) { + List newAggs = StreamSupport.stream(aggregations.spliterator(), false) + .map((p) -> (InternalAggregation) p) + .collect(Collectors.toList()); + for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { + InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); + newAggs.add(newAgg); } - ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state()); - aggregations = InternalAggregations.reduce(aggregationsList, reduceContext); + aggregations = new InternalAggregations(newAggs); } } //Collect profile results SearchProfileShardResults shardResults = null; - if (!queryResults.isEmpty() && firstResult.profileResults() != null) { + if (firstResult.profileResults() != null) { Map profileResults = new HashMap<>(queryResults.size()); for (AtomicArray.Entry entry : queryResults) { String key = entry.value.queryResult().shardTarget().toString(); @@ -416,24 +520,22 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray pipelineAggregators = firstResult.pipelineAggregators(); - if (pipelineAggregators != null) { - List newAggs = StreamSupport.stream(aggregations.spliterator(), false).map((p) -> { - return (InternalAggregation) p; - }).collect(Collectors.toList()); - for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) { - ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, clusterService.state()); - InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), reduceContext); - newAggs.add(newAgg); - } - aggregations = new InternalAggregations(newAggs); - } - } - InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore); return new InternalSearchResponse(searchHits, aggregations, suggest, shardResults, timedOut, terminatedEarly); } + /** + * returns the number of top results to be considered across all shards + */ + private static int topN(List> queryResults) { + QuerySearchResultProvider firstResult = queryResults.get(0).value; + int topN = firstResult.queryResult().size(); + if (firstResult.includeFetch()) { + // if we did both query and fetch on the same go, we have fetched all the docs from each shards already, use them... + // this is also important since we shortcut and fetch only docs from "from" and up to "size" + topN *= queryResults.size(); + } + return topN; + } } diff --git a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java index d908aca0fc8a9..f6738f9972511 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/ShardFetchSearchRequest.java @@ -39,10 +39,7 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice private OriginalIndices originalIndices; public ShardFetchSearchRequest() { - } - public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list) { - this(request, id, list, null); } public ShardFetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) { diff --git a/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java b/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java index 59225f93a61ea..17f5e5ac705bb 100644 --- a/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java +++ b/core/src/main/java/org/elasticsearch/search/fetch/matchedqueries/MatchedQueriesFetchSubPhase.java @@ -43,7 +43,9 @@ public final class MatchedQueriesFetchSubPhase implements FetchSubPhase { @Override public void hitsExecute(SearchContext context, InternalSearchHit[] hits) { - if (hits.length == 0) { + if (hits.length == 0 || + // in case the request has only suggest, parsed query is null + context.parsedQuery() == null) { return; } hits = hits.clone(); // don't modify the incoming hits diff --git a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java index 191537b4de59a..e1d46dd5fd287 100644 --- a/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java +++ b/core/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java @@ -415,8 +415,8 @@ public static class Fields { static final String INNER_HITS = "inner_hits"; } - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + // public because we render hit as part of completion suggestion option + public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) throws IOException { List metaFields = new ArrayList<>(); List otherFields = new ArrayList<>(); if (fields != null && !fields.isEmpty()) { @@ -432,7 +432,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } - builder.startObject(); // For inner_hit hits shard is null and that is ok, because the parent search hit has all this information. // Even if this was included in the inner_hit hits this would be the same, so better leave it out. if (explanation() != null && shard != null) { @@ -516,7 +515,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.endObject(); } - builder.endObject(); return builder; } @@ -533,6 +531,15 @@ private void buildExplanation(XContentBuilder builder, Explanation explanation) builder.endArray(); } builder.endObject(); + + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + toInnerXContent(builder, params); + builder.endObject(); + return builder; } public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java index f8fbdaf969e83..95612693f8b4c 100644 --- a/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java +++ b/core/src/main/java/org/elasticsearch/search/suggest/Suggest.java @@ -40,6 +40,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * Top level suggest result, containing the result for each suggestion. @@ -48,18 +49,16 @@ public class Suggest implements Iterable COMPARATOR = new Comparator() { - @Override - public int compare(Option first, Option second) { - int cmp = Float.compare(second.getScore(), first.getScore()); - if (cmp != 0) { - return cmp; - } - return first.getText().compareTo(second.getText()); - } - }; + public static final Comparator