Remove usages of BucketCollector#getLeafCollector(LeafReaderContext) #88414

Merged (9 commits) on Jul 11, 2022
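This PR replaces call sites of BucketCollector#getLeafCollector(LeafReaderContext) with the overload that takes an AggregationExecutionContext wrapping the leaf reader context; callers that still need the raw LeafReaderContext unwrap it via getLeafReaderContext(). A minimal sketch of the call-site change, assuming only names that appear in the diff below (collectableSubAggregators, ctx, weight) and passing null for the AggregationExecutionContext arguments that this call site does not need, as the first hunk does:

// Before: the leaf collector was requested with a raw LeafReaderContext.
// final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);

// After: the LeafReaderContext is wrapped in an AggregationExecutionContext.
final AggregationExecutionContext aggCtx = new AggregationExecutionContext(ctx, null, null);
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(aggCtx);

// Callers that still need the raw leaf context unwrap it explicitly:
final Scorer scorer = weight.scorer(aggCtx.getLeafReaderContext());
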
@@ -122,8 +122,7 @@ protected void prepareSubAggs(long[] ordsToCollect) throws IOException {
continue;
}
DocIdSetIterator childDocsIter = childDocsScorer.iterator();

final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(ctx);
final LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(new AggregationExecutionContext(ctx, null, null));

final SortedSetDocValues globalOrdinals = valuesSource.globalOrdinalsValues(ctx);
// Set the scorer, since we now replay only the child docIds

@@ -9,7 +9,6 @@
package org.elasticsearch.search.aggregations;

import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MultiCollector;
import org.apache.lucene.search.Scorable;
@@ -127,7 +126,7 @@ private MultiBucketCollector(boolean terminateIfNoop, BucketCollector... collect
this.terminateIfNoop = terminateIfNoop;
this.collectors = collectors;
int numNeedsScores = 0;
for (Collector collector : collectors) {
for (BucketCollector collector : collectors) {
if (collector.scoreMode().needsScores()) {
numNeedsScores += 1;
}
@@ -138,7 +137,7 @@ private MultiBucketCollector(boolean terminateIfNoop, BucketCollector... collect
@Override
public ScoreMode scoreMode() {
ScoreMode scoreMode = null;
for (Collector collector : collectors) {
for (BucketCollector collector : collectors) {
if (scoreMode == null) {
scoreMode = collector.scoreMode();
} else if (scoreMode != collector.scoreMode()) {

@@ -8,7 +8,6 @@

package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
@@ -42,12 +41,12 @@
*/
public class BestBucketsDeferringCollector extends DeferringBucketCollector {
static class Entry {
final LeafReaderContext context;
final AggregationExecutionContext aggCtx;
final PackedLongValues docDeltas;
final PackedLongValues buckets;

Entry(LeafReaderContext context, PackedLongValues docDeltas, PackedLongValues buckets) {
this.context = Objects.requireNonNull(context);
Entry(AggregationExecutionContext aggCtx, PackedLongValues docDeltas, PackedLongValues buckets) {
this.aggCtx = Objects.requireNonNull(aggCtx);
this.docDeltas = Objects.requireNonNull(docDeltas);
this.buckets = Objects.requireNonNull(buckets);
}
@@ -59,7 +58,7 @@ static class Entry {

private List<Entry> entries = new ArrayList<>();
private BucketCollector collector;
private LeafReaderContext context;
private AggregationExecutionContext aggCtx;
private PackedLongValues.Builder docDeltasBuilder;
private PackedLongValues.Builder bucketsBuilder;
private LongHash selectedBuckets;
@@ -93,10 +92,10 @@ public void setDeferredCollector(Iterable<BucketCollector> deferredCollectors) {
* Button up the builders for the current leaf.
*/
private void finishLeaf() {
if (context != null) {
if (aggCtx != null) {
assert docDeltasBuilder != null && bucketsBuilder != null;
assert docDeltasBuilder.size() > 0;
entries.add(new Entry(context, docDeltasBuilder.build(), bucketsBuilder.build()));
entries.add(new Entry(aggCtx, docDeltasBuilder.build(), bucketsBuilder.build()));
clearLeaf();
}
}
@@ -105,7 +104,7 @@ private void finishLeaf() {
* Clear the status for the current leaf.
*/
private void clearLeaf() {
context = null;
aggCtx = null;
docDeltasBuilder = null;
bucketsBuilder = null;
}
@@ -119,8 +118,8 @@ public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx)

@Override
public void collect(int doc, long bucket) throws IOException {
if (context == null) {
context = aggCtx.getLeafReaderContext();
if (BestBucketsDeferringCollector.this.aggCtx == null) {
Review comment (Member, see the sketch after this hunk): "Thanks for clarifying this on slack. I think renaming would be clearer here."

BestBucketsDeferringCollector.this.aggCtx = aggCtx;
docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
}
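
The qualified BestBucketsDeferringCollector.this.aggCtx above is needed because the getLeafCollector parameter is also named aggCtx and shadows the new field. A minimal sketch of the rename the review comment suggests, where the field name liveAggCtx is hypothetical and not part of the PR:

// Sketch only: the field lives on BestBucketsDeferringCollector, while collect(...)
// sits in the anonymous LeafBucketCollector returned by
// getLeafCollector(AggregationExecutionContext aggCtx).
private AggregationExecutionContext liveAggCtx;   // distinct name, so the aggCtx parameter no longer shadows it

@Override
public void collect(int doc, long bucket) throws IOException {
    if (liveAggCtx == null) {                      // plain reference, no outer-class qualifier needed
        liveAggCtx = aggCtx;                       // aggCtx is the getLeafCollector(...) parameter
        docDeltasBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
        bucketsBuilder = PackedLongValues.packedBuilder(PackedInts.DEFAULT);
    }
    // ... remainder of collect(...) unchanged
}
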
@@ -169,10 +168,10 @@ public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
for (Entry entry : entries) {
assert entry.docDeltas.size() > 0 : "segment should have at least one document to replay, got 0";
try {
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.context);
final LeafBucketCollector leafCollector = collector.getLeafCollector(entry.aggCtx);
DocIdSetIterator scoreIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
Scorer scorer = weight.scorer(entry.aggCtx.getLeafReaderContext());
// We don't need to check if the scorer is null
// since we are sure that there are documents to replay (entry.docDeltas is not empty).
scoreIt = scorer.iterator();
@@ -266,7 +265,7 @@ public void rewriteBuckets(LongUnaryOperator howToRewrite) {
// Only create an entry if this segment has buckets after merging
if (newBuckets.size() > 0) {
assert newDocDeltas.size() > 0 : "docDeltas was empty but we had buckets";
newEntries.add(new Entry(sourceEntry.context, newDocDeltas.build(), newBuckets.build()));
newEntries.add(new Entry(sourceEntry.aggCtx, newDocDeltas.build(), newBuckets.build()));
}
}
entries = newEntries;

@@ -46,13 +46,13 @@ public abstract class BucketsAggregator extends AggregatorBase {
public BucketsAggregator(
String name,
AggregatorFactories factories,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound bucketCardinality,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, bucketCardinality, metadata);
multiBucketConsumer = context.multiBucketConsumer();
super(name, factories, aggCtx, parent, bucketCardinality, metadata);
multiBucketConsumer = aggCtx.multiBucketConsumer();
docCounts = bigArrays().newLongArray(1, true);
docCountProvider = new DocCountProvider();
}

@@ -78,7 +78,7 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
private final DateHistogramValuesSource[] innerSizedBucketAggregators;

private final List<Entry> entries = new ArrayList<>();
private LeafReaderContext currentLeaf;
private AggregationExecutionContext currentAggCtx;
private RoaringDocIdSet.Builder docIdSetBuilder;
private BucketCollector deferredCollectors;

@@ -87,22 +87,22 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
CompositeAggregator(
String name,
AggregatorFactories factories,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
Map<String, Object> metadata,
int size,
CompositeValuesSourceConfig[] sourceConfigs,
CompositeKey rawAfterKey
) throws IOException {
super(name, factories, context, parent, CardinalityUpperBound.MANY, metadata);
super(name, factories, aggCtx, parent, CardinalityUpperBound.MANY, metadata);
this.size = size;
this.sourceNames = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::name).toList();
this.reverseMuls = Arrays.stream(sourceConfigs).mapToInt(CompositeValuesSourceConfig::reverseMul).toArray();
this.missingOrders = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::missingOrder).toArray(MissingOrder[]::new);
this.formats = Arrays.stream(sourceConfigs).map(CompositeValuesSourceConfig::format).toList();
this.sources = new SingleDimensionValuesSource<?>[sourceConfigs.length];
// check that the provided size is not greater than the search.max_buckets setting
int bucketLimit = context.multiBucketConsumer().getLimit();
int bucketLimit = aggCtx.multiBucketConsumer().getLimit();
if (size > bucketLimit) {
throw new MultiBucketConsumerService.TooManyBucketsException(
"Trying to create too many buckets. Must be less than or equal"
@@ -120,8 +120,8 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
List<DateHistogramValuesSource> dateHistogramValuesSources = new ArrayList<>();
for (int i = 0; i < sourceConfigs.length; i++) {
this.sources[i] = sourceConfigs[i].createValuesSource(
context.bigArrays(),
context.searcher().getIndexReader(),
aggCtx.bigArrays(),
aggCtx.searcher().getIndexReader(),
size,
this::addRequestCircuitBreakerBytes
);
Expand All @@ -130,7 +130,7 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
}
}
this.innerSizedBucketAggregators = dateHistogramValuesSources.toArray(new DateHistogramValuesSource[0]);
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size);
this.queue = new CompositeValuesCollectorQueue(aggCtx.bigArrays(), sources, size);
if (rawAfterKey != null) {
try {
this.queue.setAfterKey(rawAfterKey);
@@ -230,10 +230,10 @@ public InternalAggregation buildEmptyAggregation() {
}

private void finishLeaf() {
if (currentLeaf != null) {
if (currentAggCtx != null) {
DocIdSet docIdSet = docIdSetBuilder.build();
entries.add(new Entry(currentLeaf, docIdSet));
currentLeaf = null;
entries.add(new Entry(currentAggCtx, docIdSet));
currentAggCtx = null;
docIdSetBuilder = null;
}
}
@@ -454,7 +454,7 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
// in the queue.
DocIdSet docIdSet = sortedDocsProducer.processLeaf(topLevelQuery(), queue, aggCtx.getLeafReaderContext(), fillDocIdSet);
if (fillDocIdSet) {
entries.add(new Entry(aggCtx.getLeafReaderContext(), docIdSet));
entries.add(new Entry(aggCtx, docIdSet));
}
// We can bypass search entirely for this segment, the processing is done in the previous call.
// Throwing this exception will terminate the execution of the search for this root aggregation,
@@ -463,7 +463,7 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
return LeafBucketCollector.NO_OP_COLLECTOR;
} else {
if (fillDocIdSet) {
currentLeaf = aggCtx.getLeafReaderContext();
currentAggCtx = aggCtx;
docIdSetBuilder = new RoaringDocIdSet.Builder(aggCtx.getLeafReaderContext().reader().maxDoc());
}
if (rawAfterKey != null && sortPrefixLen > 0) {
@@ -538,11 +538,14 @@ private void runDeferredCollections() throws IOException {
if (docIdSetIterator == null) {
continue;
}
final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context);
final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));
final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.aggCtx);
final LeafBucketCollector collector = queue.getLeafCollector(
entry.aggCtx.getLeafReaderContext(),
getSecondPassCollector(subCollector)
);
DocIdSetIterator scorerIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
Scorer scorer = weight.scorer(entry.aggCtx.getLeafReaderContext());
if (scorer != null) {
scorerIt = scorer.iterator();
subCollector.setScorer(scorer);
@@ -605,11 +608,11 @@ public double bucketSize(Rounding.DateTimeUnit unit) {
}

private static class Entry {
final LeafReaderContext context;
final AggregationExecutionContext aggCtx;
final DocIdSet docIdSet;

Entry(LeafReaderContext context, DocIdSet docIdSet) {
this.context = context;
Entry(AggregationExecutionContext aggCtx, DocIdSet docIdSet) {
this.aggCtx = aggCtx;
this.docIdSet = docIdSet;
}
}

@@ -47,7 +47,7 @@ public abstract static class AdapterBuilder<T> {
private final String name;
private final List<QueryToFilterAdapter> filters = new ArrayList<>();
private final boolean keyed;
private final AggregationContext context;
private final AggregationContext aggCtx;
private final Aggregator parent;
private final CardinalityUpperBound cardinality;
private final Map<String, Object> metadata;
@@ -58,18 +58,18 @@ public AdapterBuilder(
String name,
boolean keyed,
String otherBucketKey,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
this.name = name;
this.keyed = keyed;
this.context = context;
this.aggCtx = aggCtx;
this.parent = parent;
this.cardinality = cardinality;
this.metadata = metadata;
this.rewrittenTopLevelQuery = context.searcher().rewrite(context.query());
this.rewrittenTopLevelQuery = aggCtx.searcher().rewrite(aggCtx.query());
this.valid = parent == null && otherBucketKey == null;
}

@@ -93,7 +93,7 @@ public final void add(String key, Query query) throws IOException {
valid = false;
return;
}
add(QueryToFilterAdapter.build(context.searcher(), key, query));
add(QueryToFilterAdapter.build(aggCtx.searcher(), key, query));
}

final void add(QueryToFilterAdapter filter) throws IOException {
@@ -120,7 +120,7 @@ final void add(QueryToFilterAdapter filter) throws IOException {
* fields are expensive to decode and the overhead of iterating per
* filter causes us to decode doc counts over and over again.
*/
if (context.hasDocCountField()) {
if (aggCtx.hasDocCountField()) {
valid = false;
return;
}
@@ -140,7 +140,7 @@ class AdapterBuild implements CheckedFunction<AggregatorFactories, FilterByFilte

@Override
public FilterByFilterAggregator apply(AggregatorFactories subAggregators) throws IOException {
agg = new FilterByFilterAggregator(name, subAggregators, filters, keyed, context, parent, cardinality, metadata);
agg = new FilterByFilterAggregator(name, subAggregators, filters, keyed, aggCtx, parent, cardinality, metadata);
return agg;
}
}
@@ -202,12 +202,12 @@ private FilterByFilterAggregator(
AggregatorFactories factories,
List<QueryToFilterAdapter> filters,
boolean keyed,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
super(name, factories, filters, keyed, null, context, parent, cardinality, metadata);
super(name, factories, filters, keyed, null, aggCtx, parent, cardinality, metadata);
}

/**
@@ -237,7 +237,7 @@ protected LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCt
collectCount(aggCtx.getLeafReaderContext(), live);
} else {
segmentsCollected++;
collectSubs(aggCtx.getLeafReaderContext(), live, sub);
collectSubs(aggCtx, live, sub);
}
return LeafBucketCollector.NO_OP_COLLECTOR;
}
@@ -273,7 +273,7 @@ private void collectCount(LeafReaderContext ctx, Bits live) throws IOException {
* less memory because there isn't a need to buffer a block of matches.
* And it's a hell of a lot less code.
*/
private void collectSubs(LeafReaderContext ctx, Bits live, LeafBucketCollector sub) throws IOException {
private void collectSubs(AggregationExecutionContext aggCtx, Bits live, LeafBucketCollector sub) throws IOException {
class MatchCollector implements LeafCollector {
LeafBucketCollector subCollector = sub;
int filterOrd;
@@ -287,11 +287,11 @@ public void collect(int docId) throws IOException {
public void setScorer(Scorable scorer) throws IOException {}
}
MatchCollector collector = new MatchCollector();
filters().get(0).collect(ctx, collector, live);
filters().get(0).collect(aggCtx.getLeafReaderContext(), collector, live);
for (int filterOrd = 1; filterOrd < filters().size(); filterOrd++) {
collector.subCollector = collectableSubAggregators.getLeafCollector(ctx);
collector.subCollector = collectableSubAggregators.getLeafCollector(aggCtx);
collector.filterOrd = filterOrd;
filters().get(filterOrd).collect(ctx, collector, live);
filters().get(filterOrd).collect(aggCtx.getLeafReaderContext(), collector, live);
}
}


@@ -170,12 +170,12 @@ protected FilterByFilterAggregator adapt(
List<QueryToFilterAdapter> filters,
boolean keyed,
String otherBucketKey,
AggregationContext context,
AggregationContext aggCtx,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
super(name, factories, context, parent, cardinality.multiply(filters.size() + (otherBucketKey == null ? 0 : 1)), metadata);
super(name, factories, aggCtx, parent, cardinality.multiply(filters.size() + (otherBucketKey == null ? 0 : 1)), metadata);
this.filters = List.copyOf(filters);
this.keyed = keyed;
this.otherBucketKey = otherBucketKey;