Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Collector implementation from BucketCollector #88444

Merged
merged 5 commits into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/reference/search/profile.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ The API returns the following result:
]
},
{
"name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
"name": "BucketCollectorWrapper: [BucketCollectorWrapper[bucketCollector=[my_scoped_agg, my_global_agg]]]",
"reason": "aggregation",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm, this might be the most problematic part of this change?

"time_in_nanos": 867617
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public static void preProcess(SearchContext context) {
context.queryCollectors().put(AggregationPhase.class, BucketCollector.NO_OP_COLLECTOR);
} else {
Collector collector = context.getProfilers() == null
? bucketCollector
: new InternalProfileCollector(bucketCollector, CollectorResult.REASON_AGGREGATION, List.of());
? bucketCollector.asCollector()
: new InternalProfileCollector(bucketCollector.asCollector(), CollectorResult.REASON_AGGREGATION, List.of());
context.queryCollectors().put(AggregationPhase.class, collector);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.ScoreMode;

import java.io.IOException;

/**
* A Collector that can collect data in separate buckets.
*/
public abstract class BucketCollector implements Collector {
public abstract class BucketCollector {

public static final BucketCollector NO_OP_COLLECTOR = new BucketCollector() {
public static final BucketCollector NO_OP_BUCKET_COLLECTOR = new BucketCollector() {

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) {
Expand All @@ -41,11 +42,18 @@ public ScoreMode scoreMode() {
}
};

// TODO: will remove it in a follow up PR
@Override
public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
return getLeafCollector(new AggregationExecutionContext(ctx, null, null));
}
/**
 * A no-op Lucene {@link Collector}: it collects nothing and requires no scores.
 * This is the plain-{@link Collector} counterpart of the no-op
 * {@code BucketCollector}, for call sites that need a {@link Collector} now
 * that {@code BucketCollector} no longer implements that interface.
 */
public static final Collector NO_OP_COLLECTOR = new Collector() {

    @Override
    public LeafCollector getLeafCollector(LeafReaderContext context) {
        // Reuse the shared no-op leaf collector, which ignores every document.
        return LeafBucketCollector.NO_OP_COLLECTOR;
    }

    @Override
    public ScoreMode scoreMode() {
        // Nothing is collected, so scores are never needed.
        return ScoreMode.COMPLETE_NO_SCORES;
    }
};

public abstract LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException;

Expand All @@ -59,4 +67,28 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws
*/
public abstract void postCollection() throws IOException;

/**
 * Indicates what features are required from the scorer.
 */
public abstract ScoreMode scoreMode();

/**
 * Returns this {@code BucketCollector} adapted to the Lucene
 * {@link Collector} interface, so it can be passed directly to
 * {@code IndexSearcher#search}. The adapter delegates to
 * {@link #getLeafCollector(AggregationExecutionContext)} and
 * {@link #scoreMode()}.
 */
public final Collector asCollector() {
    return new BucketCollectorWrapper(this);
}

/**
 * Adapter exposing a {@link BucketCollector} as a Lucene {@link Collector}.
 * Each leaf's {@link LeafReaderContext} is wrapped in a fresh
 * {@link AggregationExecutionContext} before delegation.
 */
private record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector {

    @Override
    public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
        // NOTE(review): the two null arguments mirror the behavior of the
        // removed BucketCollector#getLeafCollector(LeafReaderContext)
        // bridge; presumably no extra execution state is available at this
        // call site — confirm AggregationExecutionContext tolerates nulls.
        return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null));
    }

    @Override
    public ScoreMode scoreMode() {
        // Score requirements come straight from the wrapped collector.
        return bucketCollector.scoreMode();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@
/**
* A {@link BucketCollector} which allows running a bucket collection with several
* {@link BucketCollector}s. It is similar to the {@link MultiCollector} except that the
* {@link #wrap} method filters out the {@link BucketCollector#NO_OP_COLLECTOR}s and not
* {@link #wrap} method filters out the {@link BucketCollector#NO_OP_BUCKET_COLLECTOR}s and not
* the null ones.
*/
public class MultiBucketCollector extends BucketCollector {
/**
* Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
* method works as follows:
* <ul>
* <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
* <li>Filters out the {@link BucketCollector#NO_OP_BUCKET_COLLECTOR} collectors, so they are not used
* during search time.
* <li>If the input contains 1 real collector we wrap it in a collector that takes
* {@code terminateIfNoop} into account.
* <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
* non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
* non-{@link BucketCollector#NO_OP_BUCKET_COLLECTOR} collectors.
* </ul>
* @param terminateIfNoop Pass true if {@link #getLeafCollector} should throw
* {@link CollectionTerminatedException} if all leaf collectors are noop. Pass
Expand All @@ -52,13 +52,13 @@ public static BucketCollector wrap(boolean terminateIfNoop, Iterable<? extends B
// and dropped from the array we save for actual collection time.
int n = 0;
for (BucketCollector c : collectors) {
if (c != NO_OP_COLLECTOR) {
if (c != NO_OP_BUCKET_COLLECTOR) {
n++;
}
}

if (n == 0) {
return NO_OP_COLLECTOR;
return NO_OP_BUCKET_COLLECTOR;
} else if (n == 1) {
// only 1 Collector - return it.
BucketCollector col = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void doClose() {
@Override
protected void doPreCollection() throws IOException {
deferredCollectors = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
collectableSubAggregators = BucketCollector.NO_OP_BUCKET_COLLECTOR;
}

@Override
Expand All @@ -170,7 +170,7 @@ protected void doPostCollection() throws IOException {
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
// Composite aggregator must be at the top of the aggregation tree
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
if (deferredCollectors != NO_OP_COLLECTOR) {
if (deferredCollectors != NO_OP_BUCKET_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
runDeferredCollections();
}
Expand Down Expand Up @@ -440,7 +440,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
finishLeaf();

boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
boolean fillDocIdSet = deferredCollectors != NO_OP_BUCKET_COLLECTOR;

Sort indexSortPrefix = buildIndexSortPrefix(aggCtx.getLeafReaderContext());
int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void testCollectionTerminatedExceptionHandling() throws IOException {
expectedCounts.put(collector, expectedCount);
collectors.add(new TerminateAfterBucketCollector(collector, terminateAfter));
}
searcher.search(new MatchAllDocsQuery(), MultiBucketCollector.wrap(true, collectors));
searcher.search(new MatchAllDocsQuery(), MultiBucketCollector.wrap(true, collectors).asCollector());
for (Map.Entry<TotalHitCountBucketCollector, Integer> expectedCount : expectedCounts.entrySet()) {
assertEquals(expectedCount.getValue().intValue(), expectedCount.getKey().getTotalHits());
}
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testSetScorerAfterCollectionTerminated() throws IOException {
Collections.shuffle(collectors, random());
BucketCollector collector = MultiBucketCollector.wrap(true, collectors);

LeafBucketCollector leafCollector = collector.getLeafCollector((LeafReaderContext) null);
LeafBucketCollector leafCollector = collector.getLeafCollector(null);
leafCollector.setScorer(scorer);
assertTrue(setScorerCalled1.get());
assertTrue(setScorerCalled2.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
Expand Down Expand Up @@ -72,7 +74,7 @@ public ScoreMode scoreMode() {
Set<Integer> deferredCollectedDocIds = new HashSet<>();
collector.setDeferredCollector(Collections.singleton(bla(deferredCollectedDocIds)));
collector.preCollection();
indexSearcher.search(termQuery, collector);
indexSearcher.search(termQuery, collector.asCollector());
collector.postCollection();
collector.prepareSelectedBuckets(0);

Expand All @@ -86,7 +88,7 @@ public ScoreMode scoreMode() {
deferredCollectedDocIds = new HashSet<>();
collector.setDeferredCollector(Collections.singleton(bla(deferredCollectedDocIds)));
collector.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), collector);
indexSearcher.search(new MatchAllDocsQuery(), collector.asCollector());
collector.postCollection();
collector.prepareSelectedBuckets(0);

Expand Down Expand Up @@ -199,21 +201,17 @@ private void testCase(
CollectingBucketCollector finalCollector = new CollectingBucketCollector();
deferringCollector.setDeferredCollector(Collections.singleton(finalCollector));
deferringCollector.preCollection();
indexSearcher.search(query, new BucketCollector() {
indexSearcher.search(query, new Collector() {
@Override
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE_NO_SCORES;
}

@Override
public void preCollection() throws IOException {}

@Override
public void postCollection() throws IOException {}

@Override
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
LeafBucketCollector delegate = deferringCollector.getLeafCollector(aggCtx);
public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
LeafBucketCollector delegate = deferringCollector.getLeafCollector(
new AggregationExecutionContext(context, null, null)
);
return leafCollector.apply(deferringCollector, delegate);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ public void onCache(ShardId shardId, Accountable accountable) {}
AggregationContext context = createAggregationContext(searcher, new MatchAllDocsQuery());
FilterByFilterAggregator aggregator = createAggregator(builder, context);
aggregator.preCollection();
searcher.search(context.query(), aggregator);
searcher.search(context.query(), aggregator.asCollector());
aggregator.postCollection();

InternalAggregation result = aggregator.buildTopLevel();
Expand Down Expand Up @@ -746,7 +746,7 @@ public void onCache(ShardId shardId, Accountable accountable) {}
AggregationContext context = createAggregationContext(searcher, new MatchAllDocsQuery(), ft);
FilterByFilterAggregator aggregator = createAggregator(builder, context);
aggregator.preCollection();
searcher.search(context.query(), aggregator);
searcher.search(context.query(), aggregator.asCollector());
aggregator.postCollection();

InternalAggregation result = aggregator.buildTopLevel();
Expand Down Expand Up @@ -812,7 +812,7 @@ public void onCache(ShardId shardId, Accountable accountable) {}
AggregationContext context = createAggregationContext(searcher, new MatchAllDocsQuery(), ft);
FilterByFilterAggregator aggregator = createAggregator(builder, context);
aggregator.preCollection();
searcher.search(context.query(), aggregator);
searcher.search(context.query(), aggregator.asCollector());
aggregator.postCollection();

InternalAggregation result = aggregator.buildTopLevel();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ private void aggregationImplementationChoiceTestCase(
}
assertThat(agg, matcher);
agg.preCollection();
context.searcher().search(context.query(), agg);
context.searcher().search(context.query(), agg.asCollector());
InternalDateHistogram result = (InternalDateHistogram) agg.buildTopLevel();
result = (InternalDateHistogram) result.reduce(
List.of(result),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testReplay() throws Exception {
Set<Integer> deferredCollectedDocIds = new HashSet<>();
collector.setDeferredCollector(Collections.singleton(testCollector(deferredCollectedDocIds)));
collector.preCollection();
indexSearcher.search(termQuery, collector);
indexSearcher.search(termQuery, collector.asCollector());
collector.postCollection();
collector.prepareSelectedBuckets(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ public void testUnmapped() throws Exception {
RareTermsAggregationBuilder aggregationBuilder = new RareTermsAggregationBuilder("_name").field(fieldNames[i]);
Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType1, fieldType2);
aggregator.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), aggregator);
indexSearcher.search(new MatchAllDocsQuery(), aggregator.asCollector());
aggregator.postCollection();
RareTerms result = (RareTerms) aggregator.buildTopLevel();
assertEquals("_name", result.getName());
Expand Down
Loading