Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for returning documents with completion suggester #19536

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.List;
Expand Down Expand Up @@ -74,7 +75,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
protected final AtomicArray<FirstResult> firstResults;
private volatile AtomicArray<ShardSearchFailure> shardFailures;
private final Object shardFailuresMutex = new Object();
protected volatile ScoreDoc[] sortedShardList;
protected volatile ScoreDoc[] sortedShardDocs;

protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
Expand Down Expand Up @@ -321,8 +322,11 @@ protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearch
// we only release search context that we did not fetch from if we are not scrolling
if (request.scroll() == null) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs();
if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches
QuerySearchResult queryResult = entry.value.queryResult().queryResult();
final TopDocs topDocs = queryResult.topDocs();
final Suggest suggest = queryResult.suggest();
if (((topDocs != null && topDocs.scoreDocs.length > 0) // the shard had matches
||suggest != null && suggest.hasScoreDocs()) // or had suggest docs
&& docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
try {
DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
Expand All @@ -343,12 +347,8 @@ protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {

protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry,
ScoreDoc[] lastEmittedDocPerShard) {
if (lastEmittedDocPerShard != null) {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
} else {
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
}
final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null;
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
}

protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,17 @@ void executeFetchPhase() {
}

void innerExecuteFetchPhase() throws Exception {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
final boolean isScrollRequest = request.scroll() != null;
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);

if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}

final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
request, sortedShardList, firstResults.length()
);
final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ?
searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null;
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
Expand Down Expand Up @@ -196,12 +195,10 @@ private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
final boolean isScrollRequest = request.scroll() != null;
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
}
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,11 @@ protected void moveToSecondPhase() throws Exception {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
final boolean isScrollRequest = request.scroll() != null;
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
}
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,17 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportReq

@Override
protected void moveToSecondPhase() throws Exception {
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
final boolean isScrollRequest = request.scroll() != null;
sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);

if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}

final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
request, sortedShardList, firstResults.length()
);
final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null;
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
Expand Down Expand Up @@ -129,12 +128,10 @@ private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
final boolean isScrollRequest = request.scroll() != null;
final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
}
String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
successfulOps.get(), buildTookInMillis(), buildShardFailures()));
releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ private void finishHim() {
}

private void innerFinishHim() throws Exception {
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
private volatile AtomicArray<ShardSearchFailure> shardFailures;
final AtomicArray<QuerySearchResult> queryResults;
final AtomicArray<FetchSearchResult> fetchResults;
private volatile ScoreDoc[] sortedShardList;
private volatile ScoreDoc[] sortedShardDocs;
private final AtomicInteger successfulOps;

SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
Expand Down Expand Up @@ -165,17 +165,18 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina
}

private void executeFetchPhase() throws Exception {
sortedShardList = searchPhaseController.sortDocs(true, queryResults);
sortedShardDocs = searchPhaseController.sortDocs(true, queryResults);
AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);

if (docIdsToLoad.asList().isEmpty()) {
finishHim();
return;
}


final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(),
sortedShardDocs, queryResults.length());
final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
IntArrayList docIds = entry.value;
Expand Down Expand Up @@ -216,7 +217,7 @@ private void finishHim() {
}

private void innerFinishHim() {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
Expand Down
59 changes: 39 additions & 20 deletions core/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.carrotsearch.hppc.ObjectFloatHashMap;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
Expand Down Expand Up @@ -87,13 +88,16 @@
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -265,7 +269,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t

loadOrExecuteQueryPhase(request, context);

if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
freeContext(context.id());
} else {
contextProcessedSuccessfully(context);
Expand Down Expand Up @@ -320,7 +324,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
// no hits, we can release the context since there will be no fetch phase
freeContext(context.id());
} else {
Expand Down Expand Up @@ -811,40 +815,55 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
}
}

private static final int[] EMPTY_DOC_IDS = new int[0];

/**
* Shortcut ids to load, we load only "from" and up to "size". The phase controller
* handles this as well since the result is always size * shards for Q_A_F
*/
private void shortcutDocIdsToLoad(SearchContext context) {
final int[] docIdsToLoad;
int docsOffset = 0;
final Suggest suggest = context.queryResult().suggest();
int numSuggestDocs = 0;
final List<CompletionSuggestion> completionSuggestions;
if (suggest != null && suggest.hasScoreDocs()) {
completionSuggestions = suggest.filter(CompletionSuggestion.class);
for (CompletionSuggestion completionSuggestion : completionSuggestions) {
numSuggestDocs += completionSuggestion.getOptions().size();
}
} else {
completionSuggestions = Collections.emptyList();
}
if (context.request().scroll() != null) {
TopDocs topDocs = context.queryResult().topDocs();
int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs];
for (int i = 0; i < topDocs.scoreDocs.length; i++) {
docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
}
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
} else {
TopDocs topDocs = context.queryResult().topDocs();
if (topDocs.scoreDocs.length < context.from()) {
// no more docs...
context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
return;
}
int totalSize = context.from() + context.size();
int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())];
int counter = 0;
for (int i = context.from(); i < totalSize; i++) {
if (i < topDocs.scoreDocs.length) {
docIdsToLoad[counter] = topDocs.scoreDocs[i].doc;
} else {
break;
docIdsToLoad = new int[numSuggestDocs];
} else {
int totalSize = context.from() + context.size();
docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) +
numSuggestDocs];
for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) {
docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
}
counter++;
}
context.docIdsToLoad(docIdsToLoad, 0, counter);
}
for (CompletionSuggestion completionSuggestion : completionSuggestions) {
for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
docIdsToLoad[docsOffset++] = option.getDoc().doc;
}
}
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
}

private static boolean hasHits(final QuerySearchResult searchResult) {
return searchResult.topDocs().scoreDocs.length > 0 ||
(searchResult.suggest() != null && searchResult.suggest().hasScoreDocs());
}

private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
Expand Down
Loading