Skip to content

Commit

Permalink
Revert "return ObjectArray<InternalAggregation>"
Browse files Browse the repository at this point in the history
This reverts commit 213655c.
  • Loading branch information
iverase committed Nov 16, 2024
1 parent 0b8f041 commit c3bb082
Show file tree
Hide file tree
Showing 40 changed files with 326 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
Expand Down Expand Up @@ -179,7 +178,7 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
// Buckets are ordered into groups - [keyed filters] [key1&key2 intersects]
long maxOrd = owningBucketOrds.size() * totalNumKeys;
long totalBucketsToBuild = 0;
Expand All @@ -189,55 +188,55 @@ public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucket
}
}
try (LongArray bucketOrdsToBuild = bigArrays().newLongArray(totalBucketsToBuild)) {
int[] builtBucketIndex = new int[] { 0 };
int builtBucketIndex = 0;
for (int ord = 0; ord < maxOrd; ord++) {
if (bucketDocCount(ord) > 0) {
bucketOrdsToBuild.set(builtBucketIndex[0]++, ord);
bucketOrdsToBuild.set(builtBucketIndex++, ord);
}
}
assert builtBucketIndex[0] == totalBucketsToBuild;
builtBucketIndex[0] = 0;
try (var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild)) {
ObjectArray<InternalAggregation> result = buildAggregations(owningBucketOrds.size(), ordIdx -> {
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrds.get(ordIdx), i);
assert builtBucketIndex == totalBucketsToBuild;
builtBucketIndex = 0;
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < results.length; owningBucketOrdIdx++) {
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), i);
long docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned because this aggregation will commonly be used under a
// a date-histogram where we will look for transactions over time and can expect many
// empty buckets.
if (docCount > 0) {
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
keys[i],
docCount,
bucketSubAggs.apply(builtBucketIndex++)
);
buckets.add(bucket);
}
}
int pos = keys.length;
for (int i = 0; i < keys.length; i++) {
for (int j = i + 1; j < keys.length; j++) {
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), pos);
long docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned because this aggregation will commonly be used under a
// a date-histogram where we will look for transactions over time and can expect many
// empty buckets.
// Empty buckets are not returned due to potential for very sparse matrices
if (docCount > 0) {
String intersectKey = keys[i] + separator + keys[j];
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
keys[i],
intersectKey,
docCount,
bucketSubAggs.apply(builtBucketIndex[0]++)
bucketSubAggs.apply(builtBucketIndex++)
);
buckets.add(bucket);
}
pos++;
}
int pos = keys.length;
for (int i = 0; i < keys.length; i++) {
for (int j = i + 1; j < keys.length; j++) {
long bucketOrd = bucketOrd(owningBucketOrds.get(ordIdx), pos);
long docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned due to potential for very sparse matrices
if (docCount > 0) {
String intersectKey = keys[i] + separator + keys[j];
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
intersectKey,
docCount,
bucketSubAggs.apply(builtBucketIndex[0]++)
);
buckets.add(bucket);
}
pos++;
}
}
return new InternalAdjacencyMatrix(name, buckets, metadata());
});
assert builtBucketIndex[0] == totalBucketsToBuild;
return result;
}
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
}
assert builtBucketIndex == totalBucketsToBuild;
return results;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
Expand Down Expand Up @@ -139,7 +138,7 @@ public final LeafBucketCollector getLeafCollector(AggregationExecutionContext ag
return singleton != null ? getLeafCollector(singleton, sub) : getLeafCollector(values, sub);
}

protected final ObjectArray<InternalAggregation> buildAggregations(
protected final InternalAggregation[] buildAggregations(
LongKeyedBucketOrds bucketOrds,
LongToIntFunction roundingIndexFor,
LongArray owningBucketOrds
Expand Down Expand Up @@ -325,7 +324,7 @@ private void increaseRoundingIfNeeded(long rounded) {
}

@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregations(bucketOrds, l -> roundingIdx, owningBucketOrds);
}

Expand Down Expand Up @@ -595,7 +594,7 @@ private void rebucket() {
}

@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
/*
* Rebucket before building the aggregation to build as small as result
* as possible.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public TimeSeriesAggregator(
}

@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
BytesRef spare = new BytesRef();
try (ObjectArray<InternalTimeSeries.InternalBucket[]> allBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) {
for (long ordIdx = 0; ordIdx < allBucketsPerOrd.size(); ordIdx++) {
Expand Down Expand Up @@ -100,7 +100,12 @@ public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucket
allBucketsPerOrd.set(ordIdx, buckets.toArray(new InternalTimeSeries.InternalBucket[0]));
}
buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
return buildAggregations(owningBucketOrds.size(), ordIdx -> buildResult(allBucketsPerOrd.get(ordIdx)));

InternalAggregation[] result = new InternalAggregation[Math.toIntExact(allBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = buildResult(allBucketsPerOrd.get(ordIdx));
}
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.lucene.search.Query;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
Expand Down Expand Up @@ -46,7 +45,7 @@ public ChildrenToParentAggregator(
}

@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalParent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.lucene.search.Query;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
Expand Down Expand Up @@ -42,7 +41,7 @@ public ParentToChildrenAggregator(
}

@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalChildren(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@

import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.profile.aggregation.InternalAggregationProfileTree;

import java.io.IOException;
Expand Down Expand Up @@ -101,21 +99,13 @@ public final void postCollection() throws IOException {
}

@Override
public final ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
final ObjectArray<InternalAggregation> delegateResults = delegate.buildAggregations(owningBucketOrds);
boolean success = false;
try {
// update in place
for (long ordIdx = 0; ordIdx < delegateResults.size(); ordIdx++) {
delegateResults.set(ordIdx, adapt(delegateResults.get(ordIdx)));
}
success = true;
return delegateResults;
} finally {
if (success == false) {
Releasables.close(delegateResults);
}
public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
InternalAggregation[] delegateResults = delegate.buildAggregations(owningBucketOrds);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = adapt(delegateResults[ordIdx]);
}
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.sort.SortOrder;
Expand Down Expand Up @@ -143,9 +142,9 @@ public interface BucketComparator {
* @param ordsToCollect the ordinals of the buckets that we want to
* collect from this aggregation
* @return the results for each ordinal, in the same order as the array
* of ordinals. It is responsibility of the caller to close the returned {@link ObjectArray}
* of ordinals
*/
public abstract ObjectArray<InternalAggregation> buildAggregations(LongArray ordsToCollect) throws IOException;
public abstract InternalAggregation[] buildAggregations(LongArray ordsToCollect) throws IOException;

/**
* Release this aggregation and its sub-aggregations.
Expand All @@ -160,9 +159,7 @@ public interface BucketComparator {
*/
public final InternalAggregation buildTopLevel() throws IOException {
assert parent() == null;
try (var agg = buildAggregations(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true))) {
return agg.get(0);
}
return buildAggregations(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true))[0];
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregator;
import org.elasticsearch.search.aggregations.bucket.sampler.random.RandomSamplerAggregator;
import org.elasticsearch.search.aggregations.metrics.MinAggregator;
Expand Down Expand Up @@ -363,32 +361,4 @@ protected final Query topLevelQuery() {
protected final IndexSearcher searcher() {
return context.searcher();
}

/**
* Build the {@link ObjectArray} of this aggregation.
* @param size the size of the array to allocate
* @param buildAggregation a function that builds the aggregation for a given ordinal
* @return the {@link ObjectArray} of this aggregation. It is responsibility of the caller to close the returned {@link ObjectArray}
*/
protected final ObjectArray<InternalAggregation> buildAggregations(long size, CheckedLongFunction<InternalAggregation> buildAggregation)
throws IOException {
final ObjectArray<InternalAggregation> results = bigArrays().newObjectArray(size);
boolean success = false;
try {
for (long ordIdx = 0; ordIdx < results.size(); ordIdx++) {
results.set(ordIdx, buildAggregation.apply(ordIdx));
}
success = true;
return results;
} finally {
if (success == false) {
Releasables.close(results);
}
}
}

@FunctionalInterface
protected interface CheckedLongFunction<T> {
T apply(long l) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.support.AggregationContext;

import java.io.IOException;
Expand Down Expand Up @@ -41,7 +40,11 @@ public final LeafBucketCollector getLeafCollector(AggregationExecutionContext ag
}

@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregations(owningBucketOrds.size(), l -> buildEmptyAggregation());
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildEmptyAggregation();
}
return results;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
Expand Down Expand Up @@ -237,7 +236,7 @@ private static void failInCaseOfBadScorer(String message) {
public Aggregator wrap(final Aggregator in) {
return new WrappedAggregator(in) {
@Override
public ObjectArray<InternalAggregation> buildAggregations(LongArray owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
Expand Down
Loading

0 comments on commit c3bb082

Please sign in to comment.