diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java index 2d360705f75b6..c412ecb5d6361 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.xcontent.ToXContentObject; @@ -20,13 +21,12 @@ import java.util.Comparator; import java.util.List; import java.util.function.BiFunction; -import java.util.function.ToLongFunction; /** * {@link Bucket} ordering strategy. Buckets can be order either as * "complete" buckets using {@link #comparator()} or against a combination * of the buckets internals with its ordinal with - * {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}. + * {@link #partiallyBuiltBucketComparator(Aggregator)}. */ public abstract class BucketOrder implements ToXContentObject, Writeable { /** @@ -102,7 +102,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionExc * to validate this order because doing so checks all of the appropriate * paths. */ - partiallyBuiltBucketComparator(null, aggregator); + partiallyBuiltBucketComparator(aggregator); } /** @@ -121,7 +121,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionExc * with it all the time. *

*/ - public abstract Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator); + public abstract Comparator> partiallyBuiltBucketComparator(Aggregator aggregator); /** * Build a comparator for fully built buckets. diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java index 043fab6f4f122..74534c275d111 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalOrder.java @@ -16,6 +16,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.search.aggregations.Aggregator.BucketComparator; import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortValue; @@ -31,7 +32,6 @@ import java.util.List; import java.util.Objects; import java.util.function.BiFunction; -import java.util.function.ToLongFunction; /** * Implementations for {@link Bucket} ordering strategies. @@ -64,10 +64,10 @@ public AggregationPath path() { } @Override - public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + public Comparator> partiallyBuiltBucketComparator(Aggregator aggregator) { try { BucketComparator bucketComparator = path.bucketComparator(aggregator, order); - return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs)); + return (lhs, rhs) -> bucketComparator.compare(lhs.ord, rhs.ord); } catch (IllegalArgumentException e) { throw new AggregationExecutionException.InvalidPath("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e); } @@ -189,12 +189,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { - List> comparators = orderElements.stream() - .map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator)) - .toList(); + public Comparator> partiallyBuiltBucketComparator(Aggregator aggregator) { + List>> comparators = new ArrayList<>(orderElements.size()); + for (BucketOrder order : orderElements) { + comparators.add(order.partiallyBuiltBucketComparator(aggregator)); + } return (lhs, rhs) -> { - for (Comparator c : comparators) { + for (Comparator> c : comparators) { int result = c.compare(lhs, rhs); if (result != 0) { return result; @@ -300,9 +301,9 @@ byte id() { } @Override - public Comparator partiallyBuiltBucketComparator(ToLongFunction ordinalReader, Aggregator aggregator) { + public Comparator> partiallyBuiltBucketComparator(Aggregator aggregator) { Comparator comparator = comparator(); - return comparator::compare; + return (lhs, rhs) -> comparator.compare(lhs.bucket, rhs.bucket); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java index 192b0b3d7323e..310fcd4fb6110 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; @@ -26,6 +27,7 @@ import org.elasticsearch.search.aggregations.InternalOrder; import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue; import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds; import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms; @@ -38,7 +40,6 @@ import java.util.Arrays; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.Supplier; import static java.util.Collections.emptyList; import static org.apache.lucene.index.SortedSetDocValues.NO_MORE_ORDS; @@ -115,51 +116,57 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size()); ObjectArray topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()) ) { - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - - // as users can't control sort order, in practice we'll always sort by doc count descending - try ( - BucketPriorityQueue ordered = new BucketPriorityQueue<>( - size, - bigArrays(), - partiallyBuiltBucketComparator - ) - ) { - StringTerms.Bucket spare = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); - Supplier emptyBucketBuilder = () -> new StringTerms.Bucket( - new BytesRef(), - 0, - null, - false, - 0, - format - ); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = emptyBucketBuilder.get(); + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + // find how many buckets we are going to collect + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize()); + bucketsToCollect.set(ordIdx, size); + ordsToCollect += size; + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + // as users can't control sort order, in practice we'll always sort by doc count descending + try ( + BucketPriorityQueue ordered = new BucketPriorityQueue<>( + bucketsToCollect.get(ordIdx), + bigArrays(), + order.partiallyBuiltBucketComparator(this) + ) + ) { + BucketAndOrd spare = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format)); + } + ordsEnum.readValue(spare.bucket.getTermBytes()); + spare.bucket.setDocCount(docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + final int orderedSize = (int) ordered.size(); + final StringTerms.Bucket[] buckets = new StringTerms.Bucket[orderedSize]; + for (int i = orderedSize - 1; i >= 0; --i) { + BucketAndOrd bucketAndOrd = ordered.pop(); + buckets[i] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount()); + bucketAndOrd.bucket.setTermBytes(BytesRef.deepCopyOf(bucketAndOrd.bucket.getTermBytes())); + } + topBucketsPerOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; } - ordsEnum.readValue(spare.getTermBytes()); - spare.setDocCount(docCount); - spare.setBucketOrd(ordsEnum.ord()); - spare = ordered.insertWithOverflow(spare); - } - - topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); - otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount()); - topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes())); } + assert ordsCollected == ordsArray.size(); + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations); } } - buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations); - return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { final BucketOrder reduceOrder; if (isKeyOrder(order) == false) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java index 7f8e5c8c885fa..9550003a5bd1e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketPriorityQueue.java @@ -13,17 +13,17 @@ import java.util.Comparator; -public class BucketPriorityQueue extends ObjectArrayPriorityQueue { +public class BucketPriorityQueue extends ObjectArrayPriorityQueue> { - private final Comparator comparator; + private final Comparator> comparator; - public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator comparator) { + public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator> comparator) { super(size, bigArrays); this.comparator = comparator; } @Override - protected boolean lessThan(B a, B b) { + protected boolean lessThan(BucketAndOrd a, BucketAndOrd b) { return comparator.compare(a, b) > 0; // reverse, since we reverse again when adding to a list } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java index fe751c9e79189..4736f52d93622 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BucketSignificancePriorityQueue.java @@ -12,14 +12,14 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; -public class BucketSignificancePriorityQueue extends ObjectArrayPriorityQueue { +public class BucketSignificancePriorityQueue extends ObjectArrayPriorityQueue> { public BucketSignificancePriorityQueue(int size, BigArrays bigArrays) { super(size, bigArrays); } @Override - protected boolean lessThan(SignificantTerms.Bucket o1, SignificantTerms.Bucket o2) { - return o1.getSignificanceScore() < o2.getSignificanceScore(); + protected boolean lessThan(BucketAndOrd o1, BucketAndOrd o2) { + return o1.bucket.getSignificanceScore() < o2.bucket.getSignificanceScore(); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 037870016a5f3..ee472bb2050a1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -20,6 +20,7 @@ import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.common.util.ObjectArray; @@ -558,10 +559,10 @@ InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOExc ) { GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; final int size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = collectionStrategy.buildPriorityQueue(size)) { + try (ObjectArrayPriorityQueue> ordered = collectionStrategy.buildPriorityQueue(size)) { BucketUpdater updater = collectionStrategy.bucketUpdater(0, lookupGlobalOrd); collect(new BucketInfoConsumer() { - TB spare = null; + BucketAndOrd spare = null; @Override public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { @@ -569,24 +570,31 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep if (docCount >= bucketCountThresholds.getShardMinDocCount()) { if (spare == null) { checkRealMemoryCBForInternalBucket(); - spare = collectionStrategy.buildEmptyTemporaryBucket(); + spare = new BucketAndOrd<>(collectionStrategy.buildEmptyTemporaryBucket()); } - updater.updateBucket(spare, globalOrd, bucketOrd, docCount); + spare.ord = bucketOrd; + updater.updateBucket(spare.bucket, globalOrd, docCount); spare = ordered.insertWithOverflow(spare); } } }); // Get the top buckets - topBucketsPreOrd.set(0, collectionStrategy.buildBuckets((int) ordered.size())); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - checkRealMemoryCBForInternalBucket(); - B bucket = collectionStrategy.convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); - topBucketsPreOrd.get(0)[i] = bucket; - otherDocCount.increment(0, -bucket.getDocCount()); + int orderedSize = (int) ordered.size(); + try (LongArray ordsArray = bigArrays().newLongArray(orderedSize)) { + B[] buckets = collectionStrategy.buildBuckets(orderedSize); + for (int i = orderedSize - 1; i >= 0; --i) { + checkRealMemoryCBForInternalBucket(); + BucketAndOrd bucketAndOrd = ordered.pop(); + B bucket = collectionStrategy.convertTempBucketToRealBucket(bucketAndOrd.bucket, lookupGlobalOrd); + ordsArray.set(i, bucketAndOrd.ord); + buckets[i] = bucket; + otherDocCount.increment(0, -bucket.getDocCount()); + } + topBucketsPreOrd.set(0, buckets); + collectionStrategy.buildSubAggs(topBucketsPreOrd, ordsArray); } } - collectionStrategy.buildSubAggs(topBucketsPreOrd); return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> collectionStrategy.buildResult( @@ -706,39 +714,61 @@ InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOExc LongArray otherDocCount = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPreOrd = collectionStrategy.buildTopBucketsPerOrd(owningBucketOrds.size()) ) { - GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; - for (long ordIdx = 0; ordIdx < topBucketsPreOrd.size(); ordIdx++) { - long owningBucketOrd = owningBucketOrds.get(ordIdx); - collectZeroDocEntriesIfNeeded(owningBucketOrds.get(ordIdx)); - int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = collectionStrategy.buildPriorityQueue(size)) { - BucketUpdater updater = collectionStrategy.bucketUpdater(owningBucketOrd, lookupGlobalOrd); - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - TB spare = null; - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCount.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = collectionStrategy.buildEmptyTemporaryBucket(); + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrd); + final int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); + ordsToCollect += size; + bucketsToCollect.set(ordIdx, size); + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; + for (long ordIdx = 0; ordIdx < topBucketsPreOrd.size(); ordIdx++) { + long owningBucketOrd = owningBucketOrds.get(ordIdx); + try ( + ObjectArrayPriorityQueue> ordered = collectionStrategy.buildPriorityQueue( + bucketsToCollect.get(ordIdx) + ) + ) { + BucketUpdater updater = collectionStrategy.bucketUpdater(owningBucketOrd, lookupGlobalOrd); + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + BucketAndOrd spare = null; + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCount.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(collectionStrategy.buildEmptyTemporaryBucket()); + } + updater.updateBucket(spare.bucket, ordsEnum.value(), docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + // Get the top buckets + int orderedSize = (int) ordered.size(); + B[] buckets = collectionStrategy.buildBuckets(orderedSize); + for (int i = orderedSize - 1; i >= 0; --i) { + checkRealMemoryCBForInternalBucket(); + BucketAndOrd bucketAndOrd = ordered.pop(); + B bucket = collectionStrategy.convertTempBucketToRealBucket(bucketAndOrd.bucket, lookupGlobalOrd); + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + buckets[i] = bucket; + otherDocCount.increment(ordIdx, -bucket.getDocCount()); + } + topBucketsPreOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; } - updater.updateBucket(spare, ordsEnum.value(), ordsEnum.ord(), docCount); - spare = ordered.insertWithOverflow(spare); - } - // Get the top buckets - topBucketsPreOrd.set(ordIdx, collectionStrategy.buildBuckets((int) ordered.size())); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - checkRealMemoryCBForInternalBucket(); - B bucket = collectionStrategy.convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); - topBucketsPreOrd.get(ordIdx)[i] = bucket; - otherDocCount.increment(ordIdx, -bucket.getDocCount()); } + assert ordsCollected == ordsArray.size(); + collectionStrategy.buildSubAggs(topBucketsPreOrd, ordsArray); } } - collectionStrategy.buildSubAggs(topBucketsPreOrd); return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> collectionStrategy.buildResult( @@ -787,7 +817,7 @@ abstract class ResultStrategy< * Build a {@link PriorityQueue} to sort the buckets. After we've * collected all of the buckets we'll collect all entries in the queue. */ - abstract ObjectArrayPriorityQueue buildPriorityQueue(int size); + abstract ObjectArrayPriorityQueue> buildPriorityQueue(int size); /** * Build an array to hold the "top" buckets for each ordinal. @@ -809,7 +839,7 @@ abstract class ResultStrategy< * Build the sub-aggregations into the buckets. This will usually * delegate to {@link #buildSubAggsForAllBuckets}. */ - abstract void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPreOrd, LongArray ordsArray) throws IOException; /** * Turn the buckets into an aggregation result. @@ -830,7 +860,7 @@ abstract class ResultStrategy< } interface BucketUpdater { - void updateBucket(TB spare, long globalOrd, long bucketOrd, long docCount) throws IOException; + void updateBucket(TB spare, long globalOrd, long docCount) throws IOException; } /** @@ -864,29 +894,30 @@ OrdBucket buildEmptyTemporaryBucket() { @Override BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) { - return (spare, globalOrd, bucketOrd, docCount) -> { + return (spare, globalOrd, docCount) -> { spare.globalOrd = globalOrd; - spare.bucketOrd = bucketOrd; spare.docCount = docCount; }; } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { - return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator); + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { + return new BucketPriorityQueue<>( + size, + bigArrays(), + order.partiallyBuiltBucketComparator(GlobalOrdinalsStringTermsAggregator.this) + ); } @Override StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException { BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd)); - StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); - result.bucketOrd = temp.bucketOrd; - return result; + return new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); } @Override - void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(ObjectArray topBucketsPreOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPreOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override @@ -1001,8 +1032,7 @@ private long subsetSize(long owningBucketOrd) { @Override BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) { long subsetSize = subsetSize(owningBucketOrd); - return (spare, globalOrd, bucketOrd, docCount) -> { - spare.bucketOrd = bucketOrd; + return (spare, globalOrd, docCount) -> { oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); spare.subsetDf = docCount; spare.supersetDf = backgroundFrequencies.freq(spare.termBytes); @@ -1016,7 +1046,7 @@ BucketUpdater bucketUpdater(long owningBucketOrd, } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { return new BucketSignificancePriorityQueue<>(size, bigArrays()); } @@ -1029,8 +1059,8 @@ SignificantStringTerms.Bucket convertTempBucketToRealBucket( } @Override - void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(ObjectArray topBucketsPreOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPreOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index 78ae2481f5d99..5108793b8a809 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -10,12 +10,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.ObjectArrayPriorityQueue; import org.elasticsearch.common.util.ObjectObjectPagedHashMap; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationErrors; import org.elasticsearch.search.aggregations.AggregationReduceContext; -import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorReducer; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; @@ -58,12 +58,6 @@ public interface Reader> { long subsetDf; long supersetDf; - /** - * Ordinal of the bucket while it is being built. Not used after it is - * returned from {@link Aggregator#buildAggregations(org.elasticsearch.common.util.LongArray)} and not - * serialized. - */ - transient long bucketOrd; double score; protected InternalAggregations aggregations; final transient DocValueFormat format; @@ -235,7 +229,12 @@ canLeadReduction here is essentially checking if this shard returned data. Unma public InternalAggregation get() { final SignificanceHeuristic heuristic = getSignificanceHeuristic().rewrite(reduceContext); final int size = (int) (reduceContext.isFinalReduce() == false ? buckets.size() : Math.min(requiredSize, buckets.size())); - try (BucketSignificancePriorityQueue ordered = new BucketSignificancePriorityQueue<>(size, reduceContext.bigArrays())) { + try (ObjectArrayPriorityQueue ordered = new ObjectArrayPriorityQueue(size, reduceContext.bigArrays()) { + @Override + protected boolean lessThan(B a, B b) { + return a.getSignificanceScore() < b.getSignificanceScore(); + } + }) { buckets.forEach(entry -> { final B b = createBucket( entry.value.subsetDf[0], diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java index 739f0b923eaab..de35046691b34 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalTerms.java @@ -38,8 +38,6 @@ public interface Reader> { B read(StreamInput in, DocValueFormat format, boolean showDocCountError) throws IOException; } - long bucketOrd; - protected long docCount; private long docCountError; protected InternalAggregations aggregations; @@ -88,14 +86,6 @@ public void setDocCount(long docCount) { this.docCount = docCount; } - public long getBucketOrd() { - return bucketOrd; - } - - public void setBucketOrd(long bucketOrd) { - this.bucketOrd = bucketOrd; - } - @Override public long getDocCountError() { return docCountError; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index b96c495d37489..026912a583ef3 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -17,6 +17,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; @@ -43,6 +44,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; @@ -287,40 +289,55 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPerOrd = buildTopBucketsPerOrd(Math.toIntExact(owningBucketOrds.size())) ) { - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { - long owningOrd = owningBucketOrds.get(ordIdx); - collectZeroDocEntriesIfNeeded(owningOrd, excludeDeletedDocs); - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - B spare = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningOrd); - BucketUpdater bucketUpdater = bucketUpdater(owningOrd); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = buildEmptyBucket(); + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs); + final int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); + ordsToCollect += size; + bucketsToCollect.set(ordIdx, size); + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + long owningOrd = owningBucketOrds.get(ordIdx); + try (ObjectArrayPriorityQueue> ordered = buildPriorityQueue(bucketsToCollect.get(ordIdx))) { + BucketAndOrd spare = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningOrd); + BucketUpdater bucketUpdater = bucketUpdater(owningOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(buildEmptyBucket()); + } + bucketUpdater.updateBucket(spare.bucket, ordsEnum, docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + + final int orderedSize = (int) ordered.size(); + final B[] buckets = buildBuckets(orderedSize); + for (int i = orderedSize - 1; i >= 0; --i) { + BucketAndOrd bucketAndOrd = ordered.pop(); + finalizeBucket(bucketAndOrd.bucket); + buckets[i] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount()); + } + topBucketsPerOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; } - bucketUpdater.updateBucket(spare, ordsEnum, docCount); - spare = ordered.insertWithOverflow(spare); - } - - topBucketsPerOrd.set(ordIdx, buildBuckets((int) ordered.size())); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); - otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount()); - finalizeBucket(topBucketsPerOrd.get(ordIdx)[i]); } + assert ordsCollected == ordsArray.size(); + buildSubAggs(topBucketsPerOrd, ordsArray); } } - - buildSubAggs(topBucketsPerOrd); - return MapStringTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) @@ -355,7 +372,7 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro * Build a {@link PriorityQueue} to sort the buckets. After we've * collected all of the buckets we'll collect all entries in the queue. */ - abstract ObjectArrayPriorityQueue buildPriorityQueue(int size); + abstract ObjectArrayPriorityQueue> buildPriorityQueue(int size); /** * Update fields in {@code spare} to reflect information collected for @@ -382,9 +399,9 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro /** * Build the sub-aggregations into the buckets. This will usually - * delegate to {@link #buildSubAggsForAllBuckets}. + * delegate to {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)}. */ - abstract void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException; /** * Turn the buckets into an aggregation result. @@ -407,9 +424,11 @@ interface BucketUpdater */ class StandardTermsResults extends ResultStrategy { private final ValuesSource valuesSource; + private final Comparator> comparator; - StandardTermsResults(ValuesSource valuesSource) { + StandardTermsResults(ValuesSource valuesSource, Aggregator aggregator) { this.valuesSource = valuesSource; + this.comparator = order.partiallyBuiltBucketComparator(aggregator); } @Override @@ -498,8 +517,8 @@ StringTerms.Bucket buildEmptyBucket() { } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { - return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator); + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { + return new BucketPriorityQueue<>(size, bigArrays(), comparator); } @Override @@ -507,7 +526,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { return (spare, ordsEnum, docCount) -> { ordsEnum.readValue(spare.termBytes); spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); }; } @@ -532,8 +550,8 @@ void finalizeBucket(StringTerms.Bucket bucket) { } @Override - void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordArray, (b, a) -> b.aggregations = a); } @Override @@ -625,7 +643,7 @@ SignificantStringTerms.Bucket buildEmptyBucket() { } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { return new BucketSignificancePriorityQueue<>(size, bigArrays()); } @@ -634,7 +652,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) long subsetSize = subsetSizes.get(owningBucketOrd); return (spare, ordsEnum, docCount) -> { ordsEnum.readValue(spare.termBytes); - spare.bucketOrd = ordsEnum.ord(); spare.subsetDf = docCount; spare.supersetDf = backgroundFrequencies.freq(spare.termBytes); /* @@ -667,8 +684,8 @@ void finalizeBucket(SignificantStringTerms.Bucket bucket) { } @Override - void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, a) -> b.aggregations = a); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 5d4c15d8a3b80..a54053f712f8d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -14,6 +14,7 @@ import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.NumericUtils; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; @@ -40,6 +41,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; @@ -167,42 +169,56 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.size()) ) { - for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { - final long owningBucketOrd = owningBucketOrds.get(ordIdx); - collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs); - long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); - - int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - B spare = null; - BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - BucketUpdater bucketUpdater = bucketUpdater(owningBucketOrd); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = buildEmptyBucket(); - } - bucketUpdater.updateBucket(spare, ordsEnum, docCount); - spare = ordered.insertWithOverflow(spare); - } + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs); + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrd), bucketCountThresholds.getShardSize()); + bucketsToCollect.set(ordIdx, size); + ordsToCollect += size; + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + try (ObjectArrayPriorityQueue> ordered = buildPriorityQueue(bucketsToCollect.get(ordIdx))) { + BucketAndOrd spare = null; + BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + BucketUpdater bucketUpdater = bucketUpdater(owningBucketOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>(buildEmptyBucket()); + } + bucketUpdater.updateBucket(spare.bucket, ordsEnum, docCount); + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } + + // Get the top buckets + final int orderedSize = (int) ordered.size(); + final B[] bucketsForOrd = buildBuckets(orderedSize); + for (int b = orderedSize - 1; b >= 0; --b) { + BucketAndOrd bucketAndOrd = ordered.pop(); + bucketsForOrd[b] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + b, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount()); + } + topBucketsPerOrd.set(ordIdx, bucketsForOrd); + ordsCollected += orderedSize; - // Get the top buckets - B[] bucketsForOrd = buildBuckets((int) ordered.size()); - topBucketsPerOrd.set(ordIdx, bucketsForOrd); - for (int b = (int) ordered.size() - 1; b >= 0; --b) { - topBucketsPerOrd.get(ordIdx)[b] = ordered.pop(); - otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[b].getDocCount()); + } } + assert ordsCollected == ordsArray.size(); + buildSubAggs(topBucketsPerOrd, ordsArray); } } - - buildSubAggs(topBucketsPerOrd); - return NumericTermsAggregator.this.buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) @@ -254,13 +270,13 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro * Build a {@link ObjectArrayPriorityQueue} to sort the buckets. After we've * collected all of the buckets we'll collect all entries in the queue. */ - abstract ObjectArrayPriorityQueue buildPriorityQueue(int size); + abstract ObjectArrayPriorityQueue> buildPriorityQueue(int size); /** * Build the sub-aggregations into the buckets. This will usually - * delegate to {@link #buildSubAggsForAllBuckets}. + * delegate to {@link #buildSubAggsForAllBuckets(ObjectArray, LongArray, BiConsumer)}. */ - abstract void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException; /** * Collect extra entries for "zero" hit documents if they were requested @@ -287,9 +303,11 @@ interface BucketUpdater abstract class StandardTermsResultStrategy, B extends InternalTerms.Bucket> extends ResultStrategy { protected final boolean showTermDocCountError; + private final Comparator> comparator; - StandardTermsResultStrategy(boolean showTermDocCountError) { + StandardTermsResultStrategy(boolean showTermDocCountError, Aggregator aggregator) { this.showTermDocCountError = showTermDocCountError; + this.comparator = order.partiallyBuiltBucketComparator(aggregator); } @Override @@ -298,13 +316,13 @@ final LeafBucketCollector wrapCollector(LeafBucketCollector primary) { } @Override - final ObjectArrayPriorityQueue buildPriorityQueue(int size) { - return new BucketPriorityQueue<>(size, bigArrays(), partiallyBuiltBucketComparator); + final ObjectArrayPriorityQueue> buildPriorityQueue(int size) { + return new BucketPriorityQueue<>(size, bigArrays(), comparator); } @Override - final void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + final void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override @@ -340,8 +358,8 @@ public final void close() {} } class LongTermsResults extends StandardTermsResultStrategy { - LongTermsResults(boolean showTermDocCountError) { - super(showTermDocCountError); + LongTermsResults(boolean showTermDocCountError, Aggregator aggregator) { + super(showTermDocCountError, aggregator); } @Override @@ -374,7 +392,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { return (LongTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) -> { spare.term = ordsEnum.value(); spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); }; } @@ -424,8 +441,8 @@ LongTerms buildEmptyResult() { class DoubleTermsResults extends StandardTermsResultStrategy { - DoubleTermsResults(boolean showTermDocCountError) { - super(showTermDocCountError); + DoubleTermsResults(boolean showTermDocCountError, Aggregator aggregator) { + super(showTermDocCountError, aggregator); } @Override @@ -458,7 +475,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { return (DoubleTerms.Bucket spare, BucketOrdsEnum ordsEnum, long docCount) -> { spare.term = NumericUtils.sortableLongToDouble(ordsEnum.value()); spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); }; } @@ -575,7 +591,6 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { spare.term = ordsEnum.value(); spare.subsetDf = docCount; spare.supersetDf = backgroundFrequencies.freq(spare.term); - spare.bucketOrd = ordsEnum.ord(); // During shard-local down-selection we use subset/superset stats that are for this shard only // Back at the central reducer these properties will be updated with global stats spare.updateScore(significanceHeuristic, subsetSize, supersetSize); @@ -583,13 +598,13 @@ BucketUpdater bucketUpdater(long owningBucketOrd) { } @Override - ObjectArrayPriorityQueue buildPriorityQueue(int size) { + ObjectArrayPriorityQueue> buildPriorityQueue(int size) { return new BucketSignificancePriorityQueue<>(size, bigArrays()); } @Override - void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + void buildSubAggs(ObjectArray topBucketsPerOrd, LongArray ordsArray) throws IOException { + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, aggs) -> b.aggregations = aggs); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java index 4922be7cec1ba..c07c0726a4ae1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregator.java @@ -27,7 +27,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.Comparator; import java.util.HashSet; import java.util.Map; import java.util.Objects; @@ -190,7 +189,6 @@ public boolean equals(Object obj) { protected final DocValueFormat format; protected final BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; - protected final Comparator> partiallyBuiltBucketComparator; protected final Set aggsUsedForSorting; protected final SubAggCollectionMode collectMode; @@ -209,7 +207,9 @@ public TermsAggregator( super(name, factories, context, parent, metadata); this.bucketCountThresholds = bucketCountThresholds; this.order = order; - partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this); + if (order != null) { + order.validate(this); + } this.format = format; if ((subAggsNeedScore() && descendsFromNestedAggregator(parent)) || context.isInSortOrderExecutionRequired()) { /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 2c7b768fcdbb3..da5ae37b08228 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -195,12 +195,12 @@ private static TermsAggregatorSupplier numericSupplier() { if (includeExclude != null) { longFilter = includeExclude.convertToDoubleFilter(); } - resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError); + resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError, agg); } else { if (includeExclude != null) { longFilter = includeExclude.convertToLongFilter(valuesSourceConfig.format()); } - resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError); + resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError, agg); } return new NumericTermsAggregator( name, @@ -403,7 +403,7 @@ Aggregator create( name, factories, new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSourceConfig), - a -> a.new StandardTermsResults(valuesSourceConfig.getValuesSource()), + a -> a.new StandardTermsResults(valuesSourceConfig.getValuesSource(), a), order, valuesSourceConfig.format(), bucketCountThresholds, diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java index 0d42a2856a10e..85510c8a989c0 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/InternalMultiTerms.java @@ -37,9 +37,6 @@ public class InternalMultiTerms extends AbstractInternalTerms { - - long bucketOrd; - protected long docCount; protected InternalAggregations aggregations; private long docCountError; diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java index 1691aedf543f4..5c10e2c8feeb1 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java @@ -20,6 +20,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; @@ -40,6 +41,7 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.bucket.DeferableBucketAggregator; +import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd; import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue; import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds; import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; @@ -72,7 +74,7 @@ class MultiTermsAggregator extends DeferableBucketAggregator { protected final List formats; protected final TermsAggregator.BucketCountThresholds bucketCountThresholds; protected final BucketOrder order; - protected final Comparator partiallyBuiltBucketComparator; + protected final Comparator> partiallyBuiltBucketComparator; protected final Set aggsUsedForSorting; protected final SubAggCollectionMode collectMode; private final List values; @@ -99,7 +101,7 @@ protected MultiTermsAggregator( super(name, factories, context, parent, metadata); this.bucketCountThresholds = bucketCountThresholds; this.order = order; - partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(b -> b.bucketOrd, this); + partiallyBuiltBucketComparator = order == null ? null : order.partiallyBuiltBucketComparator(this); this.formats = formats; this.showTermDocCountError = showTermDocCountError; if (subAggsNeedScore() && descendsFromNestedAggregator(parent) || context.isInSortOrderExecutionRequired()) { @@ -242,52 +244,67 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); ObjectArray topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()) ) { - for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { - final long owningBucketOrd = owningBucketOrds.get(ordIdx); - long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); - - int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - try ( - ObjectArrayPriorityQueue ordered = new BucketPriorityQueue<>( - size, - bigArrays(), - partiallyBuiltBucketComparator - ) - ) { - InternalMultiTerms.Bucket spare = null; - BytesRef spareKey = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts.increment(ordIdx, docCount); - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; - } - if (spare == null) { - checkRealMemoryCBForInternalBucket(); - spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters); - spareKey = new BytesRef(); - } - ordsEnum.readValue(spareKey); - spare.terms = unpackTerms(spareKey); - spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); - spare = ordered.insertWithOverflow(spare); - } + try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) { + long ordsToCollect = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize()); + ordsToCollect += size; + bucketsToCollect.set(ordIdx, size); + } + try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) { + long ordsCollected = 0; + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); + + int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); + try ( + ObjectArrayPriorityQueue> ordered = new BucketPriorityQueue<>( + size, + bigArrays(), + partiallyBuiltBucketComparator + ) + ) { + BucketAndOrd spare = null; + BytesRef spareKey = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + checkRealMemoryCBForInternalBucket(); + spare = new BucketAndOrd<>( + new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters) + ); + spareKey = new BytesRef(); + } + ordsEnum.readValue(spareKey); + spare.bucket.terms = unpackTerms(spareKey); + spare.bucket.docCount = docCount; + spare.ord = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); + } - // Get the top buckets - InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[(int) ordered.size()]; - topBucketsPerOrd.set(ordIdx, bucketsForOrd); - for (int b = (int) ordered.size() - 1; b >= 0; --b) { - InternalMultiTerms.Bucket[] buckets = topBucketsPerOrd.get(ordIdx); - buckets[b] = ordered.pop(); - otherDocCounts.increment(ordIdx, -buckets[b].getDocCount()); + // Get the top buckets + int orderedSize = (int) ordered.size(); + InternalMultiTerms.Bucket[] buckets = new InternalMultiTerms.Bucket[orderedSize]; + for (int i = orderedSize - 1; i >= 0; --i) { + BucketAndOrd bucketAndOrd = ordered.pop(); + buckets[i] = bucketAndOrd.bucket; + ordsArray.set(ordsCollected + i, bucketAndOrd.ord); + otherDocCounts.increment(ordIdx, -buckets[i].getDocCount()); + } + topBucketsPerOrd.set(ordIdx, buckets); + ordsCollected += orderedSize; + } } + buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, (b, a) -> b.aggregations = a); } } - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); - return buildAggregations( Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildResult(otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx))