diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 665dd49e3381d..e86c7127ec2f4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -160,7 +160,8 @@ protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {} * the provided ordinals. *

* Most aggregations should probably use something like - * {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)} + * {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)} + * or {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)} * or {@link #buildAggregationsForVariableBuckets(LongArray, LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)} * or {@link #buildAggregationsForFixedBucketCount(LongArray, int, BucketBuilderForFixedCount, Function)} * or {@link #buildAggregationsForSingleBucket(LongArray, SingleBucketResultBuilder)} @@ -193,10 +194,9 @@ public int size() { } /** - * Build the sub aggregation results for a list of buckets and set them on - * the buckets. This is usually used by aggregations that are selective - * in which bucket they build. They use some mechanism of selecting a list - * of buckets to build use this method to "finish" building the results. + * Similarly to {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)} + * but it needs to build the bucket ordinals. This method usually requires for buckets + * to contain the bucket ordinal. * @param buckets the buckets to finish building * @param bucketToOrd how to convert a bucket into an ordinal * @param setAggs how to set the sub-aggregation results on a bucket @@ -218,12 +218,29 @@ protected final void buildSubAggsForAllBuckets( bucketOrdsToCollect.set(s++, bucketToOrd.applyAsLong(bucket)); } } - var results = buildSubAggsForBuckets(bucketOrdsToCollect); - s = 0; - for (long ord = 0; ord < buckets.size(); ord++) { - for (B value : buckets.get(ord)) { - setAggs.accept(value, results.apply(s++)); - } + buildSubAggsForAllBuckets(buckets, bucketOrdsToCollect, setAggs); + } + } + + /** + * Build the sub aggregation results for a list of buckets and set them on + * the buckets. This is usually used by aggregations that are selective + * in which bucket they build. They use some mechanism of selecting a list + * of buckets to build use this method to "finish" building the results. + * @param buckets the buckets to finish building + * @param bucketOrdsToCollect bucket ordinals + * @param setAggs how to set the sub-aggregation results on a bucket + */ + protected final void buildSubAggsForAllBuckets( + ObjectArray buckets, + LongArray bucketOrdsToCollect, + BiConsumer setAggs + ) throws IOException { + var results = buildSubAggsForBuckets(bucketOrdsToCollect); + int s = 0; + for (long ord = 0; ord < buckets.size(); ord++) { + for (B value : buckets.get(ord)) { + setAggs.accept(value, results.apply(s++)); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/BucketPriorityQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/BucketPriorityQueue.java index cc677605c4528..85c79df42a714 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/BucketPriorityQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/BucketPriorityQueue.java @@ -11,17 +11,24 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; -class BucketPriorityQueue extends ObjectArrayPriorityQueue { +import java.util.function.Function; - BucketPriorityQueue(int size, BigArrays bigArrays) { +class BucketPriorityQueue extends ObjectArrayPriorityQueue { + + private final Function bucketSupplier; + + BucketPriorityQueue(int size, BigArrays bigArrays, Function bucketSupplier) { super(size, bigArrays); + this.bucketSupplier = bucketSupplier; } @Override - protected boolean lessThan(InternalGeoGridBucket o1, InternalGeoGridBucket o2) { - int cmp = Long.compare(o2.getDocCount(), o1.getDocCount()); + protected boolean lessThan(A o1, A o2) { + final B b1 = bucketSupplier.apply(o1); + final B b2 = bucketSupplier.apply(o2); + int cmp = Long.compare(b2.getDocCount(), b1.getDocCount()); if (cmp == 0) { - cmp = o2.compareTo(o1); + cmp = b2.compareTo(b1); if (cmp == 0) { cmp = System.identityHashCode(o2) - System.identityHashCode(o1); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java index 1d3614af08768..b84dff6e73e0b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; @@ -23,6 +24,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.BucketsAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.ValuesSource; @@ -135,34 +137,52 @@ public void collect(int doc, long owningBucketOrd) throws IOException { @Override public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + try (ObjectArray topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) { - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { - int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize); - - try (BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, bigArrays())) { - InternalGeoGridBucket spare = null; - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); - while (ordsEnum.next()) { - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = newEmptyBucket(); + try (IntArray bucketsSizePerOrd = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize); + ordsToCollect += size; + bucketsSizePerOrd.set(ordIdx, size); + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + try ( + BucketPriorityQueue, InternalGeoGridBucket> ordered = + new BucketPriorityQueue<>(bucketsSizePerOrd.get(ordIdx), bigArrays(), b -> b.bucket) + ) { + BucketAndOrd spare = null; + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); + while (ordsEnum.next()) { + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(newEmptyBucket()); + } + + // need a special function to keep the source bucket + // up-to-date so it can get the appropriate key + spare.bucket.hashAsLong = ordsEnum.value(); + spare.bucket.docCount = bucketDocCount(ordsEnum.ord()); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + final int orderedSize = (int) ordered.size(); + final InternalGeoGridBucket[] buckets = new InternalGeoGridBucket[orderedSize]; + for (int i = orderedSize - 1; i >= 0; --i) { + BucketAndOrd bucketBucketAndOrd = ordered.pop(); + buckets[i] = bucketBucketAndOrd.bucket; + ordsArray.set(ordsCollected + i, bucketBucketAndOrd.ord); + } + topBucketsPerOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; } - - // need a special function to keep the source bucket - // up-to-date so it can get the appropriate key - spare.hashAsLong = ordsEnum.value(); - spare.docCount = bucketDocCount(ordsEnum.ord()); - spare.bucketOrd = ordsEnum.ord(); - spare = ordered.insertWithOverflow(spare); - } - - topBucketsPerOrd.set(ordIdx, new InternalGeoGridBucket[(int) ordered.size()]); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); } + assert ordsCollected == ordsArray.size(); + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } } - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); return buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata()) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java index 6a32b41034503..343c92b353884 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import static java.util.Collections.unmodifiableList; @@ -106,7 +107,13 @@ public InternalAggregation get() { final int size = Math.toIntExact( context.isFinalReduce() == false ? bucketsReducer.size() : Math.min(requiredSize, bucketsReducer.size()) ); - try (BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, context.bigArrays())) { + try ( + BucketPriorityQueue ordered = new BucketPriorityQueue<>( + size, + context.bigArrays(), + Function.identity() + ) + ) { bucketsReducer.forEach(entry -> { InternalGeoGridBucket bucket = createBucket(entry.key, entry.value.getDocCount(), entry.value.getAggregations()); ordered.insertWithOverflow(bucket); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java index 60de4c3974c92..8884a412bcf41 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGridBucket.java @@ -28,8 +28,6 @@ public abstract class InternalGeoGridBucket extends InternalMultiBucketAggregati protected long docCount; protected InternalAggregations aggregations; - long bucketOrd; - public InternalGeoGridBucket(long hashAsLong, long docCount, InternalAggregations aggregations) { this.docCount = docCount; this.aggregations = aggregations; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketAndOrd.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketAndOrd.java new file mode 100644 index 0000000000000..7b853860b7959 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketAndOrd.java @@ -0,0 +1,21 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.search.aggregations.bucket.terms; + +/** Represents a bucket and its bucket ordinal */ +public final class BucketAndOrd { + + public final B bucket; // the bucket + public long ord; // mutable ordinal of the bucket + + public BucketAndOrd(B bucket) { + this.bucket = bucket; + } +}