Skip to content

Commit

Permalink
Build sub aggregation buckets more lazily (#104762)
Browse files Browse the repository at this point in the history
Build these more lazily avoiding putting them in an array and don't keep
an accidental reference to the aggregator itself.
  • Loading branch information
original-brownbear authored Jan 25, 2024
1 parent 048fa93 commit fc2bdc2
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
Expand Down Expand Up @@ -195,7 +194,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
}
assert builtBucketIndex == totalBucketsToBuild;
builtBucketIndex = 0;
InternalAggregations[] bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < owningBucketOrds.length; owningBucketOrdIdx++) {
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
Expand All @@ -209,7 +208,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
keys[i],
docCount,
bucketSubAggs[builtBucketIndex++]
bucketSubAggs.apply(builtBucketIndex++)
);
buckets.add(bucket);
}
Expand All @@ -225,7 +224,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
intersectKey,
docCount,
bucketSubAggs[builtBucketIndex++]
bucketSubAggs.apply(builtBucketIndex++)
);
buckets.add(bucket);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.LongUnaryOperator;
import java.util.function.ToLongFunction;

Expand Down Expand Up @@ -172,28 +173,27 @@ protected void prepareSubAggs(long[] ordsToCollect) throws IOException {}
* @return the sub-aggregation results in the same order as the provided
* array of ordinals
*/
protected final InternalAggregations[] buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
aggregations[i] = subAggregators[i].buildAggregations(bucketOrdsToCollect);
}
InternalAggregations[] result = new InternalAggregations[bucketOrdsToCollect.length];
for (int ord = 0; ord < bucketOrdsToCollect.length; ord++) {
final int thisOrd = ord;
result[ord] = InternalAggregations.from(new AbstractList<>() {
@Override
public InternalAggregation get(int index) {
return aggregations[index][thisOrd];
}
return subAggsForBucketFunction(aggregations);
}

@Override
public int size() {
return aggregations.length;
}
});
}
return result;
private static IntFunction<InternalAggregations> subAggsForBucketFunction(InternalAggregation[][] aggregations) {
return ord -> InternalAggregations.from(new AbstractList<>() {
@Override
public InternalAggregation get(int index) {
return aggregations[index][ord];
}

@Override
public int size() {
return aggregations.length;
}
});
}

/**
Expand Down Expand Up @@ -221,11 +221,11 @@ protected final <B> void buildSubAggsForAllBuckets(
bucketOrdsToCollect[s++] = bucketToOrd.applyAsLong(bucket);
}
}
InternalAggregations[] results = buildSubAggsForBuckets(bucketOrdsToCollect);
var results = buildSubAggsForBuckets(bucketOrdsToCollect);
s = 0;
for (B[] bucket : buckets) {
for (int b = 0; b < bucket.length; b++) {
setAggs.accept(bucket[b], results[s++]);
setAggs.accept(bucket[b], results.apply(s++));
}
}
}
Expand Down Expand Up @@ -254,7 +254,7 @@ protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
}
}
bucketOrdIdx = 0;
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
Expand All @@ -263,7 +263,7 @@ protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
bucketBuilder.build(
offsetInOwningOrd,
bucketDocCount(bucketOrdsToCollect[bucketOrdIdx]),
subAggregationResults[bucketOrdIdx++]
subAggregationResults.apply(bucketOrdIdx++)
)
);
}
Expand All @@ -289,10 +289,10 @@ protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] ow
* `consumeBucketsAndMaybeBreak(owningBucketOrds.length)`
* here but we don't because single bucket aggs never have.
*/
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], subAggregationResults[ordIdx]);
results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], subAggregationResults.apply(ordIdx));
}
return results;
}
Expand Down Expand Up @@ -336,7 +336,7 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
bucketOrdsToCollect[b++] = ordsEnum.ord();
}
}
InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);

InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
b = 0;
Expand All @@ -352,7 +352,7 @@ protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
bucketOrdsToCollect[b]
);
}
buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults[b++]));
buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++)));
}
results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,11 +197,11 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
for (int i = 0; i < queue.size(); i++) {
bucketOrdsToCollect[i] = i;
}
InternalAggregations[] subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
var subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
InternalAggregations aggs = subAggsForBuckets[slot];
InternalAggregations aggs = subAggsForBuckets.apply(slot);
long docCount = queue.getDocCount(slot);
buckets[queue.size()] = new InternalComposite.InternalBucket(
sourceNames,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,11 +555,11 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
bucketOrdsToCollect[i] = i;
}

InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);

List<InternalVariableWidthHistogram.Bucket> buckets = new ArrayList<>(numClusters);
for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) {
buckets.add(collector.buildBucket(bucketOrd, subAggregationResults[bucketOrd]));
buckets.add(collector.buildBucket(bucketOrd, subAggregationResults.apply(bucketOrd)));
}

Function<List<InternalVariableWidthHistogram.Bucket>, InternalAggregation> resultBuilder = bucketsToFormat -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.NonCollectingAggregator;
Expand Down Expand Up @@ -170,7 +169,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
}
}

InternalAggregations[] subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
b = 0;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Expand All @@ -193,7 +192,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
ipPrefix.prefixLength,
ipPrefix.appendPrefixLength,
docCount,
subAggregationResults[b++]
subAggregationResults.apply(b++)
)
);

Expand Down

0 comments on commit fc2bdc2

Please sign in to comment.