Merge pull request #19536 from areek/enhancement/completion_suggester_documents

Add support for returning documents with completion suggester
areek authored Aug 5, 2016
2 parents fbbb633 + fee013c commit 469eb25
Showing 20 changed files with 918 additions and 206 deletions.
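
In substance: completion suggester results can now carry the documents they point at, which means suggest hits have to travel through the distributed fetch phase exactly like query hits. A client-side sketch of what this enables (the index and field names are invented, and `option.getHit()` is the accessor this change appears to introduce; treat the exact method names as assumptions, not as the documented API):

```java
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;

class CompletionWithDocsSketch {
    // Sketch only: "products" / "title_suggest" are made-up names.
    static void printSuggestedDocs(Client client) {
        SearchResponse response = client.prepareSearch("products")
                .suggest(new SuggestBuilder().addSuggestion("title-completion",
                        SuggestBuilders.completionSuggestion("title_suggest").prefix("elast")))
                .get();
        CompletionSuggestion suggestion = response.getSuggest().getSuggestion("title-completion");
        for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
            // Previously an option carried only text and payload; after this commit
            // the fetch phase attaches the backing document, so no second
            // get-by-id round trip is needed.
            SearchHit hit = option.getHit(); // assumed accessor added by this change
            System.out.println(option.getText() + " -> " + hit.getSourceAsString());
        }
    }
}
```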
core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

@@ -46,6 +46,7 @@
 import org.elasticsearch.search.internal.ShardSearchTransportRequest;
 import org.elasticsearch.search.query.QuerySearchResult;
 import org.elasticsearch.search.query.QuerySearchResultProvider;
+import org.elasticsearch.search.suggest.Suggest;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.List;
@@ -74,7 +75,7 @@ abstract class AbstractSearchAsyncAction<FirstResult extends SearchPhaseResult>
     protected final AtomicArray<FirstResult> firstResults;
     private volatile AtomicArray<ShardSearchFailure> shardFailures;
     private final Object shardFailuresMutex = new Object();
-    protected volatile ScoreDoc[] sortedShardList;
+    protected volatile ScoreDoc[] sortedShardDocs;
 
     protected AbstractSearchAsyncAction(ESLogger logger, SearchTransportService searchTransportService, ClusterService clusterService,
                                         IndexNameExpressionResolver indexNameExpressionResolver,
@@ -321,8 +322,11 @@ protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearch
         // we only release search context that we did not fetch from if we are not scrolling
         if (request.scroll() == null) {
             for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : queryResults.asList()) {
-                final TopDocs topDocs = entry.value.queryResult().queryResult().topDocs();
-                if (topDocs != null && topDocs.scoreDocs.length > 0 // the shard had matches
+                QuerySearchResult queryResult = entry.value.queryResult().queryResult();
+                final TopDocs topDocs = queryResult.topDocs();
+                final Suggest suggest = queryResult.suggest();
+                if (((topDocs != null && topDocs.scoreDocs.length > 0) // the shard had matches
+                        || suggest != null && suggest.hasScoreDocs()) // or had suggest docs
                         && docIdsToLoad.get(entry.index) == null) { // but none of them made it to the global top docs
                     try {
                         DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
@@ -343,12 +347,8 @@ protected void sendReleaseSearchContext(long contextId, DiscoveryNode node) {
 
     protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry,
                                                          ScoreDoc[] lastEmittedDocPerShard) {
-        if (lastEmittedDocPerShard != null) {
-            ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
-            return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
-        } else {
-            return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
-        }
+        final ScoreDoc lastEmittedDoc = (lastEmittedDocPerShard != null) ? lastEmittedDocPerShard[entry.index] : null;
+        return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
     }
 
     protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
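The releaseIrrelevantSearchContexts hunk is the resource-management core of this change: a shard whose only contribution is suggest documents must keep its search context alive until fetch, and conversely may be freed if none of its docs (hits or suggestions) were selected globally. A condensed restatement of that rule, with a helper class and method name of my own invention:

```java
import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.search.suggest.Suggest;

// Condensed restatement of the release decision above; the helper name and
// shape are illustrative, not Elasticsearch's.
final class ContextReleasePolicy {
    static boolean shouldRelease(TopDocs topDocs, Suggest suggest, IntArrayList docIdsToFetch) {
        boolean hadMatches = (topDocs != null && topDocs.scoreDocs.length > 0) // shard had hits
                || (suggest != null && suggest.hasScoreDocs());                // or suggest docs (new)
        // Matched locally but contributed nothing to the global fetch: safe to free.
        return hadMatches && docIdsToFetch == null;
    }
}
```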
core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryAndFetchAsyncAction.java

@@ -118,8 +118,8 @@ private void finishHim() {
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
+                sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
                         queryFetchResults);
                 String scrollId = null;
                 if (request.scroll() != null) {
core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

@@ -135,18 +135,17 @@ void executeFetchPhase() {
     }
 
     void innerExecuteFetchPhase() throws Exception {
-        boolean useScroll = request.scroll() != null;
-        sortedShardList = searchPhaseController.sortDocs(useScroll, queryResults);
-        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+        final boolean isScrollRequest = request.scroll() != null;
+        sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, queryResults);
+        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
 
         if (docIdsToLoad.asList().isEmpty()) {
             finishHim();
             return;
         }
 
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
-                request, sortedShardList, firstResults.length()
-        );
+        final ScoreDoc[] lastEmittedDocPerShard = (request.scroll() != null) ?
+                searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length()) : null;
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
         for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
             QuerySearchResult queryResult = queryResults.get(entry.index);
@@ -196,12 +195,10 @@ private void finishHim() {
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
+                final boolean isScrollRequest = request.scroll() != null;
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, queryResults,
                         fetchResults);
-                String scrollId = null;
-                if (request.scroll() != null) {
-                    scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
-                }
+                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
                 listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
                         buildTookInMillis(), buildShardFailures()));
                 releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
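Besides the sortedShardList to sortedShardDocs rename, the *ThenFetch actions now compute getLastEmittedDocPerShard only for scroll requests: the "last emitted doc" bookkeeping exists solely so a later scroll page can resume, and createFetchRequest (see AbstractSearchAsyncAction above) now tolerates a null array. A condensed sketch of the shared pattern, assuming the surrounding fields from this diff:

```java
// Pattern now shared by the *ThenFetch async actions (sketch, not verbatim):
final boolean isScrollRequest = request.scroll() != null;
final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest
        ? searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(), sortedShardDocs, firstResults.length())
        : null; // non-scroll searches never resume, so there is nothing to remember
```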
core/src/main/java/org/elasticsearch/action/search/SearchQueryAndFetchAsyncAction.java

@@ -60,14 +60,11 @@ protected void moveToSecondPhase() throws Exception {
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                boolean useScroll = request.scroll() != null;
-                sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
+                final boolean isScrollRequest = request.scroll() != null;
+                sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
                         firstResults);
-                String scrollId = null;
-                if (request.scroll() != null) {
-                    scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
-                }
+                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
                 listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
                         buildTookInMillis(), buildShardFailures()));
             }
core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

@@ -68,18 +68,17 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportReq
 
     @Override
     protected void moveToSecondPhase() throws Exception {
-        boolean useScroll = request.scroll() != null;
-        sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
-        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+        final boolean isScrollRequest = request.scroll() != null;
+        sortedShardDocs = searchPhaseController.sortDocs(isScrollRequest, firstResults);
+        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
 
         if (docIdsToLoad.asList().isEmpty()) {
             finishHim();
             return;
         }
 
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(
-                request, sortedShardList, firstResults.length()
-        );
+        final ScoreDoc[] lastEmittedDocPerShard = isScrollRequest ?
+                searchPhaseController.getLastEmittedDocPerShard(firstResults.asList(), sortedShardDocs, firstResults.length()) : null;
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
         for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
             QuerySearchResultProvider queryResult = firstResults.get(entry.index);
@@ -129,12 +128,10 @@ private void finishHim() {
         threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
             @Override
             public void doRun() throws IOException {
-                final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
+                final boolean isScrollRequest = request.scroll() != null;
+                final InternalSearchResponse internalResponse = searchPhaseController.merge(isScrollRequest, sortedShardDocs, firstResults,
                         fetchResults);
-                String scrollId = null;
-                if (request.scroll() != null) {
-                    scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults);
-                }
+                String scrollId = isScrollRequest ? TransportSearchHelper.buildScrollId(request.searchType(), firstResults) : null;
                 listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,
                         successfulOps.get(), buildTookInMillis(), buildShardFailures()));
                 releaseIrrelevantSearchContexts(firstResults, docIdsToLoad);
core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryAndFetchAsyncAction.java

@@ -168,8 +168,8 @@ private void finishHim() {
     }
 
     private void innerFinishHim() throws Exception {
-        ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
-        final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
+        ScoreDoc[] sortedShardDocs = searchPhaseController.sortDocs(true, queryFetchResults);
+        final InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryFetchResults,
                 queryFetchResults);
         String scrollId = null;
         if (request.scroll() != null) {
core/src/main/java/org/elasticsearch/action/search/SearchScrollQueryThenFetchAsyncAction.java

@@ -53,7 +53,7 @@ class SearchScrollQueryThenFetchAsyncAction extends AbstractAsyncAction {
     private volatile AtomicArray<ShardSearchFailure> shardFailures;
     final AtomicArray<QuerySearchResult> queryResults;
     final AtomicArray<FetchSearchResult> fetchResults;
-    private volatile ScoreDoc[] sortedShardList;
+    private volatile ScoreDoc[] sortedShardDocs;
     private final AtomicInteger successfulOps;
 
     SearchScrollQueryThenFetchAsyncAction(ESLogger logger, ClusterService clusterService,
@@ -165,17 +165,18 @@ void onQueryPhaseFailure(final int shardIndex, final AtomicInteger counter, fina
     }
 
     private void executeFetchPhase() throws Exception {
-        sortedShardList = searchPhaseController.sortDocs(true, queryResults);
+        sortedShardDocs = searchPhaseController.sortDocs(true, queryResults);
         AtomicArray<IntArrayList> docIdsToLoad = new AtomicArray<>(queryResults.length());
-        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList);
+        searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardDocs);
 
         if (docIdsToLoad.asList().isEmpty()) {
             finishHim();
             return;
         }
 
 
-        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, queryResults.length());
+        final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(queryResults.asList(),
+                sortedShardDocs, queryResults.length());
         final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size());
         for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
             IntArrayList docIds = entry.value;
@@ -216,7 +217,7 @@ private void finishHim() {
     }
 
     private void innerFinishHim() {
-        InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+        InternalSearchResponse internalResponse = searchPhaseController.merge(true, sortedShardDocs, queryResults, fetchResults);
         String scrollId = null;
         if (request.scroll() != null) {
             scrollId = request.scrollId();
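Across all these call sites, SearchPhaseController.merge(...) grew a leading boolean (every caller passes true or the request's scroll-ness, so it plausibly means "ignore from"), and getLastEmittedDocPerShard now receives the per-shard query results, presumably so suggest score docs can be accounted for per shard. The controller file itself is not part of this excerpt; the interface below is inferred purely from the call sites shown here, parameter names included:

```java
import java.util.List;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResultProvider;

// Inferred from the call sites in this diff only; names are guesses.
interface InferredPhaseControllerSurface {
    InternalSearchResponse merge(boolean ignoreFrom, ScoreDoc[] sortedDocs,
                                 AtomicArray<? extends QuerySearchResultProvider> queryResults,
                                 AtomicArray<? extends FetchSearchResultProvider> fetchResults);

    ScoreDoc[] getLastEmittedDocPerShard(List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults,
                                         ScoreDoc[] sortedShardDocs, int numShards);
}
```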
59 changes: 39 additions & 20 deletions core/src/main/java/org/elasticsearch/search/SearchService.java
@@ -21,6 +21,7 @@
 
 import com.carrotsearch.hppc.ObjectFloatHashMap;
 import org.apache.lucene.search.FieldDoc;
+import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.TopDocs;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
@@ -87,13 +88,16 @@
 import org.elasticsearch.search.searchafter.SearchAfterBuilder;
 import org.elasticsearch.search.sort.SortAndFormats;
 import org.elasticsearch.search.sort.SortBuilder;
+import org.elasticsearch.search.suggest.Suggest;
+import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Cancellable;
 import org.elasticsearch.threadpool.ThreadPool.Names;
 
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -265,7 +269,7 @@ public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) t
 
         loadOrExecuteQueryPhase(request, context);
 
-        if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
+        if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
             freeContext(context.id());
         } else {
             contextProcessedSuccessfully(context);
@@ -320,7 +324,7 @@ public QuerySearchResult executeQueryPhase(QuerySearchRequest request) {
             operationListener.onPreQueryPhase(context);
             long time = System.nanoTime();
             queryPhase.execute(context);
-            if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
+            if (hasHits(context.queryResult()) == false && context.scrollContext() == null) {
                 // no hits, we can release the context since there will be no fetch phase
                 freeContext(context.id());
             } else {
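Both executeQueryPhase entry points used to free the search context whenever topDocs came back empty; with suggest documents in play, that would destroy a context the fetch phase still needs in order to load suggested documents. The new hasHits(...) predicate (defined in the final hunk of this file) counts suggest score docs as hits:

```java
// hasHits as added by this commit (see the final hunk below), shown standalone
// with the freeing rule spelled out; the scaffolding is illustrative.
static boolean hasHits(QuerySearchResult result) {
    return result.topDocs().scoreDocs.length > 0
            || (result.suggest() != null && result.suggest().hasScoreDocs());
}
// Rule: no hits of either kind and no scroll context means no fetch phase will
// ever run against this context, so it can be freed immediately.
```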
@@ -811,40 +815,55 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
         }
     }
 
-    private static final int[] EMPTY_DOC_IDS = new int[0];
-
     /**
      * Shortcut ids to load, we load only "from" and up to "size". The phase controller
      * handles this as well since the result is always size * shards for Q_A_F
      */
     private void shortcutDocIdsToLoad(SearchContext context) {
+        final int[] docIdsToLoad;
+        int docsOffset = 0;
+        final Suggest suggest = context.queryResult().suggest();
+        int numSuggestDocs = 0;
+        final List<CompletionSuggestion> completionSuggestions;
+        if (suggest != null && suggest.hasScoreDocs()) {
+            completionSuggestions = suggest.filter(CompletionSuggestion.class);
+            for (CompletionSuggestion completionSuggestion : completionSuggestions) {
+                numSuggestDocs += completionSuggestion.getOptions().size();
+            }
+        } else {
+            completionSuggestions = Collections.emptyList();
+        }
         if (context.request().scroll() != null) {
             TopDocs topDocs = context.queryResult().topDocs();
-            int[] docIdsToLoad = new int[topDocs.scoreDocs.length];
+            docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs];
             for (int i = 0; i < topDocs.scoreDocs.length; i++) {
-                docIdsToLoad[i] = topDocs.scoreDocs[i].doc;
+                docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
             }
-            context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
         } else {
             TopDocs topDocs = context.queryResult().topDocs();
             if (topDocs.scoreDocs.length < context.from()) {
                 // no more docs...
-                context.docIdsToLoad(EMPTY_DOC_IDS, 0, 0);
-                return;
-            }
-            int totalSize = context.from() + context.size();
-            int[] docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size())];
-            int counter = 0;
-            for (int i = context.from(); i < totalSize; i++) {
-                if (i < topDocs.scoreDocs.length) {
-                    docIdsToLoad[counter] = topDocs.scoreDocs[i].doc;
-                } else {
-                    break;
+                docIdsToLoad = new int[numSuggestDocs];
+            } else {
+                int totalSize = context.from() + context.size();
+                docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) +
+                        numSuggestDocs];
+                for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) {
+                    docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
                 }
-                counter++;
             }
-            context.docIdsToLoad(docIdsToLoad, 0, counter);
         }
+        for (CompletionSuggestion completionSuggestion : completionSuggestions) {
+            for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
+                docIdsToLoad[docsOffset++] = option.getDoc().doc;
+            }
+        }
+        context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
+    }
+
+    private static boolean hasHits(final QuerySearchResult searchResult) {
+        return searchResult.topDocs().scoreDocs.length > 0 ||
+                (searchResult.suggest() != null && searchResult.suggest().hasScoreDocs());
+    }
 
     private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
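The rewritten shortcutDocIdsToLoad produces a single flat array: top docs first (windowed by from/size for non-scroll requests), completion-suggest docs appended after, and the whole thing handed to the fetch phase in one call. A toy walk-through with invented doc ids:

```java
// Toy illustration of the combined layout built above (numbers invented).
int[] topDocIds = {10, 42};        // query hits, from TopDocs.scoreDocs
int[] suggestDocIds = {7, 99};     // from CompletionSuggestion.Entry.Option.getDoc()
int[] docIdsToLoad = new int[topDocIds.length + suggestDocIds.length];
int docsOffset = 0;
for (int doc : topDocIds) {
    docIdsToLoad[docsOffset++] = doc;   // hits occupy the front of the array
}
for (int doc : suggestDocIds) {
    docIdsToLoad[docsOffset++] = doc;   // suggest docs ride along at the back
}
// docIdsToLoad == [10, 42, 7, 99]: one fetch loads all four, and the merge
// step later routes the trailing two back to their suggestion options.
```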