diff --git a/docs/changelog/89568.yaml b/docs/changelog/89568.yaml new file mode 100644 index 0000000000000..4957307449ae2 --- /dev/null +++ b/docs/changelog/89568.yaml @@ -0,0 +1,5 @@ +pr: 89568 +summary: check parent circuit breaker when allocating empty bucket +area: Aggregations +type: bug +issues: [80789] diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index d443a20c2b87f..6478eb1c0fb0b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -408,7 +408,19 @@ public void accept(long key) { InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext); ListIterator iter = list.listIterator(); - iterateEmptyBuckets(list, iter, key -> iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs))); + iterateEmptyBuckets(list, iter, new LongConsumer() { + private int size = 0; + + @Override + public void accept(long key) { + size++; + if (size >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(size); + size = 0; + } + iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + } + }); } private void iterateEmptyBuckets(List list, ListIterator iter, LongConsumer onBucket) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index 08afa946508b8..81ffc9d2baaaa 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -400,7 +400,19 @@ public void accept(double key) { reduceContext ); ListIterator iter = list.listIterator(); - iterateEmptyBuckets(list, iter, key -> iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs))); + iterateEmptyBuckets(list, iter, new DoubleConsumer() { + private int size; + + @Override + public void accept(double key) { + size++; + if (size >= REPORT_EMPTY_EVERY) { + reduceContext.consumeBucketsAndMaybeBreak(size); + size = 0; + } + iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)); + } + }); } private void iterateEmptyBuckets(List list, ListIterator iter, DoubleConsumer onBucket) { diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java index e6d40e0bc353d..093ccc7181767 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogramTests.java @@ -214,26 +214,25 @@ protected InternalDateHistogram mutateInstance(InternalDateHistogram instance) { } public void testLargeReduce() { - expectReduceUsesTooManyBuckets( - new InternalDateHistogram( - "h", - List.of(), - BucketOrder.key(true), - 0, - 0, - new InternalDateHistogram.EmptyBucketInfo( - Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), - InternalAggregations.EMPTY, - new LongBounds( - DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2018-01-01T00:00:00Z"), - DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-01-01T00:00:00Z") - ) - ), - DocValueFormat.RAW, - false, - null + InternalDateHistogram largeHisto = new InternalDateHistogram( + "h", + List.of(), + BucketOrder.key(true), + 0, + 0, + new InternalDateHistogram.EmptyBucketInfo( + Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), + InternalAggregations.EMPTY, + new LongBounds( + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2020-01-01T00:00:00Z"), + DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z") + ) ), - 100000 + DocValueFormat.RAW, + false, + null ); + expectReduceUsesTooManyBuckets(largeHisto, 100000); + expectReduceThrowsRealMemoryBreaker(largeHisto); } } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java index 35c4256deec2a..81e1c7014d9ba 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogramTests.java @@ -103,19 +103,18 @@ public void testHandlesNaN() { } public void testLargeReduce() { - expectReduceUsesTooManyBuckets( - new InternalHistogram( - "h", - List.of(), - BucketOrder.key(true), - 0, - new InternalHistogram.EmptyBucketInfo(5e-10, 0, 0, 100, InternalAggregations.EMPTY), - DocValueFormat.RAW, - false, - null - ), - 100000 + InternalHistogram largeHisto = new InternalHistogram( + "h", + List.of(), + BucketOrder.key(true), + 0, + new InternalHistogram.EmptyBucketInfo(5e-8, 0, 0, 100, InternalAggregations.EMPTY), + DocValueFormat.RAW, + false, + null ); + expectReduceUsesTooManyBuckets(largeHisto, 100000); + expectReduceThrowsRealMemoryBreaker(largeHisto); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java index 2b934318b2176..55ddfe17459e1 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalMultiBucketAggregationTestCase.java @@ -8,7 +8,11 @@ package org.elasticsearch.test; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationReduceContext; @@ -33,6 +37,7 @@ import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.startsWith; import static org.mockito.Mockito.mock; public abstract class InternalMultiBucketAggregationTestCase extends @@ -248,4 +253,30 @@ public void accept(int value) { Exception e = expectThrows(IllegalArgumentException.class, () -> agg.reduce(List.of(agg), reduceContext)); assertThat(e.getMessage(), equalTo("too big!")); } + + /** + * Expect that reducing this aggregation will break the real memory breaker. + */ + protected static void expectReduceThrowsRealMemoryBreaker(InternalAggregation agg) { + HierarchyCircuitBreakerService breaker = new HierarchyCircuitBreakerService( + Settings.builder().put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%").build(), + List.of(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ) { + @Override + public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException { + super.checkParentLimit(newBytesReserved, label); + } + }; + AggregationReduceContext reduceContext = new AggregationReduceContext.ForFinal( + BigArrays.NON_RECYCLING_INSTANCE, + null, + () -> false, + mock(AggregationBuilder.class), + v -> breaker.getBreaker("request").addEstimateBytesAndMaybeBreak(0, "test"), + PipelineTree.EMPTY + ); + Exception e = expectThrows(CircuitBreakingException.class, () -> agg.reduce(List.of(agg), reduceContext)); + assertThat(e.getMessage(), startsWith("[parent] Data too large, data for [test] ")); + } }