Minor search controller changes #36479

Merged
@@ -72,7 +72,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final TransportSearchAction.SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

- protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
+ AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
@@ -44,7 +44,7 @@
* and collect the results. If a shard request returns a failure this class handles the advance to the next replica of the shard until
the shard's replica iterator is exhausted. Each shard is referenced by position in the {@link GroupShardsIterator} which is later
* referred to as the {@code shardIndex}.
- * The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection
+ * The fan out and collect algorithm is traditionally used as the initial phase which can either be a query execution or collection of
* distributed frequencies
*/
abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends SearchPhase {
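(Aside for the reader, not part of the diff: the fan-out-and-collect pattern described in the javadoc above boils down to sending one request per shard and recording each answer under its position, the shardIndex. A minimal, hypothetical sketch of that idea follows; the class and method names are illustrative only and are not Elasticsearch APIs.)

// Illustrative fan-out/collect sketch (hypothetical names, not Elasticsearch code).
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

class FanOutSketch<R> {
    void run(List<CompletableFuture<R>> shardRequests, Runnable onAllDone) {
        // one slot per shard; the slot position is the "shardIndex" the javadoc refers to
        AtomicReferenceArray<R> results = new AtomicReferenceArray<>(shardRequests.size());
        AtomicInteger pending = new AtomicInteger(shardRequests.size());
        for (int shardIndex = 0; shardIndex < shardRequests.size(); shardIndex++) {
            final int idx = shardIndex;
            shardRequests.get(idx).whenComplete((result, failure) -> {
                if (failure == null) {
                    results.set(idx, result);        // collect under the shard's position
                }                                    // on failure the real code advances to the next replica
                if (pending.decrementAndGet() == 0) {
                    onAllDone.run();                 // every shard has answered: move to the next phase
                }
            });
        }
    }
}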
@@ -327,7 +327,6 @@ private void successfulShardExecution(SearchShardIterator shardsIt) {
}
}


/**
* Executed once all shard results have been received and processed
* @see #onShardFailure(int, SearchShardTarget, Exception)
@@ -367,7 +366,7 @@ protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRo
abstract static class SearchPhaseResults<Result extends SearchPhaseResult> {
private final int numShards;

- protected SearchPhaseResults(int numShards) {
+ SearchPhaseResults(int numShards) {
this.numShards = numShards;
}
/**
@@ -21,7 +21,6 @@

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectHashMap;

import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.FieldDoc;
@@ -154,7 +153,7 @@ public AggregatedDfs aggregateDfs(Collection<DfsSearchResult> results) {
* @param from the offset into the search results top docs
* @param size the number of hits to return from the merged top docs
*/
- public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
+ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
if (results.isEmpty()) {
return SortedTopDocs.EMPTY;
@@ -214,7 +213,7 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
}
final boolean isSortedByField;
final SortField[] sortFields;
- if (mergedTopDocs != null && mergedTopDocs instanceof TopFieldDocs) {
+ if (mergedTopDocs instanceof TopFieldDocs) {
TopFieldDocs fieldDocs = (TopFieldDocs) mergedTopDocs;
isSortedByField = (fieldDocs instanceof CollapseTopFieldDocs &&
fieldDocs.fields.length == 1 && fieldDocs.fields[0].getType() == SortField.Type.SCORE) == false;
@@ -230,11 +229,10 @@ public SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
}
}

- TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
+ static TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
if (results.isEmpty()) {
return null;
}
- assert results.isEmpty() == false;
final boolean setShardIndex = false;
final TopDocs topDocs = results.stream().findFirst().get();
final TopDocs mergedTopDocs;
@@ -259,12 +257,8 @@ TopDocs mergeTopDocs(Collection<TopDocs> results, int topN, int from) {
}

private static void setShardIndex(TopDocs topDocs, int shardIndex) {
+ assert topDocs.scoreDocs.length == 0 || topDocs.scoreDocs[0].shardIndex == -1 : "shardIndex is already set";
for (ScoreDoc doc : topDocs.scoreDocs) {
- if (doc.shardIndex != -1) {
- // once there is a single shard index initialized all others will be initialized too
- // there are many asserts down in lucene land that this is actually true. we can shortcut it here.
- return;
- }
doc.shardIndex = shardIndex;
}
}
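(Aside, not part of the diff: the hunk above replaces a defensive early return with an assertion because each shard's TopDocs is tagged with its shard position exactly once before the per-shard results are merged. A rough sketch of that tag-then-merge step, assuming the Lucene API Elasticsearch used at the time (TopDocs.merge with an explicit setShardIndex flag) and with ShardMergeSketch as an illustrative name, looks like this:)

// Illustrative sketch: tag each shard's docs with its position, then merge without re-tagging.
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

class ShardMergeSketch {
    static void tagShard(TopDocs shardDocs, int shardIndex) {
        for (ScoreDoc doc : shardDocs.scoreDocs) {
            assert doc.shardIndex == -1 : "shardIndex is already set";
            doc.shardIndex = shardIndex;             // set exactly once, for every doc of this shard
        }
    }

    static TopDocs mergeTagged(TopDocs[] perShard, int from, int topN) {
        for (int i = 0; i < perShard.length; i++) {
            tagShard(perShard[i], i);
        }
        // setShardIndex=false: keep the indices assigned above instead of letting the merge assign them
        return TopDocs.merge(from, topN, perShard, false);
    }
}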
@@ -283,7 +277,6 @@ public ScoreDoc[] getLastEmittedDocPerShard(ReducedQueryPhase reducedQueryPhase,
}
}
return lastEmittedDocPerShard;

}

/**
@@ -402,15 +395,15 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
hits.add(searchHit);
}
}
- return new SearchHits(hits.toArray(new SearchHit[hits.size()]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
+ return new SearchHits(hits.toArray(new SearchHit[0]), reducedQueryPhase.totalHits, reducedQueryPhase.maxScore);
}

/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
*/
- public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults, boolean isScrollRequest) {
- return reducedQueryPhase(queryResults, isScrollRequest, true);
+ public ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
+ return reducedQueryPhase(queryResults, true, true);
}

/**
@@ -422,7 +415,6 @@ public ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResul
return reducedQueryPhase(queryResults, null, new ArrayList<>(), new TopDocsStats(trackTotalHits), 0, isScrollRequest);
}


/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
@@ -507,15 +499,13 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
- final SortedTopDocs scoreDocs = this.sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
+ final SortedTopDocs scoreDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.maxScore,
timedOut, terminatedEarly, suggest, aggregations, shardResults, scoreDocs.scoreDocs, scoreDocs.sortFields,
- firstResult != null ? firstResult.sortValueFormats() : null,
- numReducePhases, scoreDocs.isSortedByField, size, from, firstResult == null);
+ firstResult.sortValueFormats(), numReducePhases, scoreDocs.isSortedByField, size, from, false);
}


/**
* Performs an intermediate reduce phase on the aggregations. Unlike the final reduce, this phase never prunes information
* that is relevant for the final reduce step. For the final reduce see {@link #reduceAggs(List, List, ReduceContext)}
@@ -526,7 +516,7 @@ private InternalAggregations reduceAggsIncrementally(List<InternalAggregations>
null, reduceContext);
}

- private InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
+ private static InternalAggregations reduceAggs(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> pipelineAggregators, ReduceContext reduceContext) {
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, reduceContext);
if (pipelineAggregators != null) {
@@ -657,7 +647,6 @@ private QueryPhaseResultConsumer(SearchPhaseController controller, int expectedR
this.hasTopDocs = hasTopDocs;
this.hasAggs = hasAggs;
this.bufferSize = bufferSize;

}

@Override
@@ -675,10 +664,9 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
aggsBuffer[0] = reducedAggs;
}
if (hasTopDocs) {
- TopDocs reducedTopDocs = controller.mergeTopDocs(Arrays.asList(topDocsBuffer),
+ TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
// we have to merge here in the same way we collect on a shard
- querySearchResult.from() + querySearchResult.size()
- , 0);
+ querySearchResult.from() + querySearchResult.size(), 0);
Arrays.fill(topDocsBuffer, null);
topDocsBuffer[0] = reducedTopDocs;
}
@@ -692,7 +680,7 @@ private synchronized void consumeInternal(QuerySearchResult querySearchResult) {
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
topDocsStats.add(topDocs);
- SearchPhaseController.setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
+ setShardIndex(topDocs.topDocs, querySearchResult.getShardIndex());
topDocsBuffer[i] = topDocs.topDocs;
}
}
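(As an aside, not part of the diff: QueryPhaseResultConsumer, shown in the hunks above, keeps a fixed-size buffer and folds it into a single partially reduced element whenever it fills, so memory stays bounded while shard results stream in. The generic sketch below illustrates that buffering pattern under the assumption of a simple binary combine function; it is not the actual Elasticsearch implementation.)

// Generic "reduce every bufferSize results into one partial result" sketch (illustrative only).
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class BatchedReducer<T> {
    private final Object[] buffer;
    private final BinaryOperator<T> combine;   // how two partial results are merged
    private int index;

    BatchedReducer(int bufferSize, BinaryOperator<T> combine) {
        assert bufferSize >= 2;                // a single-slot buffer could never make progress
        this.buffer = new Object[bufferSize];
        this.combine = combine;
    }

    @SuppressWarnings("unchecked")
    synchronized void consume(T result) {
        if (index == buffer.length) {          // buffer full: fold everything into slot 0
            T reduced = (T) buffer[0];
            for (int i = 1; i < buffer.length; i++) {
                reduced = combine.apply(reduced, (T) buffer[i]);
                buffer[i] = null;
            }
            buffer[0] = reduced;
            index = 1;
        }
        buffer[index++] = result;
    }

    @SuppressWarnings("unchecked")
    synchronized List<T> remaining() {         // what the final reduce still has to merge
        List<T> out = new ArrayList<>();
        for (int i = 0; i < index; i++) {
            out.add((T) buffer[i]);
        }
        return out;
    }
}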
@@ -705,7 +693,6 @@ private synchronized List<TopDocs> getRemainingTopDocs() {
return hasTopDocs ? Arrays.asList(topDocsBuffer).subList(0, index) : null;
}


@Override
public ReducedQueryPhase reduce() {
return controller.reducedQueryPhase(results.asList(), getRemainingAggs(), getRemainingTopDocs(), topDocsStats,
@@ -739,7 +726,7 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs);
}
}
- return new InitialSearchPhase.ArraySearchPhaseResults(numShards) {
+ return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
@Override
public ReducedQueryPhase reduce() {
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits);
@@ -52,7 +52,7 @@ protected void executeInitialPhase(Transport.Connection connection, InternalScro

@Override
protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode> clusterNodeLookup) {
- return sendResponsePhase(searchPhaseController.reducedQueryPhase(queryFetchResults.asList(), true), queryFetchResults);
+ return sendResponsePhase(searchPhaseController.reducedScrollQueryPhase(queryFetchResults.asList()), queryFetchResults);
}

@Override
@@ -69,8 +69,8 @@ protected SearchPhase moveToNextPhase(BiFunction<String, String, DiscoveryNode>
return new SearchPhase("fetch") {
@Override
public void run() throws IOException {
- final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedQueryPhase(
- queryResults.asList(), true);
+ final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = searchPhaseController.reducedScrollQueryPhase(
+ queryResults.asList());
if (reducedQueryPhase.scoreDocs.length == 0) {
sendResponse(reducedQueryPhase, fetchResults);
return;
@@ -25,7 +25,7 @@
import org.elasticsearch.transport.TransportResponse;

/**
- * This class is a base class for all search releated results. It contains the shard target it
+ * This class is a base class for all search related results. It contains the shard target it
* was executed against, a shard index used to reference the result on the coordinating node
* and a request ID that is used to reference the request context on the executing node. The
* request ID is particularly important since it is used to reference and maintain a context
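(Aside, not part of the diff: the javadoc above, truncated by the collapsed diff, describes the coordination state every search phase result carries. A stripped-down, hypothetical equivalent, with illustrative field names only, would be little more than:)

// Hypothetical minimal shape of a per-shard search phase result (illustrative only).
class PhaseResultSketch {
    private long requestId = -1;   // references the search context kept on the node that executed the request
    private int shardIndex = -1;   // position of this shard in the coordinating node's shard iterator
    private String shardTarget;    // which index, shard and node produced the result

    long requestId() { return requestId; }
    int shardIndex() { return shardIndex; }
    void shardIndex(int shardIndex) { this.shardIndex = shardIndex; }
}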
@@ -65,7 +65,6 @@ public InternalTopHits(StreamInput in) throws IOException {
from = in.readVInt();
size = in.readVInt();
topDocs = Lucene.readTopDocs(in);
- assert topDocs != null;
searchHits = SearchHits.readSearchHits(in);
}

@@ -30,6 +30,8 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.search.DocValueFormat;
+ import org.elasticsearch.search.SearchHit;
+ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -38,8 +40,6 @@
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchSearchResult;
- import org.elasticsearch.search.SearchHit;
- import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.Suggest;
@@ -73,7 +73,7 @@ public void setup() {
(b) -> new InternalAggregation.ReduceContext(BigArrays.NON_RECYCLING_INSTANCE, null, b));
}

- public void testSort() throws Exception {
+ public void testSort() {
List<CompletionSuggestion> suggestions = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
suggestions.add(new CompletionSuggestion(randomAlphaOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20), false));
@@ -88,7 +88,7 @@ public void testSort() throws Exception {
size = first.get().queryResult().size();
}
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
+ ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(true, results.asList(), null, new SearchPhaseController.TopDocsStats(),
from, size)
.scoreDocs;
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
@@ -113,12 +113,12 @@ public void testSortIsIdempotent() throws Exception {
size = first.get().queryResult().size();
}
SearchPhaseController.TopDocsStats topDocsStats = new SearchPhaseController.TopDocsStats();
- ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;
+ ScoreDoc[] sortedDocs = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats, from, size).scoreDocs;

results = generateSeededQueryResults(randomSeed, nShards, Collections.emptyList(), queryResultSize,
useConstantScore);
SearchPhaseController.TopDocsStats topDocsStats2 = new SearchPhaseController.TopDocsStats();
- ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
+ ScoreDoc[] sortedDocs2 = SearchPhaseController.sortDocs(ignoreFrom, results.asList(), null, topDocsStats2, from, size).scoreDocs;
assertEquals(sortedDocs.length, sortedDocs2.length);
for (int i = 0; i < sortedDocs.length; i++) {
assertEquals(sortedDocs[i].doc, sortedDocs2[i].doc);