Remove bucketOrd field from InternalTerms and friends #118044

Merged 3 commits on Dec 5, 2024
@@ -12,6 +12,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.xcontent.ToXContentObject;

@@ -20,13 +21,12 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;

/**
* {@link Bucket} ordering strategy. Buckets can be order either as
* "complete" buckets using {@link #comparator()} or against a combination
* of the buckets internals with its ordinal with
* {@link #partiallyBuiltBucketComparator(ToLongFunction, Aggregator)}.
* {@link #partiallyBuiltBucketComparator(Aggregator)}.
*/
public abstract class BucketOrder implements ToXContentObject, Writeable {
/**
@@ -102,7 +102,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionException
* to validate this order because doing so checks all of the appropriate
* paths.
*/
partiallyBuiltBucketComparator(null, aggregator);
partiallyBuiltBucketComparator(aggregator);
}

/**
@@ -121,7 +121,7 @@ public final void validate(Aggregator aggregator) throws AggregationExecutionException
* with it all the time.
* </p>
*/
public abstract <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator);
public abstract <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator);

/**
* Build a comparator for fully built buckets.
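The BucketAndOrd type imported above is not shown in this part of the diff. Judging from how the changed code uses it (new BucketAndOrd<>(bucket), spare.bucket, spare.ord), it is a small holder that pairs a partially built bucket with its ordinal, replacing the bucketOrd field this PR removes. A minimal sketch of such a holder, with field names inferred from the call sites rather than copied from the real class:

// Hypothetical sketch of the BucketAndOrd holder referenced in this diff.
// Field names are inferred from the call sites (spare.bucket, spare.ord);
// the actual class in the Elasticsearch source may differ.
public class BucketAndOrd<B> {
    public final B bucket; // the partially built bucket
    public long ord;       // the bucket ordinal, previously stored on the bucket itself

    public BucketAndOrd(B bucket) {
        this.bucket = bucket;
    }
}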
---------------- next changed file ----------------
@@ -15,6 +15,7 @@
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.search.aggregations.Aggregator.BucketComparator;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortValue;
@@ -30,7 +31,6 @@
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.ToLongFunction;

/**
* Implementations for {@link Bucket} ordering strategies.
@@ -63,10 +63,10 @@ public AggregationPath path() {
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
try {
BucketComparator bucketComparator = path.bucketComparator(aggregator, order);
return (lhs, rhs) -> bucketComparator.compare(ordinalReader.applyAsLong(lhs), ordinalReader.applyAsLong(rhs));
return (lhs, rhs) -> bucketComparator.compare(lhs.ord, rhs.ord);
} catch (IllegalArgumentException e) {
throw new AggregationExecutionException.InvalidPath("Invalid aggregation order path [" + path + "]. " + e.getMessage(), e);
}
@@ -188,12 +188,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
List<Comparator<T>> comparators = orderElements.stream()
.map(oe -> oe.partiallyBuiltBucketComparator(ordinalReader, aggregator))
.toList();
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
List<Comparator<BucketAndOrd<T>>> comparators = new ArrayList<>(orderElements.size());
for (BucketOrder order : orderElements) {
comparators.add(order.partiallyBuiltBucketComparator(aggregator));
}
return (lhs, rhs) -> {
for (Comparator<T> c : comparators) {
for (Comparator<BucketAndOrd<T>> c : comparators) {
int result = c.compare(lhs, rhs);
if (result != 0) {
return result;
Expand Down Expand Up @@ -299,9 +300,9 @@ byte id() {
}

@Override
public <T extends Bucket> Comparator<T> partiallyBuiltBucketComparator(ToLongFunction<T> ordinalReader, Aggregator aggregator) {
public <T extends Bucket> Comparator<BucketAndOrd<T>> partiallyBuiltBucketComparator(Aggregator aggregator) {
Comparator<Bucket> comparator = comparator();
return comparator::compare;
return (lhs, rhs) -> comparator.compare(lhs.bucket, rhs.bucket);
}

@Override
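For callers, the new signature means the bucket ordinal no longer has to be threaded in through a separate ToLongFunction; it travels with the bucket inside BucketAndOrd. A rough sketch of a call site before and after, where order, aggregator, bucketA and bucketB, and the ordinal values are placeholders rather than names from this diff:

// Before (old signature): the caller supplied a ToLongFunction to read the ordinal.
// Comparator<StringTerms.Bucket> cmp =
//     order.partiallyBuiltBucketComparator(InternalTerms.Bucket::getBucketOrd, aggregator);

// After (this PR): the comparator operates on BucketAndOrd, which carries the ordinal.
Comparator<BucketAndOrd<StringTerms.Bucket>> cmp = order.partiallyBuiltBucketComparator(aggregator);

BucketAndOrd<StringTerms.Bucket> left = new BucketAndOrd<>(bucketA);
left.ord = 3L;   // placeholder ordinal
BucketAndOrd<StringTerms.Bucket> right = new BucketAndOrd<>(bucketB);
right.ord = 7L;  // placeholder ordinal
int result = cmp.compare(left, right); // negative if `left` should sort before `right`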
---------------- next changed file ----------------
@@ -13,6 +13,7 @@
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
@@ -26,6 +27,7 @@
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.bucket.terms.BucketAndOrd;
import org.elasticsearch.search.aggregations.bucket.terms.BucketPriorityQueue;
import org.elasticsearch.search.aggregations.bucket.terms.BytesKeyedBucketOrds;
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
@@ -38,7 +40,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
import static org.elasticsearch.search.aggregations.InternalOrder.isKeyOrder;
@@ -115,51 +116,57 @@ public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws
LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size());
ObjectArray<StringTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())
) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());

// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
size,
bigArrays(),
partiallyBuiltBucketComparator
)
) {
StringTerms.Bucket spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(
new BytesRef(),
0,
null,
false,
0,
format
);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = emptyBucketBuilder.get();
try (IntArray bucketsToCollect = bigArrays().newIntArray(owningBucketOrds.size())) {
// find how many buckets we are going to collect
long ordsToCollect = 0;
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), bucketCountThresholds.getShardSize());
bucketsToCollect.set(ordIdx, size);
ordsToCollect += size;
}
try (LongArray ordsArray = bigArrays().newLongArray(ordsToCollect)) {
long ordsCollected = 0;
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
bucketsToCollect.get(ordIdx),
bigArrays(),
order.partiallyBuiltBucketComparator(this)
)
) {
BucketAndOrd<StringTerms.Bucket> spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
checkRealMemoryCBForInternalBucket();
spare = new BucketAndOrd<>(new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format));
}
ordsEnum.readValue(spare.bucket.getTermBytes());
spare.bucket.setDocCount(docCount);
spare.ord = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
final int orderedSize = (int) ordered.size();
final StringTerms.Bucket[] buckets = new StringTerms.Bucket[orderedSize];
for (int i = orderedSize - 1; i >= 0; --i) {
BucketAndOrd<StringTerms.Bucket> bucketAndOrd = ordered.pop();
buckets[i] = bucketAndOrd.bucket;
ordsArray.set(ordsCollected + i, bucketAndOrd.ord);
otherDocCounts.increment(ordIdx, -bucketAndOrd.bucket.getDocCount());
bucketAndOrd.bucket.setTermBytes(BytesRef.deepCopyOf(bucketAndOrd.bucket.getTermBytes()));
}
topBucketsPerOrd.set(ordIdx, buckets);
ordsCollected += orderedSize;
}
ordsEnum.readValue(spare.getTermBytes());
spare.setDocCount(docCount);
spare.setBucketOrd(ordsEnum.ord());
spare = ordered.insertWithOverflow(spare);
}

topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]);
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount());
topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes()));
}
assert ordsCollected == ordsArray.size();
buildSubAggsForAllBuckets(topBucketsPerOrd, ordsArray, InternalTerms.Bucket::setAggregations);
}
}

buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);

return buildAggregations(Math.toIntExact(owningBucketOrds.size()), ordIdx -> {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
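The heart of the change above is that each kept bucket's ordinal is written into ordsArray at ordsCollected + i, parallel to the buckets stored in topBucketsPerOrd, so buildSubAggsForAllBuckets can be driven by one flat array instead of a bucketOrd field on every bucket. A toy illustration of that parallel layout, using plain Java arrays and made-up sizes rather than the Elasticsearch types:

// Toy illustration of the flat ordinal layout built in the loop above.
// Owning bucket 0 keeps 3 term buckets and owning bucket 1 keeps 2, so their
// ordinals land contiguously in one array, in the same order as the buckets
// themselves; sub-aggregations can then be attached by position in one pass.
int[] bucketsToCollect = { 3, 2 };    // per-owner top-N sizes (first pass)
long[] ordsArray = new long[3 + 2];   // flat array replacing the removed bucketOrd field
int ordsCollected = 0;
for (int ordIdx = 0; ordIdx < bucketsToCollect.length; ordIdx++) {
    for (int i = 0; i < bucketsToCollect[ordIdx]; i++) {
        long ord = ordIdx * 10L + i;  // stand-in for ordsEnum.ord() of the kept bucket
        ordsArray[ordsCollected + i] = ord;
    }
    ordsCollected += bucketsToCollect[ordIdx];
}
// ordsArray is now [0, 1, 2, 10, 11]: owner 0's buckets first, then owner 1's,
// mirroring topBucketsPerOrd, which is the layout buildSubAggsForAllBuckets consumes.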
---------------- next changed file ----------------
@@ -13,17 +13,17 @@

import java.util.Comparator;

public class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<B> {
public class BucketPriorityQueue<B> extends ObjectArrayPriorityQueue<BucketAndOrd<B>> {

private final Comparator<? super B> comparator;
private final Comparator<BucketAndOrd<B>> comparator;

public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<? super B> comparator) {
public BucketPriorityQueue(int size, BigArrays bigArrays, Comparator<BucketAndOrd<B>> comparator) {
super(size, bigArrays);
this.comparator = comparator;
}

@Override
protected boolean lessThan(B a, B b) {
protected boolean lessThan(BucketAndOrd<B> a, BucketAndOrd<B> b) {
return comparator.compare(a, b) > 0; // reverse, since we reverse again when adding to a list
}
}
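Because lessThan above is reversed, the head of the queue is the weakest bucket currently kept, which is what lets insertWithOverflow evict cheaply and why the aggregator diff fills its result array from the last index down. A rough usage sketch under stated assumptions (a test-style BigArrays instance, DocValueFormat.RAW, and made-up doc counts; not code from this PR):

// Rough sketch: keep the top 2 entries by descending doc count.
// With the reversed lessThan, the queue head is the weakest kept entry, so
// insertWithOverflow returns the evicted (or rejected) entry for reuse, and
// pop() yields buckets from weakest to strongest.
Comparator<BucketAndOrd<StringTerms.Bucket>> byDocCountDesc =
    Comparator.comparingLong((BucketAndOrd<StringTerms.Bucket> b) -> b.bucket.getDocCount()).reversed();

try (BucketPriorityQueue<StringTerms.Bucket> top2 =
         new BucketPriorityQueue<>(2, BigArrays.NON_RECYCLING_INSTANCE, byDocCountDesc)) {
    for (long docCount : new long[] { 5, 9, 3 }) {
        BucketAndOrd<StringTerms.Bucket> candidate = new BucketAndOrd<>(
            new StringTerms.Bucket(new BytesRef(), docCount, null, false, 0, DocValueFormat.RAW));
        top2.insertWithOverflow(candidate); // rejects or evicts the weaker entry once full
    }
    // Popping now yields doc counts 5 then 9, so callers fill result arrays back to front.
}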
---------------- next changed file ----------------
@@ -12,14 +12,14 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;

public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends ObjectArrayPriorityQueue<B> {
public class BucketSignificancePriorityQueue<B extends SignificantTerms.Bucket> extends ObjectArrayPriorityQueue<BucketAndOrd<B>> {

public BucketSignificancePriorityQueue(int size, BigArrays bigArrays) {
super(size, bigArrays);
}

@Override
protected boolean lessThan(SignificantTerms.Bucket o1, SignificantTerms.Bucket o2) {
return o1.getSignificanceScore() < o2.getSignificanceScore();
protected boolean lessThan(BucketAndOrd<B> o1, BucketAndOrd<B> o2) {
return o1.bucket.getSignificanceScore() < o2.bucket.getSignificanceScore();
}
}