Skip to content

Commit

Permalink
check parent circuit breaker when allocating empty bucket (elastic#89568)
Browse files · Browse the repository at this point in the history

closes elastic#80789
  • Loading branch information
boicehuang authored Aug 24, 2022
1 parent 773aeab commit 061e643
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 33 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/89568.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 89568
summary: check parent circuit breaker when allocating empty bucket
area: Aggregations
type: bug
issues: [80789]
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,19 @@ public void accept(long key) {

InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext);
ListIterator<Bucket> iter = list.listIterator();
iterateEmptyBuckets(list, iter, key -> iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs)));
iterateEmptyBuckets(list, iter, new LongConsumer() {
private int size = 0;

@Override
public void accept(long key) {
size++;
if (size >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(size);
size = 0;
}
iter.add(new InternalDateHistogram.Bucket(key, 0, keyed, format, reducedEmptySubAggs));
}
});
}

private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, LongConsumer onBucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,19 @@ public void accept(double key) {
reduceContext
);
ListIterator<Bucket> iter = list.listIterator();
iterateEmptyBuckets(list, iter, key -> iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs)));
iterateEmptyBuckets(list, iter, new DoubleConsumer() {
private int size;

@Override
public void accept(double key) {
size++;
if (size >= REPORT_EMPTY_EVERY) {
reduceContext.consumeBucketsAndMaybeBreak(size);
size = 0;
}
iter.add(new Bucket(key, 0, keyed, format, reducedEmptySubAggs));
}
});
}

private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, DoubleConsumer onBucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,26 +214,25 @@ protected InternalDateHistogram mutateInstance(InternalDateHistogram instance) {
}

public void testLargeReduce() {
expectReduceUsesTooManyBuckets(
new InternalDateHistogram(
"h",
List.of(),
BucketOrder.key(true),
0,
0,
new InternalDateHistogram.EmptyBucketInfo(
Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(),
InternalAggregations.EMPTY,
new LongBounds(
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2018-01-01T00:00:00Z"),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2021-01-01T00:00:00Z")
)
),
DocValueFormat.RAW,
false,
null
InternalDateHistogram largeHisto = new InternalDateHistogram(
"h",
List.of(),
BucketOrder.key(true),
0,
0,
new InternalDateHistogram.EmptyBucketInfo(
Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(),
InternalAggregations.EMPTY,
new LongBounds(
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2020-01-01T00:00:00Z"),
DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2023-01-01T00:00:00Z")
)
),
100000
DocValueFormat.RAW,
false,
null
);
expectReduceUsesTooManyBuckets(largeHisto, 100000);
expectReduceThrowsRealMemoryBreaker(largeHisto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,18 @@ public void testHandlesNaN() {
}

public void testLargeReduce() {
expectReduceUsesTooManyBuckets(
new InternalHistogram(
"h",
List.of(),
BucketOrder.key(true),
0,
new InternalHistogram.EmptyBucketInfo(5e-10, 0, 0, 100, InternalAggregations.EMPTY),
DocValueFormat.RAW,
false,
null
),
100000
InternalHistogram largeHisto = new InternalHistogram(
"h",
List.of(),
BucketOrder.key(true),
0,
new InternalHistogram.EmptyBucketInfo(5e-8, 0, 0, 100, InternalAggregations.EMPTY),
DocValueFormat.RAW,
false,
null
);
expectReduceUsesTooManyBuckets(largeHisto, 100000);
expectReduceThrowsRealMemoryBreaker(largeHisto);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@

package org.elasticsearch.test;

import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
Expand All @@ -33,6 +37,7 @@

import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.mock;

public abstract class InternalMultiBucketAggregationTestCase<T extends InternalAggregation & MultiBucketsAggregation> extends
Expand Down Expand Up @@ -248,4 +253,30 @@ public void accept(int value) {
Exception e = expectThrows(IllegalArgumentException.class, () -> agg.reduce(List.of(agg), reduceContext));
assertThat(e.getMessage(), equalTo("too big!"));
}

/**
* Expect that reducing this aggregation will break the real memory breaker.
*/
protected static void expectReduceThrowsRealMemoryBreaker(InternalAggregation agg) {
HierarchyCircuitBreakerService breaker = new HierarchyCircuitBreakerService(
Settings.builder().put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "50%").build(),
List.of(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
) {
@Override
public void checkParentLimit(long newBytesReserved, String label) throws CircuitBreakingException {
super.checkParentLimit(newBytesReserved, label);
}
};
AggregationReduceContext reduceContext = new AggregationReduceContext.ForFinal(
BigArrays.NON_RECYCLING_INSTANCE,
null,
() -> false,
mock(AggregationBuilder.class),
v -> breaker.getBreaker("request").addEstimateBytesAndMaybeBreak(0, "test"),
PipelineTree.EMPTY
);
Exception e = expectThrows(CircuitBreakingException.class, () -> agg.reduce(List.of(agg), reduceContext));
assertThat(e.getMessage(), startsWith("[parent] Data too large, data for [test] "));
}
}

0 comments on commit 061e643

Please sign in to comment.