Skip to content

Commit

Permalink
Refactor: Aggs use NOOP leaf collector (backport of elastic#70320) (e…
Browse files Browse the repository at this point in the history
…lastic#70419)

Before this commit, if an aggregator didn't have anything to do in
`AggregatorBase#getLeafCollector` it was obligated to throw
`CollectionTerminatedException` if there wasn't a parent aggregator,
otherwise it was obligated to return `LeafBucketCollector.NOOP`. This
seems like something aggregators shouldn't have to do. So this commit
changes `getLeafCollector` so aggregators are obligated to return
`LeafBucketCollector.NOOP` if they have no work to do. The aggregation
framework will throw the exception if its appropriate. Otherwise it'll
use the `NOOP` collector. If they have work to do the
`LeafBucketCollector`s that they do return may still throw
`CollectionTerminatedException` to signal that they are done with the
leaf.
  • Loading branch information
nik9000 authored Mar 15, 2021
1 parent 90a245d commit 0a31d51
Show file tree
Hide file tree
Showing 20 changed files with 161 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.action.search;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -578,7 +577,7 @@ public void close() {}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void preProcess(SearchContext context) {
}
context.aggregations().aggregators(aggregators);
if (collectors.isEmpty() == false) {
Collector collector = MultiBucketCollector.wrap(collectors);
Collector collector = MultiBucketCollector.wrap(true, collectors);
((BucketCollector)collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
Expand Down Expand Up @@ -80,7 +80,7 @@ public void execute(SearchContext context) {

// optimize the global collector based execution
if (globals.isEmpty() == false) {
BucketCollector globalsCollector = MultiBucketCollector.wrap(globals);
BucketCollector globalsCollector = MultiBucketCollector.wrap(false, globals);
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ public Map<String, Object> metadata() {
/**
* Get a {@link LeafBucketCollector} for the given ctx, which should
* delegate to the given collector.
* <p>
* {@linkplain Aggregator}s that perform collection independent of the main
* search should collect the provided leaf in their implementation of this
* method and return {@link LeafBucketCollector#NO_OP_COLLECTOR} to signal
* that they don't need to be collected with the main search. We'll remove
* them from the list of collectors.
*/
protected abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException;

Expand All @@ -181,8 +187,7 @@ protected void doPreCollection() throws IOException {

@Override
public final void preCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
doPreCollection();
collectableSubAggregators.preCollection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,26 @@
* the null ones.
*/
public class MultiBucketCollector extends BucketCollector {

/** See {@link #wrap(Iterable)}. */
public static BucketCollector wrap(BucketCollector... collectors) {
return wrap(Arrays.asList(collectors));
}

/**
* Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
* method works as follows:
* <ul>
* <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
* during search time.
* <li>If the input contains 1 real collector, it is returned.
* <li>If the input contains 1 real collector we wrap it in a collector that takes
* {@code terminateIfNoop} into account.
* <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
* non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
* </ul>
* @param terminateIfNoop Pass true if {@link #getLeafCollector} should throw
* {@link CollectionTerminatedException} if all leaf collectors are noop. Pass
* false if terminating would break stuff. The top level collection for
* aggregations should pass true here because we want to skip collections if
* all aggregations return NOOP. But when aggregtors themselves call this
* method they chould *generally* pass false here because they have collection
* actions to perform even if their sub-aggregators are NOOPs.
*/
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectors) {
public static BucketCollector wrap(boolean terminateIfNoop, Iterable<? extends BucketCollector> collectors) {
// For the user's convenience, we allow NO_OP collectors to be passed.
// However, to improve performance, these null collectors are found
// and dropped from the array we save for actual collection time.
Expand All @@ -68,7 +70,43 @@ public static BucketCollector wrap(Iterable<? extends BucketCollector> collector
break;
}
}
return col;
final BucketCollector collector = col;
// Wrap the collector in one that takes terminateIfNoop into account.
return new BucketCollector() {
@Override
public ScoreMode scoreMode() {
return collector.scoreMode();
}

@Override
public void preCollection() throws IOException {
collector.preCollection();
}

@Override
public void postCollection() throws IOException {
collector.postCollection();
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
try {
LeafBucketCollector leafCollector = collector.getLeafCollector(ctx);
if (false == leafCollector.isNoop()) {
return leafCollector;
}
} catch (CollectionTerminatedException e) {
throw new IllegalStateException(
"getLeafCollector should return a noop collector instead of throw "
+ CollectionTerminatedException.class.getSimpleName(), e
);
}
if (terminateIfNoop) {
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
};
} else {
BucketCollector[] colls = new BucketCollector[n];
n = 0;
Expand All @@ -77,14 +115,16 @@ public static BucketCollector wrap(Iterable<? extends BucketCollector> collector
colls[n++] = c;
}
}
return new MultiBucketCollector(colls);
return new MultiBucketCollector(terminateIfNoop, colls);
}
}

private final boolean terminateIfNoop;
private final boolean cacheScores;
private final BucketCollector[] collectors;

private MultiBucketCollector(BucketCollector... collectors) {
private MultiBucketCollector(boolean terminateIfNoop, BucketCollector... collectors) {
this.terminateIfNoop = terminateIfNoop;
this.collectors = collectors;
int numNeedsScores = 0;
for (Collector collector : collectors) {
Expand Down Expand Up @@ -129,26 +169,27 @@ public String toString() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
final List<LeafBucketCollector> leafCollectors = new ArrayList<>();
final List<LeafBucketCollector> leafCollectors = new ArrayList<>(collectors.length);
for (BucketCollector collector : collectors) {
final LeafBucketCollector leafCollector;
try {
leafCollector = collector.getLeafCollector(context);
LeafBucketCollector leafCollector = collector.getLeafCollector(context);
if (false == leafCollector.isNoop()) {
leafCollectors.add(leafCollector);
}
} catch (CollectionTerminatedException e) {
// this leaf collector does not need this segment
continue;
throw new IllegalStateException(
"getLeafCollector should return a noop collector instead of throw "
+ CollectionTerminatedException.class.getSimpleName(),
e
);
}
leafCollectors.add(leafCollector);
}
switch (leafCollectors.size()) {
case 0:
// TODO it's probably safer to return noop and let the caller throw if it wants to
/*
* See MinAggregator which only throws if it has a parent.
* That is because it doesn't want there to ever drop
* to this case and throw, thus skipping calculating the parent.
*/
throw new CollectionTerminatedException();
if (terminateIfNoop) {
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
case 1:
return leafCollectors.get(0);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public ScoreMode scoreMode() {
/** Set the deferred collectors. */
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = MultiBucketCollector.wrap(deferredCollectors);
this.collector = MultiBucketCollector.wrap(true, deferredCollectors);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ protected void doPreCollection() throws IOException {
deferringCollector.setDeferredCollector(deferredAggregations);
collectors.add(deferringCollector);
}
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(false, collectors);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ protected void doClose() {

@Override
protected void doPreCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
deferredCollectors = MultiBucketCollector.wrap(collectors);
deferredCollectors = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
}

Expand Down Expand Up @@ -388,7 +387,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
// Throwing this exception will terminate the execution of the search for this root aggregation,
// see {@link MultiCollector} for more details on how we handle early termination in aggregations.
earlyTerminated = true;
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
if (fillDocIdSet) {
currentLeaf = ctx;
Expand All @@ -399,7 +398,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
// that is after the index sort prefix using the rawAfterKey and we start collecting
// document from there.
processLeafFromQuery(ctx, indexSortPrefix);
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
return new LeafBucketCollector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.search.aggregations.bucket.filter;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.util.Bits;
Expand Down Expand Up @@ -394,8 +393,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
} else {
collectSubs(ctx, live, sub);
}
// Throwing this exception is how we communicate to the collection mechanism that we don't need the segment.
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ScoreMode scoreMode() {
/** Set the deferred collectors. */
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.deferred = MultiBucketCollector.wrap(deferredCollectors);
this.deferred = MultiBucketCollector.wrap(true, deferredCollectors);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FutureArrays;
Expand Down Expand Up @@ -71,12 +70,7 @@ public ScoreMode scoreMode() {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
if (parent != null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
// we have no parent and the values source is empty so we can skip collecting hits.
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
if (pointConverter != null) {
Number segMax = findLeafMaxValue(ctx.reader(), pointField, pointConverter);
Expand All @@ -90,7 +84,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
max = Math.max(max, segMax.doubleValue());
maxes.set(0, max);
// the maximum value has been extracted, we don't need to collect hits on this segment.
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}
}
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,7 @@ public ScoreMode scoreMode() {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
if (parent == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
// we have no parent and the values source is empty so we can skip collecting hits.
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
if (pointConverter != null) {
Number segMin = findLeafMinValue(ctx.reader(), pointField, pointConverter);
Expand All @@ -90,13 +85,12 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
min = Math.min(min, segMin.doubleValue());
mins.set(0, min);
// the minimum value has been extracted, we don't need to collect hits on this segment.
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}
}
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {

@Override
public void collect(int doc, long bucket) throws IOException {
if (bucket >= mins.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public void setScorer(Scorable scorer) throws IOException {
delegate.setScorer(scorer);
}

@Override
public boolean isNoop() {
return delegate.isNoop();
}

}
Loading

0 comments on commit 0a31d51

Please sign in to comment.