Skip to content

Commit

Permalink
aggs: Changed how top_hits initialises leaf collectors
Browse files Browse the repository at this point in the history
Both TopDocsCollector and LeafCollector were being kept around at the aggregator level. In case the nested aggregator would do a post collection then this could cause pushing down docids to top hits child aggregators that already moved the next LeafCollector (causing assertions to trip and incorrect results).

By keeping track of the LeafCollector in a seperate map at the leaf bucket level this problem can simply not happen any more as the place holding LeafCollector is no longer shared.

Also LeafCollector instances for TopDocsCollectors are no longer pre-created as the beginning a new segment is evaluated. There is no guarantee that TopHitsAggregator encounters a document for a particular bucket and there has to be logic to create LeafCollector instances which have not been seen before.

Closes #26738
  • Loading branch information
martijnvg committed Sep 22, 2017
1 parent 5b711c2 commit a056c5d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.search.aggregations.metrics.tophits;

import com.carrotsearch.hppc.LongObjectHashMap;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.LeafCollector;
Expand Down Expand Up @@ -54,21 +55,11 @@

public class TopHitsAggregator extends MetricsAggregator {

/** Simple wrapper around a top-level collector and the current leaf collector. */
private static class TopDocsAndLeafCollector {
final TopDocsCollector<?> topLevelCollector;
LeafCollector leafCollector;
private final FetchPhase fetchPhase;
private final SubSearchContext subSearchContext;
private final LongObjectPagedHashMap<TopDocsCollector<?>> topDocsCollectors;

TopDocsAndLeafCollector(TopDocsCollector<?> topLevelCollector) {
this.topLevelCollector = topLevelCollector;
}
}

final FetchPhase fetchPhase;
final SubSearchContext subSearchContext;
final LongObjectPagedHashMap<TopDocsAndLeafCollector> topDocsCollectors;

public TopHitsAggregator(FetchPhase fetchPhase, SubSearchContext subSearchContext, String name, SearchContext context,
TopHitsAggregator(FetchPhase fetchPhase, SubSearchContext subSearchContext, String name, SearchContext context,
Aggregator parent, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
super(name, context, parent, pipelineAggregators, metaData);
this.fetchPhase = fetchPhase;
Expand All @@ -88,31 +79,26 @@ public boolean needsScores() {
}

@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {

public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
// Create leaf collectors here instead of at the aggregator level. Otherwise in case this collector get invoked
// when post collecting then we have already replaced the leaf readers on the aggregator level have already been
// replaced with the next leaf readers and then post collection pushes docids of the previous segement, which
// then causes assertions to trip or incorrect top docs to be computed.
final LongObjectHashMap<LeafCollector> leafCollectors = new LongObjectHashMap<>(1);
return new LeafBucketCollectorBase(sub, null) {

Scorer scorer;

@Override
public void setScorer(Scorer scorer) throws IOException {
this.scorer = scorer;
for (LongObjectPagedHashMap.Cursor<TopDocsAndLeafCollector> cursor : topDocsCollectors) {
// Instantiate the leaf collector not in the getLeafCollector(...) method or in the constructor of this
// anonymous class. Otherwise in the case this leaf bucket collector gets invoked with post collection
// then we already have moved on to the next reader and then we may encounter assertion errors or
// incorrect results.
cursor.value.leafCollector = cursor.value.topLevelCollector.getLeafCollector(ctx);
cursor.value.leafCollector.setScorer(scorer);
}
super.setScorer(scorer);
}

@Override
public void collect(int docId, long bucket) throws IOException {
TopDocsAndLeafCollector collectors = topDocsCollectors.get(bucket);
if (collectors == null) {
TopDocsCollector<?> topDocsCollector = topDocsCollectors.get(bucket);
if (topDocsCollector == null) {
SortAndFormats sort = subSearchContext.sort();
int topN = subSearchContext.from() + subSearchContext.size();
if (sort == null) {
Expand All @@ -123,31 +109,39 @@ public void collect(int docId, long bucket) throws IOException {
// In the QueryPhase we don't need this protection, because it is build into the IndexSearcher,
// but here we create collectors ourselves and we need prevent OOM because of crazy an offset and size.
topN = Math.min(topN, subSearchContext.searcher().getIndexReader().maxDoc());
TopDocsCollector<?> topLevelCollector;
if (sort == null) {
topLevelCollector = TopScoreDocCollector.create(topN);
topDocsCollector = TopScoreDocCollector.create(topN);
} else {
topLevelCollector = TopFieldCollector.create(sort.sort, topN, true, subSearchContext.trackScores(),
subSearchContext.trackScores());
topDocsCollector = TopFieldCollector.create(sort.sort, topN, true, subSearchContext.trackScores(),
subSearchContext.trackScores());
}
topDocsCollectors.put(bucket, topDocsCollector);
}

final LeafCollector leafCollector;
final int key = leafCollectors.indexOf(bucket);
if (key < 0) {
leafCollector = topDocsCollector.getLeafCollector(ctx);
if (scorer != null) {
leafCollector.setScorer(scorer);
}
collectors = new TopDocsAndLeafCollector(topLevelCollector);
collectors.leafCollector = collectors.topLevelCollector.getLeafCollector(ctx);
collectors.leafCollector.setScorer(scorer);
topDocsCollectors.put(bucket, collectors);
leafCollectors.indexInsert(key, bucket, leafCollector);
} else {
leafCollector = leafCollectors.indexGet(key);
}
collectors.leafCollector.collect(docId);
leafCollector.collect(docId);
}
};
}

@Override
public InternalAggregation buildAggregation(long owningBucketOrdinal) {
TopDocsAndLeafCollector topDocsCollector = topDocsCollectors.get(owningBucketOrdinal);
TopDocsCollector<?> topDocsCollector = topDocsCollectors.get(owningBucketOrdinal);
final InternalTopHits topHits;
if (topDocsCollector == null) {
topHits = buildEmptyAggregation();
} else {
TopDocs topDocs = topDocsCollector.topLevelCollector.topDocs();
TopDocs topDocs = topDocsCollector.topDocs();
if (subSearchContext.sort() == null) {
for (RescoreContext ctx : context().rescore()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class TopHitsAggregatorFactory extends AggregatorFactory<TopHitsAggregato
private final List<ScriptFieldsContext.ScriptField> scriptFields;
private final FetchSourceContext fetchSourceContext;

public TopHitsAggregatorFactory(String name, int from, int size, boolean explain, boolean version, boolean trackScores,
TopHitsAggregatorFactory(String name, int from, int size, boolean explain, boolean version, boolean trackScores,
Optional<SortAndFormats> sort, HighlightBuilder highlightBuilder, StoredFieldsContext storedFieldsContext,
List<String> docValueFields, List<ScriptFieldsContext.ScriptField> scriptFields, FetchSourceContext fetchSourceContext,
SearchContext context, AggregatorFactory<?> parent, AggregatorFactories.Builder subFactories, Map<String, Object> metaData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,6 @@ public void testTrackScores() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/26738")
public void testTopHitsInNestedSimple() throws Exception {
SearchResponse searchResponse = client().prepareSearch("articles")
.setQuery(matchQuery("title", "title"))
Expand Down

0 comments on commit a056c5d

Please sign in to comment.