Skip to content

Commit

Permalink
Move bucket reduction from Bucket to the InternalAgg (elastic#45566)
Browse files Browse the repository at this point in the history
The current idiom is to have the InternalAggregator find all the
buckets sharing the same key, put them in a list, get the first bucket
and ask that bucket to reduce all the buckets (including itself).

This is a somewhat confusing workflow, and it feels like the aggregator should
be reducing the buckets (since the aggregator owns the buckets), rather
than asking one bucket to do all the reductions.

This commit basically moves the `Bucket.reduce()` method to the
InternalAgg and renames it `reduceBucket()`.  It also moves the
`createBucket()` (or equivalent) method from the bucket to the
InternalAgg as well.
  • Loading branch information
polyfractal committed Aug 16, 2019
1 parent 2ecd6a8 commit 50c65d0
Show file tree
Hide file tree
Showing 32 changed files with 297 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public ScriptService scriptService() {
public void consumeBucketsAndMaybeBreak(int size) {
multiBucketConsumer.accept(size);
}

}

protected final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
*/
public abstract B createBucket(InternalAggregations aggregations, B prototype);

/**
 * Reduce a list of same-keyed buckets (from multiple shards) to a single bucket. This
 * requires all buckets to have the same key.
 *
 * @param buckets the buckets to merge; all must share the same key
 * @param context the context of the current reduce phase
 * @return a single bucket combining the doc counts and sub-aggregations of {@code buckets}
 */
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);

@Override
public abstract List<? extends InternalBucket> getBuckets();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,6 @@ public Aggregations getAggregations() {
return aggregations;
}

InternalBucket reduce(List<InternalBucket> buckets, ReduceContext context) {
    // Seed the result with a copy of the first bucket, then fold the remaining
    // buckets' doc counts into it. Sub-aggregations from every bucket (including
    // the first) are collected and reduced in one pass at the end.
    InternalBucket first = buckets.get(0);
    InternalBucket merged = new InternalBucket(first.key, first.docCount, first.aggregations);
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    subAggs.add(first.aggregations);
    for (int i = 1; i < buckets.size(); i++) {
        InternalBucket bucket = buckets.get(i);
        merged.docCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    merged.aggregations = InternalAggregations.reduce(subAggs, context);
    return merged;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -212,7 +197,7 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu

ArrayList<InternalBucket> reducedBuckets = new ArrayList<>(bucketsMap.size());
for (List<InternalBucket> sameRangeList : bucketsMap.values()) {
InternalBucket reducedBucket = sameRangeList.get(0).reduce(sameRangeList, reduceContext);
InternalBucket reducedBucket = reduceBucket(sameRangeList, reduceContext);
if(reducedBucket.docCount >= 1){
reduceContext.consumeBucketsAndMaybeBreak(1);
reducedBuckets.add(reducedBucket);
Expand All @@ -228,6 +213,23 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
return reduced;
}

@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContext context) {
    assert buckets.size() > 0;
    // Copy the first bucket as the merge target, then accumulate the doc counts
    // of the rest. Every bucket's sub-aggregations (the first included) take part
    // in a single combined reduce.
    InternalBucket first = buckets.get(0);
    InternalBucket merged = new InternalBucket(first.key, first.docCount, first.aggregations);
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    subAggs.add(first.aggregations);
    for (int i = 1; i < buckets.size(); i++) {
        InternalBucket bucket = buckets.get(i);
        merged.docCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    merged.aggregations = InternalAggregations.reduce(subAggs, context);
    return merged;
}

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS.getPreferredName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
while (pq.size() > 0) {
BucketIterator bucketIt = pq.poll();
if (lastBucket != null && bucketIt.current.compareKey(lastBucket) != 0) {
InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext);
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
buckets.clear();
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
Expand All @@ -186,14 +186,27 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
}
}
if (buckets.size() > 0) {
InternalBucket reduceBucket = buckets.get(0).reduce(buckets, reduceContext);
InternalBucket reduceBucket = reduceBucket(buckets, reduceContext);
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(reduceBucket);
}
final CompositeKey lastKey = result.size() > 0 ? result.get(result.size()-1).getRawKey() : null;
return new InternalComposite(name, size, sourceNames, formats, result, lastKey, reverseMuls, pipelineAggregators(), metaData);
}

@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContext context) {
    assert buckets.size() > 0;
    // Sum doc counts and gather each bucket's sub-aggregations so they can be
    // reduced together in one call.
    long totalDocCount = 0;
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    for (int i = 0; i < buckets.size(); i++) {
        InternalBucket bucket = buckets.get(i);
        totalDocCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    InternalAggregations reducedAggs = InternalAggregations.reduce(subAggs, context);
    // All buckets share the same composite key, so the first bucket's key
    // represents the whole group.
    return new InternalBucket(sourceNames, formats, buckets.get(0).key, reverseMuls, totalDocCount, reducedAggs);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
Expand Down Expand Up @@ -321,17 +334,6 @@ public Aggregations getAggregations() {
return aggregations;
}

InternalBucket reduce(List<InternalBucket> buckets, ReduceContext reduceContext) {
    // Accumulate the total doc count and collect every bucket's sub-aggregations,
    // then reduce them all at once and rebuild the bucket under this bucket's key.
    long totalDocCount = 0;
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    for (int i = 0; i < buckets.size(); i++) {
        InternalBucket bucket = buckets.get(i);
        totalDocCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    InternalAggregations reducedAggs = InternalAggregations.reduce(subAggs, reduceContext);
    return new InternalBucket(sourceNames, formats, key, reverseMuls, totalDocCount, reducedAggs);
}

@Override
public int compareKey(InternalBucket other) {
for (int i = 0; i < key.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,6 @@ public Aggregations getAggregations() {
return aggregations;
}

InternalBucket reduce(List<InternalBucket> buckets, ReduceContext context) {
    // Copy the first bucket (preserving its key and keyed flag) as the merge
    // target, add the doc counts of the remaining buckets, and reduce all
    // sub-aggregations together at the end.
    InternalBucket first = buckets.get(0);
    InternalBucket merged = new InternalBucket(first.key, first.docCount, first.aggregations, first.keyed);
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    subAggs.add(first.aggregations);
    for (int i = 1; i < buckets.size(); i++) {
        InternalBucket bucket = buckets.get(i);
        merged.docCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    merged.aggregations = InternalAggregations.reduce(subAggs, context);
    return merged;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
if (keyed) {
Expand Down Expand Up @@ -227,8 +212,25 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
InternalFilters reduced = new InternalFilters(name, new ArrayList<>(bucketsList.size()), keyed, pipelineAggregators(),
getMetaData());
for (List<InternalBucket> sameRangeList : bucketsList) {
reduced.buckets.add((sameRangeList.get(0)).reduce(sameRangeList, reduceContext));
reduced.buckets.add(reduceBucket(sameRangeList, reduceContext));
}
return reduced;
}

@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContext context) {
    assert buckets.size() > 0;
    // Start from a copy of the first bucket (keeping its key and keyed flag),
    // then fold the remaining doc counts in. Sub-aggregations of every bucket
    // are reduced together in a single call.
    InternalBucket first = buckets.get(0);
    InternalBucket merged = new InternalBucket(first.key, first.docCount, first.aggregations, first.keyed);
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    subAggs.add(first.aggregations);
    for (int i = 1; i < buckets.size(); i++) {
        InternalBucket bucket = buckets.get(i);
        merged.docCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    merged.aggregations = InternalAggregations.reduce(subAggs, context);
    return merged;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
Expand Down Expand Up @@ -96,48 +95,6 @@ public void collect(int doc, long bucket) throws IOException {
};
}

// private impl that stores a bucket ord. This allows for computing the aggregations lazily.
// NOTE(review): this wrapper delegates key rendering to the wrapped sourceBucket, so the
// two hashAsLong fields must be kept in sync (see hashAsLong(long) below).
static class OrdinalBucket extends InternalGeoGridBucket {

long bucketOrd;
InternalGeoGridBucket sourceBucket; // used to keep track of appropriate getKeyAsString method

// Wraps an existing bucket, copying its hash, doc count and aggregations.
OrdinalBucket(InternalGeoGridBucket sourceBucket) {
super(sourceBucket.hashAsLong, sourceBucket.docCount, sourceBucket.aggregations);
this.sourceBucket = sourceBucket;
}

// Updates the hash on BOTH this wrapper and the wrapped source bucket, so that
// getKey()/getKeyAsString() (which delegate to sourceBucket) stay consistent.
void hashAsLong(long hashAsLong) {
this.hashAsLong = hashAsLong;
this.sourceBucket.hashAsLong = hashAsLong;
}

// Builds a new OrdinalBucket around the given bucket with the supplied hash,
// doc count and aggregations.
@Override
InternalGeoGridBucket buildBucket(InternalGeoGridBucket bucket, long hashAsLong, long docCount,
InternalAggregations aggregations) {
OrdinalBucket ordBucket = new OrdinalBucket(bucket);
ordBucket.hashAsLong = hashAsLong;
ordBucket.docCount = docCount;
ordBucket.aggregations = aggregations;
// this is done because the aggregator may be rebuilt from cache (non OrdinalBucket),
// or it may be rebuilding from a new calculation, and therefore copying bucketOrd.
if (bucket instanceof OrdinalBucket) {
ordBucket.bucketOrd = ((OrdinalBucket) bucket).bucketOrd;
}
return ordBucket;
}

// Key access is delegated to the wrapped bucket so the concrete grid type
// (e.g. geohash vs. geotile) controls how the key is rendered.
@Override
public Object getKey() {
return sourceBucket.getKey();
}

@Override
public String getKeyAsString() {
return sourceBucket.getKeyAsString();
}
}

abstract T buildAggregation(String name, int requiredSize, List<InternalGeoGridBucket> buckets,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData);

Expand All @@ -154,24 +111,24 @@ public InternalGeoGrid buildAggregation(long owningBucketOrdinal) throws IOExcep
final int size = (int) Math.min(bucketOrds.size(), shardSize);
consumeBucketsAndMaybeBreak(size);

BucketPriorityQueue ordered = new BucketPriorityQueue(size);
OrdinalBucket spare = null;
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
InternalGeoGridBucket spare = null;
for (long i = 0; i < bucketOrds.size(); i++) {
if (spare == null) {
spare = new OrdinalBucket(newEmptyBucket());
spare = newEmptyBucket();
}

// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.hashAsLong(bucketOrds.get(i));
spare.hashAsLong = bucketOrds.get(i);
spare.docCount = bucketDocCount(i);
spare.bucketOrd = i;
spare = (OrdinalBucket) ordered.insertWithOverflow(spare);
spare = ordered.insertWithOverflow(spare);
}

final InternalGeoGridBucket[] list = new InternalGeoGridBucket[ordered.size()];
for (int i = ordered.size() - 1; i >= 0; --i) {
final OrdinalBucket bucket = (OrdinalBucket) ordered.pop();
final InternalGeoGridBucket bucket = ordered.pop();
bucket.aggregations = bucketAggregations(bucket.bucketOrd);
list[i] = bucket;
}
Expand All @@ -183,10 +140,8 @@ public InternalGeoGrid buildEmptyAggregation() {
return buildAggregation(name, requiredSize, Collections.emptyList(), pipelineAggregators(), metaData());
}


@Override
public void doClose() {
Releasables.close(bucketOrds);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

Expand Down Expand Up @@ -81,15 +82,15 @@ public List<InternalGeoGridBucket> getBuckets() {

@Override
public InternalGeoGrid doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
LongObjectPagedHashMap<List<B>> buckets = null;
LongObjectPagedHashMap<List<InternalGeoGridBucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
InternalGeoGrid grid = (InternalGeoGrid) aggregation;
if (buckets == null) {
buckets = new LongObjectPagedHashMap<>(grid.buckets.size(), reduceContext.bigArrays());
}
for (Object obj : grid.buckets) {
B bucket = (B) obj;
List<B> existingBuckets = buckets.get(bucket.hashAsLong());
InternalGeoGridBucket bucket = (InternalGeoGridBucket) obj;
List<InternalGeoGridBucket> existingBuckets = buckets.get(bucket.hashAsLong());
if (existingBuckets == null) {
existingBuckets = new ArrayList<>(aggregations.size());
buckets.put(bucket.hashAsLong(), existingBuckets);
Expand All @@ -100,9 +101,9 @@ public InternalGeoGrid doReduce(List<InternalAggregation> aggregations, ReduceCo

final int size = Math.toIntExact(reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size()));
BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size);
for (LongObjectPagedHashMap.Cursor<List<B>> cursor : buckets) {
List<B> sameCellBuckets = cursor.value;
InternalGeoGridBucket removed = ordered.insertWithOverflow(sameCellBuckets.get(0).reduce(sameCellBuckets, reduceContext));
for (LongObjectPagedHashMap.Cursor<List<InternalGeoGridBucket>> cursor : buckets) {
List<InternalGeoGridBucket> sameCellBuckets = cursor.value;
InternalGeoGridBucket removed = ordered.insertWithOverflow(reduceBucket(sameCellBuckets, reduceContext));
if (removed != null) {
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(removed));
} else {
Expand All @@ -117,6 +118,21 @@ public InternalGeoGrid doReduce(List<InternalAggregation> aggregations, ReduceCo
return create(getName(), requiredSize, Arrays.asList(list), pipelineAggregators(), getMetaData());
}

@Override
protected InternalGeoGridBucket reduceBucket(List<InternalGeoGridBucket> buckets, ReduceContext context) {
    assert buckets.size() > 0;
    // Sum the doc counts and collect each bucket's sub-aggregations, reduce the
    // sub-aggregations in one call, then let the concrete grid type build the
    // merged bucket. All buckets share the same cell hash, so the first one's
    // hashAsLong identifies the group.
    long totalDocCount = 0;
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    for (int i = 0; i < buckets.size(); i++) {
        InternalGeoGridBucket bucket = buckets.get(i);
        totalDocCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    final InternalAggregations reducedAggs = InternalAggregations.reduce(subAggs, context);
    return createBucket(buckets.get(0).hashAsLong, totalDocCount, reducedAggs);
}

abstract B createBucket(long hashAsLong, long docCount, InternalAggregations aggregations);

@Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.startArray(CommonFields.BUCKETS.getPreferredName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public abstract class InternalGeoGridBucket<B extends InternalGeoGridBucket>
Expand All @@ -39,6 +36,8 @@ public abstract class InternalGeoGridBucket<B extends InternalGeoGridBucket>
protected long docCount;
protected InternalAggregations aggregations;

long bucketOrd;

public InternalGeoGridBucket(long hashAsLong, long docCount, InternalAggregations aggregations) {
this.docCount = docCount;
this.aggregations = aggregations;
Expand All @@ -61,9 +60,6 @@ public void writeTo(StreamOutput out) throws IOException {
aggregations.writeTo(out);
}

abstract B buildBucket(InternalGeoGridBucket bucket, long hashAsLong, long docCount, InternalAggregations aggregations);


long hashAsLong() {
return hashAsLong;
}
Expand All @@ -89,17 +85,6 @@ public int compareTo(InternalGeoGridBucket other) {
return 0;
}

public B reduce(List<B> buckets, InternalAggregation.ReduceContext context) {
    // Accumulate doc counts and sub-aggregations across all same-cell buckets,
    // reduce the sub-aggregations once, and rebuild a bucket under this
    // bucket's hash via buildBucket.
    long totalDocCount = 0;
    List<InternalAggregations> subAggs = new ArrayList<>(buckets.size());
    for (int i = 0; i < buckets.size(); i++) {
        B bucket = buckets.get(i);
        totalDocCount += bucket.docCount;
        subAggs.add(bucket.aggregations);
    }
    final InternalAggregations reducedAggs = InternalAggregations.reduce(subAggs, context);
    return buildBucket(this, hashAsLong, totalDocCount, reducedAggs);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Loading

0 comments on commit 50c65d0

Please sign in to comment.