Reduce InternalHistogram and InternalDateHistogram in a streaming fashion (#105359)
iverase authored Feb 12, 2024
1 parent fa0ef3e commit f942605
Showing 4 changed files with 145 additions and 167 deletions.
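In short: before this change, each histogram's leader reducer buffered every shard-level aggregation it received and merged them only in get(), walking the per-shard sorted bucket lists with a Lucene PriorityQueue. After it, accept(...) streams each incoming bucket straight into the new LongKeyedMultiBucketsAggregatorReducer, so same-keyed buckets are combined as shard responses arrive and the per-shard lists are no longer retained; because the keyed reducer does not guarantee key order, the reduced buckets are sorted once during the final reduce.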
org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java
@@ -37,7 +37,7 @@ public abstract class InternalMultiBucketAggregation<
* buckets is plenty fast to fail this quickly in pathological cases and
* plenty large to keep the overhead minimal.
*/
-    protected static final int REPORT_EMPTY_EVERY = 10_000;
+    public static final int REPORT_EMPTY_EVERY = 10_000;

public InternalMultiBucketAggregation(String name, Map<String, Object> metadata) {
super(name, metadata);
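REPORT_EMPTY_EVERY is widened from protected to public, presumably so the new reducer class introduced by this commit can reuse the batched circuit-breaker accounting that the removed code (below) performed inline. A minimal sketch of that pattern, where BucketConsumer is a hypothetical stand-in for AggregationReduceContext#consumeBucketsAndMaybeBreak:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: BucketConsumer is a hypothetical stand-in for
// AggregationReduceContext#consumeBucketsAndMaybeBreak.
interface BucketConsumer {
    void consumeBucketsAndMaybeBreak(int count); // may trip the circuit breaker
}

class BatchedAccounting {
    static final int REPORT_EMPTY_EVERY = 10_000;

    static <B> List<B> collect(Iterable<B> buckets, BucketConsumer breaker) {
        List<B> out = new ArrayList<>();
        int pending = 0; // buckets created since the breaker last saw an update
        for (B bucket : buckets) {
            out.add(bucket);
            // Checking the breaker on every bucket would be needlessly slow;
            // batches of 10_000 fail pathological cases quickly enough while
            // keeping the per-bucket overhead minimal.
            if (++pending >= REPORT_EMPTY_EVERY) {
                breaker.consumeBucketsAndMaybeBreak(pending);
                pending = 0;
            }
        }
        breaker.consumeBucketsAndMaybeBreak(pending); // flush the remainder
        return out;
    }
}
```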
org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java
@@ -8,12 +8,12 @@
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.util.CollectionUtil;
-import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.AggregatorReducer;
@@ -23,7 +23,6 @@
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.support.SamplingContext;
import org.elasticsearch.xcontent.XContentBuilder;
@@ -33,6 +32,7 @@
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -323,85 +323,6 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}

-    private List<Bucket> reduceBuckets(List<InternalDateHistogram> aggregations, AggregationReduceContext reduceContext) {
-
-        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
-            @Override
-            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
-                return a.current().key < b.current().key;
-            }
-        };
-        for (InternalDateHistogram histogram : aggregations) {
-            if (histogram.buckets.isEmpty() == false) {
-                pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator()));
-            }
-        }
-
-        int consumeBucketCount = 0;
-        List<Bucket> reducedBuckets = new ArrayList<>();
-        if (pq.size() > 0) {
-            // list of buckets coming from different shards that have the same key
-            List<Bucket> currentBuckets = new ArrayList<>();
-            double key = pq.top().current().key;
-
-            do {
-                final IteratorAndCurrent<Bucket> top = pq.top();
-
-                if (top.current().key != key) {
-                    // the key changes, reduce what we already buffered and reset the buffer for current buckets
-                    final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                            reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                            consumeBucketCount = 0;
-                        }
-                        reducedBuckets.add(reduced);
-                    }
-                    currentBuckets.clear();
-                    key = top.current().key;
-                }
-
-                currentBuckets.add(top.current());
-
-                if (top.hasNext()) {
-                    top.next();
-                    assert top.current().key > key : "shards must return data sorted by key";
-                    pq.updateTop();
-                } else {
-                    pq.pop();
-                }
-            } while (pq.size() > 0);
-
-            if (currentBuckets.isEmpty() == false) {
-                final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reducedBuckets.add(reduced);
-                    if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                        consumeBucketCount = 0;
-                    }
-                }
-            }
-        }
-        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-        return reducedBuckets;
-    }
-
-    /**
-     * Reduce a list of same-keyed buckets (from multiple shards) to a single bucket. This
-     * requires all buckets to have the same key.
-     */
-    private Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
-        assert buckets.isEmpty() == false;
-        long docCount = 0;
-        for (Bucket bucket : buckets) {
-            docCount += bucket.docCount;
-        }
-        final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
-        final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
-        return createBucket(buckets.get(0).key, docCount, aggs);
-    }

private void addEmptyBuckets(List<Bucket> list, AggregationReduceContext reduceContext) {
/*
* Make sure we have space for the empty buckets we're going to add by
@@ -513,17 +434,30 @@ private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, L
protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
return new AggregatorReducer() {

-            final List<InternalDateHistogram> aggregations = new ArrayList<>(size);
+            final LongKeyedMultiBucketsAggregatorReducer<Bucket> reducer = new LongKeyedMultiBucketsAggregatorReducer<>(
+                reduceContext,
+                size,
+                minDocCount
+            ) {
+                @Override
+                protected Bucket createBucket(long key, long docCount, InternalAggregations aggregations) {
+                    return InternalDateHistogram.this.createBucket(key, docCount, aggregations);
+                }
+            };

@Override
public void accept(InternalAggregation aggregation) {
-                aggregations.add((InternalDateHistogram) aggregation);
+                InternalDateHistogram dateHistogram = (InternalDateHistogram) aggregation;
+                for (Bucket bucket : dateHistogram.buckets) {
+                    reducer.accept(bucket.key, bucket);
+                }
}

@Override
public InternalAggregation get() {
-                List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+                List<Bucket> reducedBuckets = reducer.get();
                if (reduceContext.isFinalReduce()) {
+                    reducedBuckets.sort(Comparator.comparingLong(b -> b.key));
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
}
@@ -552,6 +486,11 @@ public InternalAggregation get() {
getMetadata()
);
}

+            @Override
+            public void close() {
+                Releasables.close(reducer);
+            }
};
}

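Both histograms now delegate bucket merging to LongKeyedMultiBucketsAggregatorReducer, the fourth changed file, whose diff did not load on this page. Judging only from the call sites above, it accumulates buckets under long keys and hands back the reduced list in get(). A deliberately simplified sketch of that contract, using a plain HashMap where the real class presumably uses breaker-accounted, releasable structures (hence the Releasables.close(reducer) in close()), and omitting sub-aggregation reduction and the final-reduce handling of minDocCount:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification; names and behavior are inferred from the
// call sites in the diff, not from the real class.
abstract class SketchLongKeyedReducer<B> implements AutoCloseable {
    private final Map<Long, Long> docCountByKey = new HashMap<>();
    private final long minDocCount;

    SketchLongKeyedReducer(long minDocCount) {
        this.minDocCount = minDocCount;
    }

    // Streaming entry point: called once per incoming bucket, in whatever
    // order the shard responses arrive.
    void accept(long key, long docCount) {
        docCountByKey.merge(key, docCount, Long::sum);
    }

    // One reduced bucket per distinct key. Iteration order of a hash map is
    // unspecified, which is why the callers sort the result by key.
    List<B> get() {
        List<B> reduced = new ArrayList<>();
        docCountByKey.forEach((key, docCount) -> {
            if (docCount >= minDocCount) {
                reduced.add(createBucket(key, docCount));
            }
        });
        return reduced;
    }

    protected abstract B createBucket(long key, long docCount);

    @Override
    public void close() {
        // The real reducer releases breaker-accounted structures here.
    }
}
```

The unspecified iteration order of a hash-keyed structure is also why both get() implementations above now sort reducedBuckets before adding empty buckets, a step the old priority-queue merge got for free.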
org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java
@@ -8,9 +8,10 @@
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.apache.lucene.util.CollectionUtil;
-import org.apache.lucene.util.PriorityQueue;
+import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.AggregatorReducer;
@@ -20,14 +21,14 @@
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.InternalOrder;
import org.elasticsearch.search.aggregations.KeyComparable;
-import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.support.SamplingContext;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
@@ -281,83 +282,6 @@ public Bucket createBucket(InternalAggregations aggregations, Bucket prototype)
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}

-    private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
-        final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
-            @Override
-            protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
-                return Double.compare(a.current().key, b.current().key) < 0;
-            }
-        };
-        for (InternalAggregation aggregation : aggregations) {
-            InternalHistogram histogram = (InternalHistogram) aggregation;
-            if (histogram.buckets.isEmpty() == false) {
-                pq.add(new IteratorAndCurrent<>(histogram.buckets.iterator()));
-            }
-        }
-
-        int consumeBucketCount = 0;
-        List<Bucket> reducedBuckets = new ArrayList<>();
-        if (pq.size() > 0) {
-            // list of buckets coming from different shards that have the same key
-            List<Bucket> currentBuckets = new ArrayList<>();
-            double key = pq.top().current().key;
-
-            do {
-                final IteratorAndCurrent<Bucket> top = pq.top();
-
-                if (Double.compare(top.current().key, key) != 0) {
-                    // The key changes, reduce what we already buffered and reset the buffer for current buckets.
-                    // Using Double.compare instead of != to handle NaN correctly.
-                    final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                    if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                        reducedBuckets.add(reduced);
-                        if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                            reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                            consumeBucketCount = 0;
-                        }
-                    }
-                    currentBuckets.clear();
-                    key = top.current().key;
-                }
-
-                currentBuckets.add(top.current());
-
-                if (top.hasNext()) {
-                    top.next();
-                    assert Double.compare(top.current().key, key) > 0 : "shards must return data sorted by key";
-                    pq.updateTop();
-                } else {
-                    pq.pop();
-                }
-            } while (pq.size() > 0);
-
-            if (currentBuckets.isEmpty() == false) {
-                final Bucket reduced = reduceBucket(currentBuckets, reduceContext);
-                if (reduced.getDocCount() >= minDocCount || reduceContext.isFinalReduce() == false) {
-                    reducedBuckets.add(reduced);
-                    if (consumeBucketCount++ >= REPORT_EMPTY_EVERY) {
-                        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-                        consumeBucketCount = 0;
-                    }
-                }
-            }
-        }
-
-        reduceContext.consumeBucketsAndMaybeBreak(consumeBucketCount);
-        return reducedBuckets;
-    }
-
-    private Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
-        assert buckets.isEmpty() == false;
-        long docCount = 0;
-        for (Bucket bucket : buckets) {
-            docCount += bucket.docCount;
-        }
-        final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
-        final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
-        return createBucket(buckets.get(0).key, docCount, aggs);
-    }

private double nextKey(double key) {
return round(key + emptyBucketInfo.interval + emptyBucketInfo.interval / 2);
}
@@ -453,17 +377,30 @@ private void iterateEmptyBuckets(List<Bucket> list, ListIterator<Bucket> iter, D
protected AggregatorReducer getLeaderReducer(AggregationReduceContext reduceContext, int size) {
return new AggregatorReducer() {

-            List<InternalAggregation> aggregations = new ArrayList<>(size);
+            final LongKeyedMultiBucketsAggregatorReducer<Bucket> reducer = new LongKeyedMultiBucketsAggregatorReducer<>(
+                reduceContext,
+                size,
+                minDocCount
+            ) {
+                @Override
+                protected Bucket createBucket(long key, long docCount, InternalAggregations aggregations) {
+                    return InternalHistogram.this.createBucket(NumericUtils.sortableLongToDouble(key), docCount, aggregations);
+                }
+            };

@Override
public void accept(InternalAggregation aggregation) {
-                aggregations.add(aggregation);
+                InternalHistogram histogram = (InternalHistogram) aggregation;
+                for (Bucket bucket : histogram.buckets) {
+                    reducer.accept(NumericUtils.doubleToSortableLong(bucket.key), bucket);
+                }
}

@Override
public InternalAggregation get() {
-                List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
+                List<Bucket> reducedBuckets = reducer.get();
                if (reduceContext.isFinalReduce()) {
+                    reducedBuckets.sort(Comparator.comparingDouble(b -> b.key));
if (minDocCount == 0) {
addEmptyBuckets(reducedBuckets, reduceContext);
}
@@ -481,6 +418,11 @@ public InternalAggregation get() {
}
return new InternalHistogram(getName(), reducedBuckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
}

+            @Override
+            public void close() {
+                Releasables.close(reducer);
+            }
};
}

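InternalHistogram keys are doubles while the shared reducer keys on longs, so the diff bridges them with Lucene's NumericUtils sortable-long encoding: doubleToSortableLong is exact and order-preserving, and sortableLongToDouble inverts it inside createBucket. A small self-contained check of both properties:

```java
import org.apache.lucene.util.NumericUtils;

public class SortableLongCheck {
    public static void main(String[] args) {
        double[] keys = { -1000.25, -1.5, 0.0, 2.5, 1000.25 };
        long previous = Long.MIN_VALUE;
        for (double key : keys) {
            long sortable = NumericUtils.doubleToSortableLong(key);
            // The encoding round-trips exactly...
            if (NumericUtils.sortableLongToDouble(sortable) != key) {
                throw new AssertionError("round trip failed for " + key);
            }
            // ...and preserves ordering, so long-keyed reduction is safe.
            if (sortable <= previous) {
                throw new AssertionError("ordering not preserved at " + key);
            }
            previous = sortable;
        }
        System.out.println("sortable-long encoding is exact and order-preserving");
    }
}
```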
(The diff for the fourth changed file, presumably the new LongKeyedMultiBucketsAggregatorReducer, did not load on this page.)
