Refactor: Aggs use NOOP leaf collector #70320

Merged: 5 commits, Mar 15, 2021
@@ -9,7 +9,6 @@
package org.elasticsearch.action.search;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -575,7 +574,7 @@ public void close() {}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}

@Override
@@ -44,7 +44,7 @@ public void preProcess(SearchContext context) {
}
context.aggregations().aggregators(aggregators);
if (collectors.isEmpty() == false) {
Collector collector = MultiBucketCollector.wrap(collectors);
Collector collector = MultiBucketCollector.wrap(true, collectors);
((BucketCollector)collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(collector, CollectorResult.REASON_AGGREGATION,
@@ -80,7 +80,7 @@ public void execute(SearchContext context) {

// optimize the global collector based execution
if (globals.isEmpty() == false) {
BucketCollector globalsCollector = MultiBucketCollector.wrap(globals);
BucketCollector globalsCollector = MultiBucketCollector.wrap(false, globals);
Query query = context.buildFilteredQuery(Queries.newMatchAllQuery());

try {
@@ -156,6 +156,12 @@ public Map<String, Object> metadata() {
/**
* Get a {@link LeafBucketCollector} for the given ctx, which should
* delegate to the given collector.
* <p>
* {@linkplain Aggregator}s that perform collection independent of the main
* search should collect the provided leaf in their implementation of this
* method and return {@link LeafBucketCollector#NO_OP_COLLECTOR} to signal
* that they don't need to be collected with the main search. We'll remove
* them from the list of collectors.
*/
protected abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException;
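As a hedged illustration of that contract (a hypothetical aggregator and helper, not part of this change), an implementation that does its own per-segment work might look like:

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
    // Hypothetical: this aggregator can compute its result straight from the
    // segment (doc values, points, etc.), so it does that work here instead of
    // waiting for the main search to push documents to it.
    collectWholeSegment(ctx); // hypothetical helper, stands in for the real per-segment work
    // Returning the NOOP collector signals that this aggregator does not need
    // to be collected with the main search, so it is dropped from the list.
    return LeafBucketCollector.NO_OP_COLLECTOR;
}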

@@ -181,8 +187,7 @@ protected void doPreCollection() throws IOException {

@Override
public final void preCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
doPreCollection();
collectableSubAggregators.preCollection();
}
@@ -29,24 +29,26 @@
* the null ones.
*/
public class MultiBucketCollector extends BucketCollector {

/** See {@link #wrap(Iterable)}. */
public static BucketCollector wrap(BucketCollector... collectors) {
return wrap(Arrays.asList(collectors));
}

/**
* Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
* method works as follows:
* <ul>
* <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
* during search time.
* <li>If the input contains 1 real collector, it is returned.
* <li>If the input contains 1 real collector, we wrap it in a collector that takes
* {@code terminateIfNoop} into account.
* <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
* non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
* </ul>
* @param terminateIfNoop Pass true if {@link #getLeafCollector} should throw
* {@link CollectionTerminatedException} when all leaf collectors are NOOPs. Pass
* false if terminating early would break the enclosing collection. The top level
* collection for aggregations should pass true here because we want to skip
* collecting a segment if all aggregations return NOOP. But when aggregators
* themselves call this method they should *generally* pass false here because
* they have collection actions to perform even if their sub-aggregators are NOOPs.
*/
public static BucketCollector wrap(Iterable<? extends BucketCollector> collectors) {
public static BucketCollector wrap(boolean terminateIfNoop, Iterable<? extends BucketCollector> collectors) {
Member Author:
I've piggybacked on this method, which we use all over the place, to add the terminate behavior. I'm wondering if we'd be better off making an entirely new wrapper just for the terminateIfNoop behavior and only using it at the top level to get the terminations. I think it'd save the inline class below. But it'd drop the IllegalStateException that you get from throwing the exception when you shouldn't.
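A rough sketch of the dedicated wrapper floated above (the name TerminateIfNoopCollector and the whole class are hypothetical, not part of this change):

// Hypothetical alternative to piggybacking on wrap(): a standalone wrapper used
// only at the top level, terminating collection of a segment when the wrapped
// collector has nothing to do for it.
class TerminateIfNoopCollector extends BucketCollector {
    private final BucketCollector delegate;

    TerminateIfNoopCollector(BucketCollector delegate) {
        this.delegate = delegate;
    }

    @Override
    public ScoreMode scoreMode() {
        return delegate.scoreMode();
    }

    @Override
    public void preCollection() throws IOException {
        delegate.preCollection();
    }

    @Override
    public void postCollection() throws IOException {
        delegate.postCollection();
    }

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
        LeafBucketCollector leaf = delegate.getLeafCollector(ctx);
        if (leaf.isNoop()) {
            // Nothing to collect on this segment, so skip it entirely.
            throw new CollectionTerminatedException();
        }
        return leaf;
    }
}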

// For the user's convenience, we allow NO_OP collectors to be passed.
// However, to improve performance, these null collectors are found
// and dropped from the array we save for actual collection time.
@@ -68,7 +70,43 @@ public static BucketCollector wrap(Iterable<? extends BucketCollector> collector
break;
}
}
return col;
final BucketCollector collector = col;
// Wrap the collector in one that takes terminateIfNoop into account.
return new BucketCollector() {
@Override
public ScoreMode scoreMode() {
return collector.scoreMode();
}

@Override
public void preCollection() throws IOException {
collector.preCollection();
}

@Override
public void postCollection() throws IOException {
collector.postCollection();
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
try {
LeafBucketCollector leafCollector = collector.getLeafCollector(ctx);
if (false == leafCollector.isNoop()) {
return leafCollector;
}
} catch (CollectionTerminatedException e) {
throw new IllegalStateException(
"getLeafCollector should return a noop collector instead of throw "
+ CollectionTerminatedException.class.getSimpleName(), e
);
}
if (terminateIfNoop) {
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
};
} else {
BucketCollector[] colls = new BucketCollector[n];
n = 0;
@@ -77,14 +115,16 @@ public static BucketCollector wrap(Iterable<? extends BucketCollector> collector
colls[n++] = c;
}
}
return new MultiBucketCollector(colls);
return new MultiBucketCollector(terminateIfNoop, colls);
}
}

private final boolean terminateIfNoop;
private final boolean cacheScores;
private final BucketCollector[] collectors;

private MultiBucketCollector(BucketCollector... collectors) {
private MultiBucketCollector(boolean terminateIfNoop, BucketCollector... collectors) {
this.terminateIfNoop = terminateIfNoop;
this.collectors = collectors;
int numNeedsScores = 0;
for (Collector collector : collectors) {
@@ -129,26 +169,27 @@ public String toString() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
final List<LeafBucketCollector> leafCollectors = new ArrayList<>();
final List<LeafBucketCollector> leafCollectors = new ArrayList<>(collectors.length);
for (BucketCollector collector : collectors) {
final LeafBucketCollector leafCollector;
try {
leafCollector = collector.getLeafCollector(context);
LeafBucketCollector leafCollector = collector.getLeafCollector(context);
if (false == leafCollector.isNoop()) {
leafCollectors.add(leafCollector);
}
} catch (CollectionTerminatedException e) {
// this leaf collector does not need this segment
continue;
throw new IllegalStateException(
"getLeafCollector should return a noop collector instead of throw "
+ CollectionTerminatedException.class.getSimpleName(),
e
);
}
leafCollectors.add(leafCollector);
}
switch (leafCollectors.size()) {
case 0:
// TODO it's probably safer to return noop and let the caller throw if it wants to
/*
* See MinAggregator which only throws if it has a parent.
* That is because it doesn't want there to ever drop
* to this case and throw, thus skipping calculating the parent.
*/
throw new CollectionTerminatedException();
if (terminateIfNoop) {
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
case 1:
return leafCollectors.get(0);
default:
@@ -85,7 +85,7 @@ public ScoreMode scoreMode() {
/** Set the deferred collectors. */
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.collector = MultiBucketCollector.wrap(deferredCollectors);
this.collector = MultiBucketCollector.wrap(true, deferredCollectors);
}

/**
@@ -57,7 +57,7 @@ protected void doPreCollection() throws IOException {
deferringCollector.setDeferredCollector(deferredAggregations);
collectors.add(deferringCollector);
}
collectableSubAggregators = MultiBucketCollector.wrap(collectors);
collectableSubAggregators = MultiBucketCollector.wrap(false, collectors);
}

/**
@@ -118,8 +118,7 @@ protected void doClose() {

@Override
protected void doPreCollection() throws IOException {
List<BucketCollector> collectors = Arrays.asList(subAggregators);
deferredCollectors = MultiBucketCollector.wrap(collectors);
deferredCollectors = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
}

@@ -388,7 +387,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
// Throwing this exception will terminate the execution of the search for this root aggregation,
// see {@link MultiCollector} for more details on how we handle early termination in aggregations.
earlyTerminated = true;
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
if (fillDocIdSet) {
currentLeaf = ctx;
@@ -399,7 +398,7 @@
// that is after the index sort prefix using the rawAfterKey and we start collecting
// document from there.
processLeafFromQuery(ctx, indexSortPrefix);
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder, sortPrefixLen));
return new LeafBucketCollector() {
@@ -9,7 +9,6 @@
package org.elasticsearch.search.aggregations.bucket.filter;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.util.Bits;
@@ -394,8 +393,7 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
} else {
collectSubs(ctx, live, sub);
}
// Throwing this exception is how we communicate to the collection mechanism that we don't need the segment.
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}

/**
@@ -73,7 +73,7 @@ public ScoreMode scoreMode() {
/** Set the deferred collectors. */
@Override
public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
this.deferred = MultiBucketCollector.wrap(deferredCollectors);
this.deferred = MultiBucketCollector.wrap(true, deferredCollectors);
}

@Override
@@ -10,7 +10,6 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FutureArrays;
@@ -71,12 +70,7 @@ public ScoreMode scoreMode() {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
if (parent != null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
// we have no parent and the values source is empty so we can skip collecting hits.
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
if (pointConverter != null) {
Number segMax = findLeafMaxValue(ctx.reader(), pointField, pointConverter);
@@ -90,7 +84,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
max = Math.max(max, segMax.doubleValue());
maxes.set(0, max);
// the maximum value has been extracted, we don't need to collect hits on this segment.
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}
}
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
@@ -72,12 +72,7 @@ public ScoreMode scoreMode() {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
final LeafBucketCollector sub) throws IOException {
if (valuesSource == null) {
if (parent == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
// we have no parent and the values source is empty so we can skip collecting hits.
throw new CollectionTerminatedException();
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
if (pointConverter != null) {
Number segMin = findLeafMinValue(ctx.reader(), pointField, pointConverter);
@@ -90,13 +85,12 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
min = Math.min(min, segMin.doubleValue());
mins.set(0, min);
// the minimum value has been extracted, we don't need to collect hits on this segment.
throw new CollectionTerminatedException();
return LeafBucketCollector.NO_OP_COLLECTOR;
}
}
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {

@Override
public void collect(int doc, long bucket) throws IOException {
if (bucket >= mins.size()) {
@@ -39,4 +39,9 @@ public void setScorer(Scorable scorer) throws IOException {
delegate.setScorer(scorer);
}

@Override
public boolean isNoop() {
return delegate.isNoop();
}

}
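For reference, the isNoop() hook that this override delegates to is not shown in this diff; presumably it amounts to something like the following sketch on LeafBucketCollector (an assumption, not part of this change):

// Assumed shape of the hook (not shown in this diff): the base class reports
// false and NO_OP_COLLECTOR reports true, and delegating wrappers like the one
// above forward the call so the NOOP status isn't hidden behind the wrapper.
public boolean isNoop() {
    return false;
}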