Skip to content

Commit

Permalink
Simplify SiblingPipelineAggregator (#53144) (#53341)
Browse files Browse the repository at this point in the history
This removes the `instanceof`s from `SiblingPipelineAggregator` by
adding a `rewriteBuckets` method to `InternalAggregation` that can be
called to, well, rewrite the buckets. The default implementation of
`rewriteBuckets` throws the same exception that was thrown when you
attempted to run a `SiblingPipelineAggregator` on an aggregation without
buckets. It is overridden by `InternalSingleBucketAggregation` and
`InternalMultiBucketAggregation` to correctly rewrite their buckets.
  • Loading branch information
nik9000 authored Mar 10, 2020
1 parent 89c0e1f commit 5ce6de2
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.IntConsumer;

/**
Expand Down Expand Up @@ -127,6 +128,29 @@ public String getName() {
return name;
}

/**
* Rewrite the sub-aggregations in the buckets in this aggregation.
* Returns a copy of this {@linkplain InternalAggregation} with the
* rewritten buckets, or, if there aren't any modifications to
* the buckets then this method will return this aggregation. Either
* way, it doesn't modify this aggregation.
* <p>
* Implementers of this should call the {@code rewriter} once per bucket
* with its {@linkplain InternalAggregations}. The {@code rewriter}
* should return {@code null} if it doen't have any rewriting to do or
* it should return a new {@linkplain InternalAggregations} to make
* changs.
* <p>
* The default implementation throws an exception because most
* aggregations don't <strong>have</strong> buckets in them. It
* should be overridden by aggregations that contain buckets. Implementers
* should respect the description above.
*/
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
throw new IllegalStateException(
"Aggregation [" + getName() + "] must be a bucket aggregation but was [" + getWriteableName() + "]");
}

/**
* Creates the output from all pipeline aggs that this aggregation is associated with. Should only
* be called after all aggregations have been fully reduced
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

/**
* Make a mutable copy of the aggregation results.
* <p>
* IMPORTANT: The copy doesn't include any pipeline aggregations, if there are any.
*/
public List<InternalAggregation> copyResults() {
return new ArrayList<>(getInternalAggregations());
}

/**
* Returns the top-level pipeline aggregators.
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation,
B extends InternalMultiBucketAggregation.InternalBucket>
Expand Down Expand Up @@ -154,6 +155,23 @@ public final InternalAggregation reducePipelines(InternalAggregation reducedAggs
return super.reducePipelines(create(materializedBuckets), reduceContext);
}

@Override
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
boolean modified = false;
List<B> newBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
InternalAggregations rewritten = rewriter.apply((InternalAggregations) bucket.getAggregations());
if (rewritten == null) {
newBuckets.add(bucket);
continue;
}
modified = true;
B newBucket = createBucket(rewritten, bucket);
newBuckets.add(newBucket);
}
return modified ? create(newBuckets) : this;
}

private List<B> reducePipelineBuckets(ReduceContext reduceContext) {
List<B> reducedBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
Expand Down Expand Up @@ -192,6 +210,5 @@ public Object getProperty(String containingAggName, List<String> path) {
}
return aggregation.getProperty(path.subList(1, path.size()));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/**
* A base class for all the single bucket aggregations.
Expand Down Expand Up @@ -169,6 +170,15 @@ public final double sortValue(AggregationPath.PathElement head, Iterator<Aggrega
return aggregations.sortValue(head, tail);
}

@Override
public InternalAggregation copyWithRewritenBuckets(Function<InternalAggregations, InternalAggregations> rewriter) {
InternalAggregations rewritten = rewriter.apply(aggregations);
if (rewritten == null) {
return this;
}
return create(rewritten);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
Expand Down Expand Up @@ -84,7 +83,7 @@ public long getDocCount() {
}

@Override
public Aggregations getAggregations() {
public InternalAggregations getAggregations() {
return aggregations;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,10 @@
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public abstract class SiblingPipelineAggregator extends PipelineAggregator {
protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
Expand All @@ -47,39 +41,13 @@ protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map<Stri
super(in);
}

@SuppressWarnings("unchecked")
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
if (aggregation instanceof InternalMultiBucketAggregation) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation multiBucketsAgg = (InternalMultiBucketAggregation) aggregation;
List<? extends Bucket> buckets = multiBucketsAgg.getBuckets();
List<Bucket> newBuckets = new ArrayList<>();
for (Bucket bucket1 : buckets) {
InternalMultiBucketAggregation.InternalBucket bucket = (InternalMultiBucketAggregation.InternalBucket) bucket1;
InternalAggregation aggToAdd = doReduce(bucket.getAggregations(), reduceContext);
List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(aggToAdd);
InternalMultiBucketAggregation.InternalBucket newBucket = multiBucketsAgg.createBucket(new InternalAggregations(aggs),
bucket);
newBuckets.add(newBucket);
}

return multiBucketsAgg.create(newBuckets);
} else if (aggregation instanceof InternalSingleBucketAggregation) {
InternalSingleBucketAggregation singleBucketAgg = (InternalSingleBucketAggregation) aggregation;
InternalAggregation aggToAdd = doReduce(singleBucketAgg.getAggregations(), reduceContext);
List<InternalAggregation> aggs = StreamSupport.stream(singleBucketAgg.getAggregations().spliterator(), false)
.map((p) -> (InternalAggregation) p)
.collect(Collectors.toList());
aggs.add(aggToAdd);
return singleBucketAgg.create(new InternalAggregations(aggs));
} else {
throw new IllegalStateException("Aggregation [" + aggregation.getName() + "] must be a bucket aggregation ["
+ aggregation.getWriteableName() + "]");
}
return aggregation.copyWithRewritenBuckets(aggregations -> {
List<InternalAggregation> aggs = aggregations.copyResults();
aggs.add(doReduce(aggregations, reduceContext));
return new InternalAggregations(aggs);
});
}

public abstract InternalAggregation doReduce(Aggregations aggregations, ReduceContext context);
Expand Down

0 comments on commit 5ce6de2

Please sign in to comment.