Check the real memory circuit breaker when building internal aggregations
iverase committed Nov 19, 2024
1 parent 9296fb4 commit 79f7562
Showing 16 changed files with 137 additions and 145 deletions.
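The change, in a nutshell: building the final InternalAggregation objects can allocate many small buckets that no breaker accounts for, so bucket construction now pokes the request circuit breaker with a zero-byte estimate at a sampled rate. When the parent breaker is the real-memory breaker, that zero-byte call still re-checks actual heap usage and can abort the request before the node runs out of memory. Condensed view of the helper this commit centralizes in AggregatorBase (shown here out of context; see the AggregatorBase hunk below):

    private final CircuitBreaker breaker;   // request breaker obtained from the aggregation context
    private int callCount;

    // Every 1024th call, ask the breaker for 0 additional bytes. A real-memory parent
    // breaker still samples current heap usage on that call and may throw a
    // CircuitBreakingException if the node is close to its limit.
    protected final void updateCircuitBreaker(String label) {
        if ((++callCount & 0x3FF) == 0) {
            breaker.addEstimateBytesAndMaybeBreak(0, label);
        }
    }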
@@ -188,17 +188,16 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
}
}
try (LongArray bucketOrdsToBuild = bigArrays().newLongArray(totalBucketsToBuild)) {
int builtBucketIndex = 0;
int[] builtBucketIndex = new int[] { 0 };
for (int ord = 0; ord < maxOrd; ord++) {
if (bucketDocCount(ord) > 0) {
bucketOrdsToBuild.set(builtBucketIndex++, ord);
bucketOrdsToBuild.set(builtBucketIndex[0]++, ord);
}
}
assert builtBucketIndex == totalBucketsToBuild;
builtBucketIndex = 0;
assert builtBucketIndex[0] == totalBucketsToBuild;
builtBucketIndex[0] = 0;
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < results.length; owningBucketOrdIdx++) {
InternalAggregation[] aggregations = buildAggregations(Math.toIntExact(owningBucketOrds.size()), owningBucketOrdIdx -> {
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), i);
@@ -207,10 +206,11 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
// a date-histogram where we will look for transactions over time and can expect many
// empty buckets.
if (docCount > 0) {
checkRealMemoryForInternalBucket();
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
keys[i],
docCount,
bucketSubAggs.apply(builtBucketIndex++)
bucketSubAggs.apply(builtBucketIndex[0]++)
);
buckets.add(bucket);
}
@@ -226,17 +226,17 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
intersectKey,
docCount,
bucketSubAggs.apply(builtBucketIndex++)
bucketSubAggs.apply(builtBucketIndex[0]++)
);
buckets.add(bucket);
}
pos++;
}
}
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
}
assert builtBucketIndex == totalBucketsToBuild;
return results;
return new InternalAdjacencyMatrix(name, buckets, metadata());
});
assert builtBucketIndex[0] == totalBucketsToBuild;
return aggregations;
}
}
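A mechanical consequence of moving the per-ordinal loop into a lambda, visible throughout this file: locals captured by a Java lambda must be effectively final, so counters such as builtBucketIndex become single-element int[] holders whose contents the lambda may mutate. A minimal standalone illustration of that pattern (not taken from the commit):

    import java.util.stream.IntStream;

    class CaptureCounterSketch {
        public static void main(String[] args) {
            int[] counter = new int[] { 0 };                    // the reference stays effectively final
            IntStream.range(0, 4).forEach(i -> counter[0]++);   // mutating the contents is allowed
            System.out.println(counter[0]);                     // prints 4
            // int plain = 0;
            // IntStream.range(0, 4).forEach(i -> plain++);     // would not compile: plain is not effectively final
        }
    }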

@@ -79,6 +79,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
ordsEnum.readValue(spare);
checkRealMemoryForInternalBucket();
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
docCount,
@@ -101,11 +102,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
}
buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);

InternalAggregation[] result = new InternalAggregation[Math.toIntExact(allBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = buildResult(allBucketsPerOrd.get(ordIdx));
}
return result;
return buildAggregations(Math.toIntExact(allBucketsPerOrd.size()), ordIdx -> buildResult(allBucketsPerOrd.get(ordIdx)));
}
}

@@ -13,6 +13,8 @@
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.CheckedIntFunction;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Maps;
@@ -48,6 +50,8 @@ public abstract class AggregatorBase extends Aggregator {

private Map<String, Aggregator> subAggregatorbyName;
private long requestBytesUsed;
private final CircuitBreaker breaker;
private int callCount;

/**
* Constructs a new Aggregator.
@@ -72,6 +76,7 @@ protected AggregatorBase(
this.metadata = metadata;
this.parent = parent;
this.context = context;
this.breaker = context.breaker();
assert factories != null : "sub-factories provided to BucketAggregator must not be null, use AggragatorFactories.EMPTY instead";
this.subAggregators = factories.createSubAggregators(this, subAggregatorCardinality);
context.addReleasable(this);
@@ -327,6 +332,30 @@ protected final InternalAggregations buildEmptySubAggregations() {
return InternalAggregations.from(aggs);
}

/**
* Builds the aggregations array with the provided size and populates it using the provided function.
*/
protected final InternalAggregation[] buildAggregations(int size, CheckedIntFunction<InternalAggregation, IOException> aggFunction)
throws IOException {
final InternalAggregation[] results = new InternalAggregation[size];
for (int i = 0; i < results.length; i++) {
updateCircuitBreaker("InternalAggregation");
results[i] = aggFunction.apply(i);
}
return results;
}

/**
* This method calls the circuit breaker from time to time in order to give it a chance to check available
* memory in the parent breaker (which should be a real memory breaker) and break the execution if we are running out.
* To achieve that, we are passing 0 as the estimated bytes every 1024 calls.
*/
protected final void updateCircuitBreaker(String label) {
if ((++callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, label);
}
}

@Override
public String toString() {
return name;
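In updateCircuitBreaker, 0x3FF is 1023, so the breaker is consulted on every 1024th call; per-bucket overhead stays negligible while a real-memory parent breaker still gets regular chances to trip. Subclasses now route result construction through the new buildAggregations(int, CheckedIntFunction) helper instead of filling an InternalAggregation[] by hand. A sketch of the calling pattern, modeled on the buildAggregationsForSingleBucket change further down (resultBuilder stands in for whatever builds one aggregation per owning bucket ordinal):

    @Override
    public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
        var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
        // the helper allocates the result array and calls updateCircuitBreaker("InternalAggregation")
        // before building each element
        return buildAggregations(
            Math.toIntExact(owningBucketOrds.size()),
            ordIdx -> resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx))
        );
    }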
@@ -41,10 +41,6 @@ public final LeafBucketCollector getLeafCollector(AggregationExecutionContext ag

@Override
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildEmptyAggregation();
}
return results;
return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> buildEmptyAggregation());
}
}
@@ -9,7 +9,6 @@
package org.elasticsearch.search.aggregations.bucket;

import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
@@ -42,10 +41,9 @@
import java.util.function.ToLongFunction;

public abstract class BucketsAggregator extends AggregatorBase {
private final CircuitBreaker breaker;

private LongArray docCounts;
protected final DocCountProvider docCountProvider;
private int callCount;

@SuppressWarnings("this-escape")
public BucketsAggregator(
@@ -57,7 +55,6 @@ public BucketsAggregator(
Map<String, Object> metadata
) throws IOException {
super(name, factories, aggCtx, parent, bucketCardinality, metadata);
breaker = aggCtx.breaker();
docCounts = bigArrays().newLongArray(1, true);
docCountProvider = new DocCountProvider();
}
@@ -176,7 +173,7 @@ protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(LongArr
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
updateCircuitBreaker("building_sub_aggregation");
checkRealMemoryForInternalBucket();
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
}
return subAggsForBucketFunction(aggregations);
@@ -247,31 +244,30 @@ protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
Function<List<B>, InternalAggregation> resultBuilder
) throws IOException {
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(owningBucketOrds.size() * bucketsPerOwningBucketOrd)) {
int bucketOrdIdx = 0;
int[] bucketOrdIdx = new int[] { 0 };
for (long i = 0; i < owningBucketOrds.size(); i++) {
long ord = owningBucketOrds.get(i) * bucketsPerOwningBucketOrd;
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
bucketOrdsToCollect.set(bucketOrdIdx++, ord++);
bucketOrdsToCollect.set(bucketOrdIdx[0]++, ord++);
}
}
bucketOrdIdx = 0;
bucketOrdIdx[0] = 0;
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);

InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int owningOrdIdx = 0; owningOrdIdx < results.length; owningOrdIdx++) {
return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
checkRealMemoryForInternalBucket();
buckets.add(
bucketBuilder.build(
offsetInOwningOrd,
bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx)),
subAggregationResults.apply(bucketOrdIdx++)
bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx[0])),
subAggregationResults.apply(bucketOrdIdx[0]++)
)
);
}
results[owningOrdIdx] = resultBuilder.apply(buckets);
}
return results;
return resultBuilder.apply(buckets);
});
}
}

@@ -295,11 +291,10 @@ protected final InternalAggregation[] buildAggregationsForSingleBucket(
* here but we don't because single bucket aggs never have.
*/
var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx));
}
return results;
return buildAggregations(
Math.toIntExact(owningBucketOrds.size()),
ordIdx -> resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx))
);
}

@FunctionalInterface
@@ -335,37 +330,36 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
);
}
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) {
int b = 0;
int[] b = new int[] { 0 };
for (long i = 0; i < owningBucketOrds.size(); i++) {
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i));
while (ordsEnum.next()) {
bucketOrdsToCollect.set(b++, ordsEnum.ord());
bucketOrdsToCollect.set(b[0]++, ordsEnum.ord());
}
}
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);

InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
b = 0;
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
b[0] = 0;
return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
final long owningBucketOrd = owningBucketOrds.get(ordIdx);
List<B> buckets = new ArrayList<>(bucketsInOrd.get(ordIdx));
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
while (ordsEnum.next()) {
if (bucketOrdsToCollect.get(b) != ordsEnum.ord()) {
if (bucketOrdsToCollect.get(b[0]) != ordsEnum.ord()) {
// If we hit this, something has gone horribly wrong and we need to investigate
throw AggregationErrors.iterationOrderChangedWithoutMutating(
bucketOrds.toString(),
ordsEnum.ord(),
bucketOrdsToCollect.get(b)
bucketOrdsToCollect.get(b[0])
);
}
checkRealMemoryForInternalBucket();
buckets.add(
bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++))
bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b[0]++))
);
}
results[ordIdx] = resultBuilder.build(owningBucketOrd, buckets);
}
return results;
return resultBuilder.build(owningBucketOrd, buckets);
});
}
}
}
@@ -425,14 +419,9 @@ protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException
docCountProvider.setLeafReaderContext(ctx);
}

/**
* This method calls the circuit breaker from time to time in order to give it a chance to check available
* memory in the parent breaker (Which should be a real memory breaker) and break the execution if we are running out.
* To achieve that, we are passing 0 as the estimated bytes every 1024 calls
*/
private void updateCircuitBreaker(String label) {
if ((++callCount & 0x3FF) == 0) {
breaker.addEstimateBytesAndMaybeBreak(0, label);
}
/** This method should be called whenever a new bucket object is created. It will check the real memory
* circuit breaker in a sampling fashion. See {@link #updateCircuitBreaker(String)} */
protected void checkRealMemoryForInternalBucket() {
updateCircuitBreaker("internal_bucket");
}
}
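The remaining files in the commit follow the same recipe: wherever a loop materializes one bucket object per collected ordinal, it calls checkRealMemoryForInternalBucket() just before constructing the bucket, giving each aggregator a real-memory check roughly every 1024 buckets. Condensed shape of such a loop (bucket type, builder and counter names are placeholders, not the commit's code):

    List<InternalFoo.Bucket> buckets = new ArrayList<>(expectedBucketCount);
    while (ordsEnum.next()) {
        checkRealMemoryForInternalBucket();   // sampled check against the real-memory breaker
        buckets.add(bucketBuilder.build(
            ordsEnum.value(),
            bucketDocCount(ordsEnum.ord()),
            subAggregationResults.apply(b[0]++)
        ));
    }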
@@ -140,6 +140,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
checkRealMemoryForInternalBucket();
spare = emptyBucketBuilder.get();
}
ordsEnum.readValue(spare.getTermBytes());
@@ -158,16 +159,16 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
}

buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {

return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
reduceOrder = InternalOrder.key(true);
Arrays.sort(topBucketsPerOrd.get(ordIdx), reduceOrder.comparator());
} else {
reduceOrder = order;
}
result[ordIdx] = new StringTerms(
return new StringTerms(
name,
reduceOrder,
order,
@@ -181,8 +182,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
Arrays.asList(topBucketsPerOrd.get(ordIdx)),
null
);
}
return result;
});
}
}

@@ -144,6 +144,7 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
if (spare == null) {
checkRealMemoryForInternalBucket();
spare = newEmptyBucket();
}

@@ -162,11 +163,10 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throw
}
}
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata());
}
return results;
return buildAggregations(
Math.toIntExact(owningBucketOrds.size()),
ordIdx -> buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata())
);
}
}
