Skip to content

Commit

Permalink
Remove bucketOrd from InternalGeoGridBucket (elastic#117615)
Browse files Browse the repository at this point in the history
This commit removes the need for a bucketOrd field in InternalGeoGridBucket, which was only used to build the 
InternalAggregation from the aggregator.
  • Loading branch information
iverase committed Dec 2, 2024
1 parent 4b7c3b6 commit ee7b1d1
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {}
* the provided ordinals.
* <p>
* Most aggregations should probably use something like
* {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)}
* {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)}
* or {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)}
* or {@link #buildAggregationsForVariableBuckets(LongArray, LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)}
* or {@link #buildAggregationsForFixedBucketCount(LongArray, int, BucketBuilderForFixedCount, Function)}
* or {@link #buildAggregationsForSingleBucket(LongArray, SingleBucketResultBuilder)}
Expand Down Expand Up @@ -193,10 +194,9 @@ public int size() {
}

/**
* Build the sub aggregation results for a list of buckets and set them on
* the buckets. This is usually used by aggregations that are selective
* in which bucket they build. They use some mechanism of selecting a list
* of buckets to build use this method to "finish" building the results.
* Similar to {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)},
* but it first builds the bucket ordinals from the buckets. This method usually requires
* the buckets to contain their bucket ordinal.
* @param buckets the buckets to finish building
* @param bucketToOrd how to convert a bucket into an ordinal
* @param setAggs how to set the sub-aggregation results on a bucket
Expand All @@ -218,12 +218,29 @@ protected final <B> void buildSubAggsForAllBuckets(
bucketOrdsToCollect.set(s++, bucketToOrd.applyAsLong(bucket));
}
}
var results = buildSubAggsForBuckets(bucketOrdsToCollect);
s = 0;
for (long ord = 0; ord < buckets.size(); ord++) {
for (B value : buckets.get(ord)) {
setAggs.accept(value, results.apply(s++));
}
buildSubAggsForAllBuckets(buckets, bucketOrdsToCollect, setAggs);
}
}

/**
 * Finishes a set of already-selected buckets by building their sub-aggregation
 * results and attaching each result to its bucket. Aggregations that are
 * selective about which buckets they build typically pick their buckets first
 * and then call this method to complete them.
 * <p>
 * Results are handed out positionally: the n-th bucket visited while walking
 * {@code buckets} in order receives the n-th result built from
 * {@code bucketOrdsToCollect}. Callers are presumably expected to supply one
 * ordinal per bucket, in that same traversal order.
 *
 * @param buckets the buckets to finish building
 * @param bucketOrdsToCollect bucket ordinals
 * @param setAggs how to set the sub-aggregation results on a bucket
 */
protected final <B> void buildSubAggsForAllBuckets(
    ObjectArray<B[]> buckets,
    LongArray bucketOrdsToCollect,
    BiConsumer<B, InternalAggregations> setAggs
) throws IOException {
    final var subAggsByPosition = buildSubAggsForBuckets(bucketOrdsToCollect);
    // Running index into the flattened sequence of buckets across all owning ordinals.
    int position = 0;
    long owningOrd = 0;
    while (owningOrd < buckets.size()) {
        final B[] bucketsForOrd = buckets.get(owningOrd);
        for (int i = 0; i < bucketsForOrd.length; i++) {
            final int current = position;
            position++;
            setAggs.accept(bucketsForOrd[i], subAggsByPosition.apply(current));
        }
        owningOrd++;
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;

class BucketPriorityQueue<B extends InternalGeoGridBucket> extends ObjectArrayPriorityQueue<B> {
import java.util.function.Function;

BucketPriorityQueue(int size, BigArrays bigArrays) {
class BucketPriorityQueue<A, B extends InternalGeoGridBucket> extends ObjectArrayPriorityQueue<A> {

private final Function<A, B> bucketSupplier;

BucketPriorityQueue(int size, BigArrays bigArrays, Function<A, B> bucketSupplier) {
super(size, bigArrays);
this.bucketSupplier = bucketSupplier;
}

@Override
protected boolean lessThan(InternalGeoGridBucket o1, InternalGeoGridBucket o2) {
int cmp = Long.compare(o2.getDocCount(), o1.getDocCount());
protected boolean lessThan(A o1, A o2) {
final B b1 = bucketSupplier.apply(o1);
final B b2 = bucketSupplier.apply(o2);
int cmp = Long.compare(b2.getDocCount(), b1.getDocCount());
if (cmp == 0) {
cmp = o2.compareTo(o1);
cmp = b2.compareTo(b1);
if (cmp == 0) {
cmp = System.identityHashCode(o2) - System.identityHashCode(o1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -135,34 +137,52 @@ public void collect(int doc, long owningBucketOrd) throws IOException {

@Override
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {

try (ObjectArray<InternalGeoGridBucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize);

try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, bigArrays())) {
InternalGeoGridBucket spare = null;
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = newEmptyBucket();
try (IntArray bucketsSizePerOrd = bigArrays().newIntArray(owningBucketOrds.size())) {
long ordsToCollect = 0;
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize);
ordsToCollect += size;
bucketsSizePerOrd.set(ordIdx, size);
}
try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) {
long ordsCollected = 0;
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
try (
BucketPriorityQueue<BucketAndOrd<InternalGeoGridBucket>, InternalGeoGridBucket> ordered =
new BucketPriorityQueue<>(bucketsSizePerOrd.get(ordIdx), bigArrays(), b -> b.bucket)
) {
BucketAndOrd<InternalGeoGridBucket> spare = null;
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = new BucketAndOrd<>(newEmptyBucket());
}

// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.bucket.hashAsLong = ordsEnum.value();
spare.bucket.docCount = bucketDocCount(ordsEnum.ord());
spare.ord = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
final int orderedSize = (int) ordered.size();
final InternalGeoGridBucket[] buckets = new InternalGeoGridBucket[orderedSize];
for (int i = orderedSize - 1; i >= 0; --i) {
BucketAndOrd<InternalGeoGridBucket> bucketBucketAndOrd = ordered.pop();
buckets[i] = bucketBucketAndOrd.bucket;
ordsArray.set(ordsCollected + i, bucketBucketAndOrd.ord);
}
topBucketsPerOrd.set(ordIdx, buckets);
ordsCollected += orderedSize;
}

// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.hashAsLong = ordsEnum.value();
spare.docCount = bucketDocCount(ordsEnum.ord());
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}

topBucketsPerOrd.set(ordIdx, new InternalGeoGridBucket[(int) ordered.size()]);
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
}
assert ordsCollected == ordsArray.size();
buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, aggs) -> b.aggregations = aggs);
}
}
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
return buildAggregations(
Math.toIntExact(owningBucketOrds.size()),
ordIdx -> buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static java.util.Collections.unmodifiableList;

Expand Down Expand Up @@ -106,7 +107,13 @@ public InternalAggregation get() {
final int size = Math.toIntExact(
context.isFinalReduce() == false ? bucketsReducer.size() : Math.min(requiredSize, bucketsReducer.size())
);
try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, context.bigArrays())) {
try (
BucketPriorityQueue<InternalGeoGridBucket, InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(
size,
context.bigArrays(),
Function.identity()
)
) {
bucketsReducer.forEach(entry -> {
InternalGeoGridBucket bucket = createBucket(entry.key, entry.value.getDocCount(), entry.value.getAggregations());
ordered.insertWithOverflow(bucket);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ public abstract class InternalGeoGridBucket extends InternalMultiBucketAggregati
protected long docCount;
protected InternalAggregations aggregations;

long bucketOrd;

public InternalGeoGridBucket(long hashAsLong, long docCount, InternalAggregations aggregations) {
this.docCount = docCount;
this.aggregations = aggregations;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.search.aggregations.bucket.terms;

/**
 * Pairs a bucket with its bucket ordinal.
 * <p>
 * The bucket reference is fixed at construction time, while the ordinal is a
 * plain mutable field so that a single instance can be updated in place.
 *
 * @param <B> the type of the wrapped bucket
 */
public final class BucketAndOrd<B> {

    /** The wrapped bucket; assigned once in the constructor. */
    public final B bucket;

    /** The bucket ordinal; mutable, and left at its default of {@code 0} until assigned. */
    public long ord;

    /**
     * Wraps the given bucket. The ordinal is not set here.
     *
     * @param bucket the bucket to wrap
     */
    public BucketAndOrd(B bucket) {
        this.bucket = bucket;
    }
}

0 comments on commit ee7b1d1

Please sign in to comment.