From 79f7562647580e153785db02aab9eff5e8b30d0a Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Tue, 19 Nov 2024 12:53:13 +0100 Subject: [PATCH] Check the real memory circuit breaker when building internal aggregations --- .../adjacency/AdjacencyMatrixAggregator.java | 24 +++---- .../timeseries/TimeSeriesAggregator.java | 7 +- .../search/aggregations/AggregatorBase.java | 29 ++++++++ .../aggregations/NonCollectingAggregator.java | 6 +- .../bucket/BucketsAggregator.java | 69 ++++++++----------- .../countedterms/CountedTermsAggregator.java | 10 +-- .../bucket/geogrid/GeoGridAggregator.java | 10 +-- .../bucket/prefix/IpPrefixAggregator.java | 21 +++--- .../GlobalOrdinalsStringTermsAggregator.java | 20 +++--- .../bucket/terms/LongRareTermsAggregator.java | 18 ++--- .../terms/MapStringTermsAggregator.java | 11 +-- .../bucket/terms/NumericTermsAggregator.java | 10 +-- .../terms/StringRareTermsAggregator.java | 18 ++--- .../metrics/MetricsAggregator.java | 6 +- .../multiterms/MultiTermsAggregator.java | 10 +-- .../CategorizeTextAggregator.java | 13 ++-- 16 files changed, 137 insertions(+), 145 deletions(-) diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java index 29e8aec00a02d..8a7349560228c 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java @@ -188,17 +188,16 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw } } try (LongArray bucketOrdsToBuild = bigArrays().newLongArray(totalBucketsToBuild)) { - int builtBucketIndex = 0; + int[] builtBucketIndex = new int[] { 0 }; for (int ord = 0; ord < maxOrd; ord++) { if (bucketDocCount(ord) > 0) { - bucketOrdsToBuild.set(builtBucketIndex++, ord); + bucketOrdsToBuild.set(builtBucketIndex[0]++, ord); } } - assert builtBucketIndex == totalBucketsToBuild; - builtBucketIndex = 0; + assert builtBucketIndex[0] == totalBucketsToBuild; + builtBucketIndex[0] = 0; var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < results.length; owningBucketOrdIdx++) { + InternalAggregation[] aggregations = buildAggregations(Math.toIntExact(owningBucketOrds.size()), owningBucketOrdIdx -> { List buckets = new ArrayList<>(filters.length); for (int i = 0; i < keys.length; i++) { long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), i); @@ -207,10 +206,11 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw // a date-histogram where we will look for transactions over time and can expect many // empty buckets. if (docCount > 0) { + checkRealMemoryForInternalBucket(); InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket( keys[i], docCount, - bucketSubAggs.apply(builtBucketIndex++) + bucketSubAggs.apply(builtBucketIndex[0]++) ); buckets.add(bucket); } @@ -226,17 +226,17 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket( intersectKey, docCount, - bucketSubAggs.apply(builtBucketIndex++) + bucketSubAggs.apply(builtBucketIndex[0]++) ); buckets.add(bucket); } pos++; } } - results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata()); - } - assert builtBucketIndex == totalBucketsToBuild; - return results; + return new InternalAdjacencyMatrix(name, buckets, metadata()); + }); + assert builtBucketIndex[0] == totalBucketsToBuild; + return aggregations; } } diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java index 1263d4282a18a..bf55c161b0de5 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java @@ -79,6 +79,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw while (ordsEnum.next()) { long docCount = bucketDocCount(ordsEnum.ord()); ordsEnum.readValue(spare); + checkRealMemoryForInternalBucket(); InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket( BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here. docCount, @@ -101,11 +102,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw } buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); - InternalAggregation[] result = new InternalAggregation[Math.toIntExact(allBucketsPerOrd.size())]; - for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { - result[ordIdx] = buildResult(allBucketsPerOrd.get(ordIdx)); - } - return result; + return buildAggregations(Math.toIntExact(allBucketsPerOrd.size()), ordIdx -> buildResult(allBucketsPerOrd.get(ordIdx))); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java index bf9116207b375..0a30aaabc213d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorBase.java @@ -13,6 +13,8 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.CheckedIntFunction; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.Maps; @@ -48,6 +50,8 @@ public abstract class AggregatorBase extends Aggregator { private Map subAggregatorbyName; private long requestBytesUsed; + private final CircuitBreaker breaker; + private int callCount; /** * Constructs a new Aggregator. @@ -72,6 +76,7 @@ protected AggregatorBase( this.metadata = metadata; this.parent = parent; this.context = context; + this.breaker = context.breaker(); assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead"; this.subAggregators = factories.createSubAggregators(this, subAggregatorCardinality); context.addReleasable(this); @@ -327,6 +332,30 @@ protected final InternalAggregations buildEmptySubAggregations() { return InternalAggregations.from(aggs); } + /** + * Builds the aggregations array with the provided size and populates it using the provided function. + */ + protected final InternalAggregation[] buildAggregations(int size, CheckedIntFunction aggFunction) + throws IOException { + final InternalAggregation[] results = new InternalAggregation[size]; + for (int i = 0; i < results.length; i++) { + updateCircuitBreaker("InternalAggregation"); + results[i] = aggFunction.apply(i); + } + return results; + } + + /** + * This method calls the circuit breaker from time to time in order to give it a chance to check available + * memory in the parent breaker (Which should be a real memory breaker) and break the execution if we are running out. + * To achieve that, we are passing 0 as the estimated bytes every 1024 calls + */ + protected final void updateCircuitBreaker(String label) { + if ((++callCount & 0x3FF) == 0) { + breaker.addEstimateBytesAndMaybeBreak(0, label); + } + } + @Override public String toString() { return name; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java index 4da2d10cfc0c2..a32211fd4d8fb 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java @@ -41,10 +41,6 @@ public final LeafBucketCollector getLeafCollector(AggregationExecutionContext ag @Override public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { - results[ordIdx] = buildEmptyAggregation(); - } - return results; + return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildEmptyAggregation()); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index 252eb0877d024..4f8919d0219a1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -9,7 +9,6 @@ package org.elasticsearch.search.aggregations.bucket; import org.apache.lucene.index.LeafReaderContext; -import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; @@ -42,10 +41,9 @@ import java.util.function.ToLongFunction; public abstract class BucketsAggregator extends AggregatorBase { - private final CircuitBreaker breaker; + private LongArray docCounts; protected final DocCountProvider docCountProvider; - private int callCount; @SuppressWarnings("this-escape") public BucketsAggregator( @@ -57,7 +55,6 @@ public BucketsAggregator( Map metadata ) throws IOException { super(name, factories, aggCtx, parent, bucketCardinality, metadata); - breaker = aggCtx.breaker(); docCounts = bigArrays().newLongArray(1, true); docCountProvider = new DocCountProvider(); } @@ -176,7 +173,7 @@ protected final IntFunction buildSubAggsForBuckets(LongArr prepareSubAggs(bucketOrdsToCollect); InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][]; for (int i = 0; i < subAggregators.length; i++) { - updateCircuitBreaker("building_sub_aggregation"); + checkRealMemoryForInternalBucket(); aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect); } return subAggsForBucketFunction(aggregations); @@ -247,31 +244,30 @@ protected final InternalAggregation[] buildAggregationsForFixedBucketCount( Function, InternalAggregation> resultBuilder ) throws IOException { try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(owningBucketOrds.size() * bucketsPerOwningBucketOrd)) { - int bucketOrdIdx = 0; + int[] bucketOrdIdx = new int[] { 0 }; for (long i = 0; i < owningBucketOrds.size(); i++) { long ord = owningBucketOrds.get(i) * bucketsPerOwningBucketOrd; for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) { - bucketOrdsToCollect.set(bucketOrdIdx++, ord++); + bucketOrdsToCollect.set(bucketOrdIdx[0]++, ord++); } } - bucketOrdIdx = 0; + bucketOrdIdx[0] = 0; var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int owningOrdIdx = 0; owningOrdIdx < results.length; owningOrdIdx++) { + return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { List buckets = new ArrayList<>(bucketsPerOwningBucketOrd); for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) { + checkRealMemoryForInternalBucket(); buckets.add( bucketBuilder.build( offsetInOwningOrd, - bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx)), - subAggregationResults.apply(bucketOrdIdx++) + bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx[0])), + subAggregationResults.apply(bucketOrdIdx[0]++) ) ); } - results[owningOrdIdx] = resultBuilder.apply(buckets); - } - return results; + return resultBuilder.apply(buckets); + }); } } @@ -295,11 +291,10 @@ protected final InternalAggregation[] buildAggregationsForSingleBucket( * here but we don't because single bucket aggs never have. */ var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { - results[ordIdx] = resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx)); - } - return results; + return buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx)) + ); } @FunctionalInterface @@ -335,37 +330,36 @@ protected final InternalAggregation[] buildAggregationsForVariableBuckets( ); } try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) { - int b = 0; + int[] b = new int[] { 0 }; for (long i = 0; i < owningBucketOrds.size(); i++) { LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i)); while (ordsEnum.next()) { - bucketOrdsToCollect.set(b++, ordsEnum.ord()); + bucketOrdsToCollect.set(b[0]++, ordsEnum.ord()); } } var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - b = 0; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + b[0] = 0; + return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { final long owningBucketOrd = owningBucketOrds.get(ordIdx); List buckets = new ArrayList<>(bucketsInOrd.get(ordIdx)); LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); while (ordsEnum.next()) { - if (bucketOrdsToCollect.get(b) != ordsEnum.ord()) { + if (bucketOrdsToCollect.get(b[0]) != ordsEnum.ord()) { // If we hit this, something has gone horribly wrong and we need to investigate throw AggregationErrors.iterationOrderChangedWithoutMutating( bucketOrds.toString(), ordsEnum.ord(), - bucketOrdsToCollect.get(b) + bucketOrdsToCollect.get(b[0]) ); } + checkRealMemoryForInternalBucket(); buckets.add( - bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++)) + bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b[0]++)) ); } - results[ordIdx] = resultBuilder.build(owningBucketOrd, buckets); - } - return results; + return resultBuilder.build(owningBucketOrd, buckets); + }); } } } @@ -425,14 +419,9 @@ protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException docCountProvider.setLeafReaderContext(ctx); } - /** - * This method calls the circuit breaker from time to time in order to give it a chance to check available - * memory in the parent breaker (Which should be a real memory breaker) and break the execution if we are running out. - * To achieve that, we are passing 0 as the estimated bytes every 1024 calls - */ - private void updateCircuitBreaker(String label) { - if ((++callCount & 0x3FF) == 0) { - breaker.addEstimateBytesAndMaybeBreak(0, label); - } + /** This method should be call whenever a new bucket object is created. It will che k the real memory + * circuit breaker in a sampling fashion. See {@link #updateCircuitBreaker(String)} */ + protected void checkRealMemoryForInternalBucket() { + updateCircuitBreaker("internal_bucket"); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java index 05fce2cff64d5..20723c136308c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java @@ -140,6 +140,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw long docCount = bucketDocCount(ordsEnum.ord()); otherDocCounts.increment(ordIdx, docCount); if (spare == null) { + checkRealMemoryForInternalBucket(); spare = emptyBucketBuilder.get(); } ordsEnum.readValue(spare.getTermBytes()); @@ -158,8 +159,8 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw } buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations); - InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; - for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + + return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { final BucketOrder reduceOrder; if (isKeyOrder(order) == false) { reduceOrder = InternalOrder.key(true); @@ -167,7 +168,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw } else { reduceOrder = order; } - result[ordIdx] = new StringTerms( + return new StringTerms( name, reduceOrder, order, @@ -181,8 +182,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw Arrays.asList(topBucketsPerOrd.get(ordIdx)), null ); - } - return result; + }); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java index 0e63e26e77a55..466097e398ede 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java @@ -144,6 +144,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); while (ordsEnum.next()) { if (spare == null) { + checkRealMemoryForInternalBucket(); spare = newEmptyBucket(); } @@ -162,11 +163,10 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw } } buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { - results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata()); - } - return results; + return buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata()) + ); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java index e8ba0393208a0..3d39b12f23e53 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java @@ -172,32 +172,32 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw } try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) { - int b = 0; + int[] b = new int[] { 0 }; for (long i = 0; i < owningBucketOrds.size(); i++) { BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i)); while (ordsEnum.next()) { - bucketOrdsToCollect.set(b++, ordsEnum.ord()); + bucketOrdsToCollect.set(b[0]++, ordsEnum.ord()); } } var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - b = 0; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + b[0] = 0; + return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { List buckets = new ArrayList<>(bucketsInOrd.get(ordIdx)); BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); while (ordsEnum.next()) { long ordinal = ordsEnum.ord(); - if (bucketOrdsToCollect.get(b) != ordinal) { + if (bucketOrdsToCollect.get(b[0]) != ordinal) { throw AggregationErrors.iterationOrderChangedWithoutMutating( bucketOrds.toString(), ordinal, - bucketOrdsToCollect.get(b) + bucketOrdsToCollect.get(b[0]) ); } BytesRef ipAddress = new BytesRef(); ordsEnum.readValue(ipAddress); long docCount = bucketDocCount(ordinal); + checkRealMemoryForInternalBucket(); buckets.add( new InternalIpPrefix.Bucket( config.format(), @@ -207,16 +207,15 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw ipPrefix.prefixLength, ipPrefix.appendPrefixLength, docCount, - subAggregationResults.apply(b++) + subAggregationResults.apply(b[0]++) ) ); // NOTE: the aggregator is expected to return sorted results CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); } - results[ordIdx] = new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata()); - } - return results; + return new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata()); + }); } } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index d04d7528ea938..1baa3330bae96 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -700,11 +700,10 @@ abstract class ResultStrategy< private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { if (valueCount == 0) { // no context in this reader - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { - results[ordIdx] = buildNoValuesResult(owningBucketOrds.get(ordIdx)); - } - return results; + return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> buildNoValuesResult(owningBucketOrds.get(ordIdx)) + ); } try ( LongArray otherDocCount = bigArrays().newLongArray(owningBucketOrds.size(), true); @@ -731,6 +730,7 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep otherDocCount.increment(finalOrdIdx, docCount); if (docCount >= bucketCountThresholds.getShardMinDocCount()) { if (spare == null) { + checkRealMemoryForInternalBucket(); spare = buildEmptyTemporaryBucket(); } updater.updateBucket(spare, globalOrd, bucketOrd, docCount); @@ -742,6 +742,7 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep // Get the top buckets topBucketsPreOrd.set(ordIdx, buildBuckets((int) ordered.size())); for (int i = (int) ordered.size() - 1; i >= 0; --i) { + checkRealMemoryForInternalBucket(); B bucket = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); topBucketsPreOrd.get(ordIdx)[i] = bucket; otherDocCount.increment(ordIdx, -bucket.getDocCount()); @@ -751,11 +752,10 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep buildSubAggs(topBucketsPreOrd); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPreOrd.size())]; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { - results[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCount.get(ordIdx), topBucketsPreOrd.get(ordIdx)); - } - return results; + return GlobalOrdinalsStringTermsAggregator.this.buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCount.get(ordIdx), topBucketsPreOrd.get(ordIdx)) + ); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java index 877bd2cac4b05..68a7df096f8e6 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/LongRareTermsAggregator.java @@ -142,6 +142,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw long docCount = bucketDocCount(collectedBuckets.ord()); // if the key is below threshold, reinsert into the new ords if (docCount <= maxDocCount) { + checkRealMemoryForInternalBucket(); LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(collectedBuckets.value(), docCount, null, format); bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(collectedBuckets.value()); mergeMap.set(collectedBuckets.ord(), bucket.bucketOrd); @@ -173,21 +174,12 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw * Now build the results! */ buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); - InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + + return LongRareTermsAggregator.this.buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { LongRareTerms.Bucket[] buckets = rarestPerOrd.get(ordIdx); Arrays.sort(buckets, ORDER.comparator()); - result[ordIdx] = new LongRareTerms( - name, - ORDER, - metadata(), - format, - Arrays.asList(buckets), - maxDocCount, - filters.get(ordIdx) - ); - } - return result; + return new LongRareTerms(name, ORDER, metadata(), format, Arrays.asList(buckets), maxDocCount, filters.get(ordIdx)); + }); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index c02ed5509e6ae..206f2d0c711ea 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -304,6 +304,7 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro continue; } if (spare == null) { + checkRealMemoryForInternalBucket(); spare = emptyBucketBuilder.get(); } updateBucket(spare, ordsEnum, docCount); @@ -320,11 +321,11 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro } buildSubAggs(topBucketsPerOrd); - InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; - for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { - result[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)); - } - return result; + + return MapStringTermsAggregator.this.buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) + ); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index e10f0b8944027..0f922a7afc21c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -185,6 +185,7 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro continue; } if (spare == null) { + checkRealMemoryForInternalBucket(); spare = emptyBucketBuilder.get(); } updateBucket(spare, ordsEnum, docCount); @@ -203,11 +204,10 @@ private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) thro buildSubAggs(topBucketsPerOrd); - InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; - for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { - result[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)); - } - return result; + return NumericTermsAggregator.this.buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) + ); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java index 7200c33c71f70..d1e0f3358fa79 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java @@ -145,6 +145,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw long docCount = bucketDocCount(collectedBuckets.ord()); // if the key is below threshold, reinsert into the new ords if (docCount <= maxDocCount) { + checkRealMemoryForInternalBucket(); StringRareTerms.Bucket bucket = new StringRareTerms.Bucket( BytesRef.deepCopyOf(scratch), docCount, @@ -181,21 +182,12 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw * Now build the results! */ buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); - InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + + return StringRareTermsAggregator.this.buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> { StringRareTerms.Bucket[] buckets = rarestPerOrd.get(ordIdx); Arrays.sort(buckets, ORDER.comparator()); - result[ordIdx] = new StringRareTerms( - name, - ORDER, - metadata(), - format, - Arrays.asList(buckets), - maxDocCount, - filters.get(ordIdx) - ); - } - return result; + return new StringRareTerms(name, ORDER, metadata(), format, Arrays.asList(buckets), maxDocCount, filters.get(ordIdx)); + }); } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java index 0d767e356108a..cf65f1ff7c835 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java @@ -38,10 +38,6 @@ protected MetricsAggregator(String name, AggregationContext context, Aggregator @Override public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { - results[ordIdx] = buildAggregation(owningBucketOrds.get(ordIdx)); - } - return results; + return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildAggregation(owningBucketOrds.get(ordIdx))); } } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java index 0c6e94a15ec36..bf01f972dd33a 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java @@ -264,6 +264,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw continue; } if (spare == null) { + checkRealMemoryForInternalBucket(); spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters); spareKey = new BytesRef(); } @@ -287,11 +288,10 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); - InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; - for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { - result[ordIdx] = buildResult(otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)); - } - return result; + return buildAggregations( + Math.toIntExact(owningBucketOrds.size()), + ordIdx -> buildResult(otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)) + ); } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java index 5b1ed7c954fe9..9b0ed6ea4c958 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java @@ -121,21 +121,22 @@ public InternalAggregation[] buildAggregations(LongArray ordsToCollect) throws I continue; } int size = (int) Math.min(bucketOrds.bucketsInOrd(ordIdx), bucketCountThresholds.getShardSize()); + checkRealMemoryForInternalBucket(); topBucketsPerOrd.set(ordIdx, categorizer.toOrderedBuckets(size)); } buildSubAggsForAllBuckets(topBucketsPerOrd, Bucket::getBucketOrd, Bucket::setAggregations); - InternalAggregation[] results = new InternalAggregation[Math.toIntExact(ordsToCollect.size())]; - for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { - results[ordIdx] = new InternalCategorizationAggregation( + + return buildAggregations( + Math.toIntExact(ordsToCollect.size()), + ordIdx -> new InternalCategorizationAggregation( name, bucketCountThresholds.getRequiredSize(), bucketCountThresholds.getMinDocCount(), similarityThreshold, metadata(), Arrays.asList(topBucketsPerOrd.get(ordIdx)) - ); - } - return results; + ) + ); } }