diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java index 021d6872e3325..77833c98cae0e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.function.IntConsumer; /** @@ -127,6 +128,29 @@ public String getName() { return name; } + /** + * Rewrite the sub-aggregations in the buckets in this aggregation. + * Returns a copy of this {@linkplain InternalAggregation} with the + * rewritten buckets, or, if there aren't any modifications to + * the buckets then this method will return this aggregation. Either + * way, it doesn't modify this aggregation. + *

+ * Implementers of this should call the {@code rewriter} once per bucket + * with its {@linkplain InternalAggregations}. The {@code rewriter} + * should return {@code null} if it doen't have any rewriting to do or + * it should return a new {@linkplain InternalAggregations} to make + * changs. + *

+ * The default implementation throws an exception because most + * aggregations don't have buckets in them. It + * should be overridden by aggregations that contain buckets. Implementers + * should respect the description above. + */ + public InternalAggregation copyWithRewritenBuckets(Function rewriter) { + throw new IllegalStateException( + "Aggregation [" + getName() + "] must be a bucket aggregation but was [" + getWriteableName() + "]"); + } + /** * Creates the output from all pipeline aggs that this aggregation is associated with. Should only * be called after all aggregations have been fully reduced diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 344b86dad7484..b32fa9df82944 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -92,6 +92,15 @@ public void writeTo(StreamOutput out) throws IOException { } } + /** + * Make a mutable copy of the aggregation results. + *

+ * IMPORTANT: The copy doesn't include any pipeline aggregations, if there are any. + */ + public List copyResults() { + return new ArrayList<>(getInternalAggregations()); + } + /** * Returns the top-level pipeline aggregators. * Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java index 9600c6ae86611..61176fbf29f82 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; public abstract class InternalMultiBucketAggregation @@ -154,6 +155,23 @@ public final InternalAggregation reducePipelines(InternalAggregation reducedAggs return super.reducePipelines(create(materializedBuckets), reduceContext); } + @Override + public InternalAggregation copyWithRewritenBuckets(Function rewriter) { + boolean modified = false; + List newBuckets = new ArrayList<>(); + for (B bucket : getBuckets()) { + InternalAggregations rewritten = rewriter.apply((InternalAggregations) bucket.getAggregations()); + if (rewritten == null) { + newBuckets.add(bucket); + continue; + } + modified = true; + B newBucket = createBucket(rewritten, bucket); + newBuckets.add(newBucket); + } + return modified ? create(newBuckets) : this; + } + private List reducePipelineBuckets(ReduceContext reduceContext) { List reducedBuckets = new ArrayList<>(); for (B bucket : getBuckets()) { @@ -192,6 +210,5 @@ public Object getProperty(String containingAggName, List path) { } return aggregation.getProperty(path.subList(1, path.size())); } - } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java index 68a66318b4511..437cd1466d688 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/InternalSingleBucketAggregation.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; /** * A base class for all the single bucket aggregations. @@ -169,6 +170,15 @@ public final double sortValue(AggregationPath.PathElement head, Iterator rewriter) { + InternalAggregations rewritten = rewriter.apply(aggregations); + if (rewritten == null) { + return this; + } + return create(rewritten); + } + @Override public boolean equals(Object obj) { if (this == obj) return true; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index 78181e3a3366a..801438bc84f7f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; @@ -84,7 +83,7 @@ public long getDocCount() { } @Override - public Aggregations getAggregations() { + public InternalAggregations getAggregations() { return aggregations; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java index a131dde9c7673..ba7c49cee7eb4 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java @@ -24,16 +24,10 @@ import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; -import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; -import org.elasticsearch.search.aggregations.bucket.InternalSingleBucketAggregation; -import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public abstract class SiblingPipelineAggregator extends PipelineAggregator { protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map metaData) { @@ -47,39 +41,13 @@ protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map buckets = multiBucketsAgg.getBuckets(); - List newBuckets = new ArrayList<>(); - for (Bucket bucket1 : buckets) { - InternalMultiBucketAggregation.InternalBucket bucket = (InternalMultiBucketAggregation.InternalBucket) bucket1; - InternalAggregation aggToAdd = doReduce(bucket.getAggregations(), reduceContext); - List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .map((p) -> (InternalAggregation) p) - .collect(Collectors.toList()); - aggs.add(aggToAdd); - InternalMultiBucketAggregation.InternalBucket newBucket = multiBucketsAgg.createBucket(new InternalAggregations(aggs), - bucket); - newBuckets.add(newBucket); - } - - return multiBucketsAgg.create(newBuckets); - } else if (aggregation instanceof InternalSingleBucketAggregation) { - InternalSingleBucketAggregation singleBucketAgg = (InternalSingleBucketAggregation) aggregation; - InternalAggregation aggToAdd = doReduce(singleBucketAgg.getAggregations(), reduceContext); - List aggs = StreamSupport.stream(singleBucketAgg.getAggregations().spliterator(), false) - .map((p) -> (InternalAggregation) p) - .collect(Collectors.toList()); - aggs.add(aggToAdd); - return singleBucketAgg.create(new InternalAggregations(aggs)); - } else { - throw new IllegalStateException("Aggregation [" + aggregation.getName() + "] must be a bucket aggregation [" - + aggregation.getWriteableName() + "]"); - } + return aggregation.copyWithRewritenBuckets(aggregations -> { + List aggs = aggregations.copyResults(); + aggs.add(doReduce(aggregations, reduceContext)); + return new InternalAggregations(aggs); + }); } public abstract InternalAggregation doReduce(Aggregations aggregations, ReduceContext context);