Remove Collector implementation from BucketCollector (#88444)
BucketCollector now has a method called #asCollector that returns the current BucketCollector wrapped as a Lucene Collector.
iverase authored Jul 18, 2022
1 parent 2064595 commit 04bdefd
Showing 26 changed files with 115 additions and 85 deletions.
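
The call-site change is mechanical: code that used to hand a BucketCollector straight to IndexSearcher#search must now wrap it first. A minimal sketch of the before/after pattern (the aggregator and query variables are illustrative; the calls mirror the test updates below):

    // Before this commit: BucketCollector implemented Collector, so it could be passed directly.
    aggregator.preCollection();
    searcher.search(query, aggregator);
    aggregator.postCollection();

    // After this commit: wrap the BucketCollector as a Lucene Collector first.
    aggregator.preCollection();
    searcher.search(query, aggregator.asCollector());
    aggregator.postCollection();
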
2 changes: 1 addition & 1 deletion docs/reference/search/profile.asciidoc
@@ -696,7 +696,7 @@ The API returns the following result:
           ]
         },
         {
-          "name": "MultiBucketCollector: [[my_scoped_agg, my_global_agg]]",
+          "name": "BucketCollectorWrapper: [BucketCollectorWrapper[bucketCollector=[my_scoped_agg, my_global_agg]]]",
           "reason": "aggregation",
           "time_in_nanos": 867617
         }
@@ -52,8 +52,8 @@ public static void preProcess(SearchContext context) {
             context.queryCollectors().put(AggregationPhase.class, BucketCollector.NO_OP_COLLECTOR);
         } else {
             Collector collector = context.getProfilers() == null
-                ? bucketCollector
-                : new InternalProfileCollector(bucketCollector, CollectorResult.REASON_AGGREGATION, List.of());
+                ? bucketCollector.asCollector()
+                : new InternalProfileCollector(bucketCollector.asCollector(), CollectorResult.REASON_AGGREGATION, List.of());
             context.queryCollectors().put(AggregationPhase.class, collector);
         }
     }
@@ -10,16 +10,17 @@

 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.Collector;
+import org.apache.lucene.search.LeafCollector;
 import org.apache.lucene.search.ScoreMode;

 import java.io.IOException;

 /**
  * A Collector that can collect data in separate buckets.
  */
-public abstract class BucketCollector implements Collector {
+public abstract class BucketCollector {

-    public static final BucketCollector NO_OP_COLLECTOR = new BucketCollector() {
+    public static final BucketCollector NO_OP_BUCKET_COLLECTOR = new BucketCollector() {

         @Override
         public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) {
@@ -41,11 +42,18 @@ public ScoreMode scoreMode() {
         }
     };

-    // TODO: will remove it in a follow up PR
-    @Override
-    public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
-        return getLeafCollector(new AggregationExecutionContext(ctx, null, null));
-    }
+    public static final Collector NO_OP_COLLECTOR = new Collector() {
+
+        @Override
+        public LeafCollector getLeafCollector(LeafReaderContext context) {
+            return LeafBucketCollector.NO_OP_COLLECTOR;
+        }
+
+        @Override
+        public ScoreMode scoreMode() {
+            return ScoreMode.COMPLETE_NO_SCORES;
+        }
+    };

     public abstract LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException;

@@ -59,4 +67,28 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {
      */
     public abstract void postCollection() throws IOException;

+    /**
+     * Indicates what features are required from the scorer.
+     */
+    public abstract ScoreMode scoreMode();
+
+    /**
+     * Return this BucketCollector wrapped as a {@link Collector}
+     */
+    public final Collector asCollector() {
+        return new BucketCollectorWrapper(this);
+    }
+
+    private record BucketCollectorWrapper(BucketCollector bucketCollector) implements Collector {
+
+        @Override
+        public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+            return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null));
+        }
+
+        @Override
+        public ScoreMode scoreMode() {
+            return bucketCollector.scoreMode();
+        }
+    }
 }
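
Note that BucketCollector now carries two distinct no-op constants: NO_OP_BUCKET_COLLECTOR is typed as a BucketCollector for aggregator wiring, while NO_OP_COLLECTOR is a plain Lucene Collector for call sites that need that type, such as the queryCollectors map in AggregationPhase above. A minimal sketch of the distinction (variable names are illustrative):

    // Typed as BucketCollector, e.g. for collectableSubAggregators:
    BucketCollector noOpBucket = BucketCollector.NO_OP_BUCKET_COLLECTOR;

    // Typed as org.apache.lucene.search.Collector, e.g. for context.queryCollectors():
    Collector noOpCollector = BucketCollector.NO_OP_COLLECTOR;
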
@@ -23,20 +23,20 @@
 /**
  * A {@link BucketCollector} which allows running a bucket collection with several
  * {@link BucketCollector}s. It is similar to the {@link MultiCollector} except that the
- * {@link #wrap} method filters out the {@link BucketCollector#NO_OP_COLLECTOR}s and not
+ * {@link #wrap} method filters out the {@link BucketCollector#NO_OP_BUCKET_COLLECTOR}s and not
  * the null ones.
  */
 public class MultiBucketCollector extends BucketCollector {
     /**
      * Wraps a list of {@link BucketCollector}s with a {@link MultiBucketCollector}. This
      * method works as follows:
      * <ul>
-     * <li>Filters out the {@link BucketCollector#NO_OP_COLLECTOR}s collectors, so they are not used
+     * <li>Filters out the {@link BucketCollector#NO_OP_BUCKET_COLLECTOR}s collectors, so they are not used
      * during search time.
      * <li>If the input contains 1 real collector we wrap it in a collector that takes
      * {@code terminateIfNoop} into account.
      * <li>Otherwise the method returns a {@link MultiBucketCollector} which wraps the
-     * non-{@link BucketCollector#NO_OP_COLLECTOR} collectors.
+     * non-{@link BucketCollector#NO_OP_BUCKET_COLLECTOR} collectors.
      * </ul>
      * @param terminateIfNoop Pass true if {@link #getLeafCollector} should throw
      * {@link CollectionTerminatedException} if all leaf collectors are noop. Pass
@@ -52,13 +52,13 @@ public static BucketCollector wrap(boolean terminateIfNoop, Iterable<? extends BucketCollector> collectors) {
         // and dropped from the array we save for actual collection time.
         int n = 0;
         for (BucketCollector c : collectors) {
-            if (c != NO_OP_COLLECTOR) {
+            if (c != NO_OP_BUCKET_COLLECTOR) {
                 n++;
             }
         }

         if (n == 0) {
-            return NO_OP_COLLECTOR;
+            return NO_OP_BUCKET_COLLECTOR;
         } else if (n == 1) {
             // only 1 Collector - return it.
             BucketCollector col = null;
@@ -158,7 +158,7 @@ protected void doClose() {
     @Override
     protected void doPreCollection() throws IOException {
         deferredCollectors = MultiBucketCollector.wrap(false, Arrays.asList(subAggregators));
-        collectableSubAggregators = BucketCollector.NO_OP_COLLECTOR;
+        collectableSubAggregators = BucketCollector.NO_OP_BUCKET_COLLECTOR;
     }

     @Override
@@ -170,7 +170,7 @@ protected void doPostCollection() throws IOException {
     public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
         // Composite aggregator must be at the top of the aggregation tree
         assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
-        if (deferredCollectors != NO_OP_COLLECTOR) {
+        if (deferredCollectors != NO_OP_BUCKET_COLLECTOR) {
             // Replay all documents that contain at least one top bucket (collected during the first pass).
             runDeferredCollections();
         }
@@ -440,7 +440,7 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) throws IOException {
     protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, LeafBucketCollector sub) throws IOException {
         finishLeaf();

-        boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
+        boolean fillDocIdSet = deferredCollectors != NO_OP_BUCKET_COLLECTOR;

         Sort indexSortPrefix = buildIndexSortPrefix(aggCtx.getLeafReaderContext());
         int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);
@@ -176,7 +176,7 @@ public void testCollectionTerminatedExceptionHandling() throws IOException {
             expectedCounts.put(collector, expectedCount);
             collectors.add(new TerminateAfterBucketCollector(collector, terminateAfter));
         }
-        searcher.search(new MatchAllDocsQuery(), MultiBucketCollector.wrap(true, collectors));
+        searcher.search(new MatchAllDocsQuery(), MultiBucketCollector.wrap(true, collectors).asCollector());
         for (Map.Entry<TotalHitCountBucketCollector, Integer> expectedCount : expectedCounts.entrySet()) {
             assertEquals(expectedCount.getValue().intValue(), expectedCount.getKey().getTotalHits());
         }
@@ -252,7 +252,7 @@ public void testSetScorerAfterCollectionTerminated() throws IOException {
         Collections.shuffle(collectors, random());
         BucketCollector collector = MultiBucketCollector.wrap(true, collectors);

-        LeafBucketCollector leafCollector = collector.getLeafCollector((LeafReaderContext) null);
+        LeafBucketCollector leafCollector = collector.getLeafCollector(null);
         leafCollector.setScorer(scorer);
         assertTrue(setScorerCalled1.get());
         assertTrue(setScorerCalled2.get());
@@ -14,7 +14,9 @@
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
@@ -72,7 +74,7 @@ public ScoreMode scoreMode() {
         Set<Integer> deferredCollectedDocIds = new HashSet<>();
         collector.setDeferredCollector(Collections.singleton(bla(deferredCollectedDocIds)));
         collector.preCollection();
-        indexSearcher.search(termQuery, collector);
+        indexSearcher.search(termQuery, collector.asCollector());
         collector.postCollection();
         collector.prepareSelectedBuckets(0);

@@ -86,7 +88,7 @@ public ScoreMode scoreMode() {
         deferredCollectedDocIds = new HashSet<>();
         collector.setDeferredCollector(Collections.singleton(bla(deferredCollectedDocIds)));
         collector.preCollection();
-        indexSearcher.search(new MatchAllDocsQuery(), collector);
+        indexSearcher.search(new MatchAllDocsQuery(), collector.asCollector());
         collector.postCollection();
         collector.prepareSelectedBuckets(0);

@@ -199,21 +201,17 @@ private void testCase(
         CollectingBucketCollector finalCollector = new CollectingBucketCollector();
         deferringCollector.setDeferredCollector(Collections.singleton(finalCollector));
         deferringCollector.preCollection();
-        indexSearcher.search(query, new BucketCollector() {
+        indexSearcher.search(query, new Collector() {
             @Override
             public ScoreMode scoreMode() {
                 return ScoreMode.COMPLETE_NO_SCORES;
             }

             @Override
-            public void preCollection() throws IOException {}
-
-            @Override
-            public void postCollection() throws IOException {}
-
-            @Override
-            public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx) throws IOException {
-                LeafBucketCollector delegate = deferringCollector.getLeafCollector(aggCtx);
+            public LeafBucketCollector getLeafCollector(LeafReaderContext context) throws IOException {
+                LeafBucketCollector delegate = deferringCollector.getLeafCollector(
+                    new AggregationExecutionContext(context, null, null)
+                );
                 return leafCollector.apply(deferringCollector, delegate);
             }
         });
@@ -669,7 +669,7 @@ public void onCache(ShardId shardId, Accountable accountable) {}
         AggregationContext context = createAggregationContext(searcher, new MatchAllDocsQuery());
         FilterByFilterAggregator aggregator = createAggregator(builder, context);
         aggregator.preCollection();
-        searcher.search(context.query(), aggregator);
+        searcher.search(context.query(), aggregator.asCollector());
         aggregator.postCollection();

         InternalAggregation result = aggregator.buildTopLevel();
@@ -746,7 +746,7 @@ public void onCache(ShardId shardId, Accountable accountable) {}
         AggregationContext context = createAggregationContext(searcher, new MatchAllDocsQuery(), ft);
         FilterByFilterAggregator aggregator = createAggregator(builder, context);
         aggregator.preCollection();
-        searcher.search(context.query(), aggregator);
+        searcher.search(context.query(), aggregator.asCollector());
         aggregator.postCollection();

         InternalAggregation result = aggregator.buildTopLevel();
@@ -812,7 +812,7 @@ public void onCache(ShardId shardId, Accountable accountable) {}
         AggregationContext context = createAggregationContext(searcher, new MatchAllDocsQuery(), ft);
         FilterByFilterAggregator aggregator = createAggregator(builder, context);
         aggregator.preCollection();
-        searcher.search(context.query(), aggregator);
+        searcher.search(context.query(), aggregator.asCollector());
         aggregator.postCollection();

         InternalAggregation result = aggregator.buildTopLevel();
@@ -1038,7 +1038,7 @@ private void aggregationImplementationChoiceTestCase(
         }
         assertThat(agg, matcher);
         agg.preCollection();
-        context.searcher().search(context.query(), agg);
+        context.searcher().search(context.query(), agg.asCollector());
         InternalDateHistogram result = (InternalDateHistogram) agg.buildTopLevel();
         result = (InternalDateHistogram) result.reduce(
             List.of(result),
@@ -65,7 +65,7 @@ public void testReplay() throws Exception {
         Set<Integer> deferredCollectedDocIds = new HashSet<>();
         collector.setDeferredCollector(Collections.singleton(testCollector(deferredCollectedDocIds)));
         collector.preCollection();
-        indexSearcher.search(termQuery, collector);
+        indexSearcher.search(termQuery, collector.asCollector());
         collector.postCollection();
         collector.prepareSelectedBuckets(0);

@@ -250,7 +250,7 @@ public void testUnmapped() throws Exception {
         RareTermsAggregationBuilder aggregationBuilder = new RareTermsAggregationBuilder("_name").field(fieldNames[i]);
         Aggregator aggregator = createAggregator(aggregationBuilder, indexSearcher, fieldType1, fieldType2);
         aggregator.preCollection();
-        indexSearcher.search(new MatchAllDocsQuery(), aggregator);
+        indexSearcher.search(new MatchAllDocsQuery(), aggregator.asCollector());
         aggregator.postCollection();
         RareTerms result = (RareTerms) aggregator.buildTopLevel();
         assertEquals("_name", result.getName());