diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketIT.java index 6922fa98755b6..a9e3c00e57e4d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/AvgBucketIT.java @@ -8,407 +8,43 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.Sum; -import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.test.ESIntegTestCase; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; -import java.util.ArrayList; -import java.util.List; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; -import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; -import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.avgBucket; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.core.IsNull.notNullValue; - -@ESIntegTestCase.SuiteScopeTestCase -public class AvgBucketIT extends ESIntegTestCase { - - private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; - static int numDocs; - static int interval; - static int minRandomValue; - static int maxRandomValue; - static int numValueBuckets; - static long[] valueCounts; - - static String histoName; - static String termsName; +public class AvgBucketIT extends BucketMetricsPipeLineAggregationTestCase { @Override - public void setupSuiteScopeCluster() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); - createIndex("idx_unmapped"); - - numDocs = randomIntBetween(6, 20); - interval = randomIntBetween(2, 5); - - minRandomValue = 0; - maxRandomValue = 20; - - numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; - valueCounts = new long[numValueBuckets]; - - List builders = new ArrayList<>(); - - for (int i = 0; i < numDocs; i++) { - int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); - builders.add( - client().prepareIndex("idx", "type") - .setSource( - jsonBuilder().startObject() - .field(SINGLE_VALUED_FIELD_NAME, fieldValue) - .field("tag", "tag" + (i % interval)) - .endObject() - ) - ); - final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); - valueCounts[bucket]++; - } - - assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); - for (int i = 0; i < 2; i++) { - builders.add( - client().prepareIndex("empty_bucket_idx", "type", "" + i) - .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) - ); - } - indexRandom(true, builders); - ensureSearchable(); - histoName = randomName(); - termsName = randomName(); - } - - private static String randomName() { - return randomBoolean() - ? randomAlphaOfLengthBetween(3, 12) - : randomAlphaOfLengthBetween(3, 6) + "." + randomAlphaOfLengthBetween(3, 6); + protected AvgBucketPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath) { + return avgBucket(name, bucketsPath); } - public void testDocCountTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - histogram(histoName).field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .addAggregation(avgBucket("avg_bucket", histoName + ">_count")) - .get(); - - assertSearchResponse(response); - - Histogram histo = response.getAggregations().get(histoName); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo(histoName)); - List buckets = histo.getBuckets(); - assertThat(buckets.size(), equalTo(numValueBuckets)); - + @Override + protected void assertResult( + IntToDoubleFunction bucketValues, + Function bucketKeys, + int numBuckets, + InternalSimpleValue pipelineBucket + ) { double sum = 0; int count = 0; - for (int i = 0; i < numValueBuckets; ++i) { - Histogram.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); - assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + for (int i = 0; i < numBuckets; ++i) { count++; - sum += bucket.getDocCount(); + sum += bucketValues.applyAsDouble(i); } - double avgValue = count == 0 ? Double.NaN : (sum / count); - InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_bucket")); - assertThat(avgBucketValue.value(), equalTo(avgValue)); - } - - public void testDocCountAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms(termsName).field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(avgBucket("avg_bucket", histoName + ">_count")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get(termsName); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo(termsName)); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get(histoName); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo(histoName)); - List buckets = histo.getBuckets(); - - double sum = 0; - int count = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - count++; - sum += bucket.getDocCount(); - } - - double avgValue = count == 0 ? Double.NaN : (sum / count); - InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_bucket")); - assertThat(avgBucketValue.value(), equalTo(avgValue)); - } + assertThat(pipelineBucket.value(), equalTo(avgValue)); } - public void testMetricTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms(termsName).field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(avgBucket("avg_bucket", termsName + ">sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get(termsName); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo(termsName)); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(interval)); - - double bucketSum = 0; - int count = 0; - for (int i = 0; i < interval; ++i) { - Terms.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); - assertThat(bucket.getDocCount(), greaterThan(0L)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - count++; - bucketSum += sum.value(); - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_bucket")); - assertThat(avgBucketValue.value(), equalTo(avgValue)); - } - - public void testMetricAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms(termsName).field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(avgBucket("avg_bucket", histoName + ">sum")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get(termsName); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo(termsName)); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get(histoName); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo(histoName)); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - int count = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - count++; - bucketSum += sum.value(); - } - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_bucket")); - assertThat(avgBucketValue.value(), equalTo(avgValue)); - } - } - - public void testMetricAsSubAggWithInsertZeros() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms(termsName).field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(avgBucket("avg_bucket", histoName + ">sum").gapPolicy(GapPolicy.INSERT_ZEROS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get(termsName); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo(termsName)); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get(histoName); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo(histoName)); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - int count = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - - count++; - bucketSum += sum.value(); - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_bucket")); - assertThat(avgBucketValue.value(), equalTo(avgValue)); - } - } - - public void testNoBuckets() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms(termsName).field("tag") - .includeExclude(new IncludeExclude(null, "tag.*")) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .addAggregation(avgBucket("avg_bucket", termsName + ">sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get(termsName); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo(termsName)); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(0)); - - InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_bucket")); - assertThat(avgBucketValue.value(), equalTo(Double.NaN)); + @Override + protected String nestedMetric() { + return "value"; } - public void testNested() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms(termsName).field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(avgBucket("avg_histo_bucket", histoName + ">_count")) - ) - .addAggregation(avgBucket("avg_terms_bucket", termsName + ">avg_histo_bucket")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get(termsName); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo(termsName)); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - double aggTermsSum = 0; - int aggTermsCount = 0; - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get(histoName); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo(histoName)); - List buckets = histo.getBuckets(); - - double aggHistoSum = 0; - int aggHistoCount = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - - aggHistoCount++; - aggHistoSum += bucket.getDocCount(); - } - - double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount); - InternalSimpleValue avgBucketValue = termsBucket.getAggregations().get("avg_histo_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_histo_bucket")); - assertThat(avgBucketValue.value(), equalTo(avgHistoValue)); - - aggTermsCount++; - aggTermsSum += avgHistoValue; - } - - double avgTermsValue = aggTermsCount == 0 ? Double.NaN : (aggTermsSum / aggTermsCount); - InternalSimpleValue avgBucketValue = response.getAggregations().get("avg_terms_bucket"); - assertThat(avgBucketValue, notNullValue()); - assertThat(avgBucketValue.getName(), equalTo("avg_terms_bucket")); - assertThat(avgBucketValue.value(), equalTo(avgTermsValue)); + @Override + protected double getNestedMetric(InternalSimpleValue bucket) { + return bucket.value(); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipeLineAggregationTestCase.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipeLineAggregationTestCase.java new file mode 100644 index 0000000000000..9384cca6599e1 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/BucketMetricsPipeLineAggregationTestCase.java @@ -0,0 +1,494 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations.pipeline; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; +import org.elasticsearch.search.aggregations.metrics.Sum; +import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; +import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; +import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.core.IsNull.notNullValue; + +@ESIntegTestCase.SuiteScopeTestCase +abstract class BucketMetricsPipeLineAggregationTestCase extends ESIntegTestCase { + + static final String SINGLE_VALUED_FIELD_NAME = "l_value"; + + static int numDocs; + static int interval; + static int minRandomValue; + static int maxRandomValue; + static int numValueBuckets; + static long[] valueCounts; + + static String histoName; + static String termsName; + + /** Creates the pipeline aggregation to test */ + protected abstract BucketMetricsPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath); + + /** Checks that the provided bucket values and keys agree with the result of the pipeline aggregation */ + protected abstract void assertResult( + IntToDoubleFunction bucketValues, + Function bucketKeys, + int numValues, + T pipelineBucket + ); + + /** Nested metric from the pipeline aggregation to test. This metric is added to the end of the bucket path*/ + protected abstract String nestedMetric(); + + /** Extract the value of the nested metric provided in {@link #nestedMetric()} */ + protected abstract double getNestedMetric(T bucket); + + @Override + public void setupSuiteScopeCluster() throws Exception { + assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); + createIndex("idx_unmapped"); + + numDocs = randomIntBetween(6, 20); + interval = randomIntBetween(2, 5); + + minRandomValue = 0; + maxRandomValue = 20; + + numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; + valueCounts = new long[numValueBuckets]; + + List builders = new ArrayList<>(); + + for (int i = 0; i < numDocs; i++) { + int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); + builders.add( + client().prepareIndex("idx", "type") + .setSource( + jsonBuilder().startObject() + .field(SINGLE_VALUED_FIELD_NAME, fieldValue) + .field("tag", "tag" + (i % interval)) + .endObject() + ) + ); + final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); + valueCounts[bucket]++; + } + + assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); + for (int i = 0; i < 2; i++) { + builders.add( + client().prepareIndex("empty_bucket_idx", "type", "" + i) + .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) + ); + } + indexRandom(true, builders); + ensureSearchable(); + histoName = randomName(); + termsName = randomName(); + } + + private String randomName() { + return randomBoolean() + ? randomAlphaOfLengthBetween(3, 12) + : randomAlphaOfLengthBetween(3, 6) + "." + randomAlphaOfLengthBetween(3, 6); + } + + public void testDocCountTopLevel() { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + histogram(histoName).field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) + ) + .addAggregation(BucketMetricsPipelineAgg("pipeline_agg", histoName + ">_count")) + .get(); + + assertSearchResponse(response); + + Histogram histo = response.getAggregations().get(histoName); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo(histoName)); + List buckets = histo.getBuckets(); + assertThat(buckets.size(), equalTo(numValueBuckets)); + + for (int i = 0; i < numValueBuckets; ++i) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); + assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + } + + T pipelineBucket = response.getAggregations().get("pipeline_agg"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("pipeline_agg")); + + assertResult((i) -> buckets.get(i).getDocCount(), (i) -> buckets.get(i).getKeyAsString(), numValueBuckets, pipelineBucket); + } + + public void testDocCountAsSubAgg() { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + terms(termsName).field("tag") + .order(BucketOrder.key(true)) + .subAggregation( + histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) + .interval(interval) + .extendedBounds(minRandomValue, maxRandomValue) + ) + .subAggregation(BucketMetricsPipelineAgg("pipeline_agg", histoName + ">_count")) + ) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get(termsName); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo(termsName)); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get(histoName); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo(histoName)); + List buckets = histo.getBuckets(); + + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + } + + T pipelineBucket = termsBucket.getAggregations().get("pipeline_agg"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("pipeline_agg")); + + assertResult((k) -> buckets.get(k).getDocCount(), (k) -> buckets.get(k).getKeyAsString(), numValueBuckets, pipelineBucket); + } + } + + public void testMetricTopLevel() { + SearchResponse response = client().prepareSearch("idx") + .addAggregation(terms(termsName).field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(BucketMetricsPipelineAgg("pipeline_agg", termsName + ">sum")) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get(termsName); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo(termsName)); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket bucket = buckets.get(i); + assertThat(bucket, notNullValue()); + assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); + assertThat(bucket.getDocCount(), greaterThan(0L)); + } + + T pipelineBucket = response.getAggregations().get("pipeline_agg"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("pipeline_agg")); + + IntToDoubleFunction function = (i) -> { + Sum sum = buckets.get(i).getAggregations().get("sum"); + assertThat(sum, notNullValue()); + return sum.value(); + }; + assertResult(function, (i) -> buckets.get(i).getKeyAsString(), interval, pipelineBucket); + } + + public void testMetricAsSubAgg() { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + terms(termsName).field("tag") + .order(BucketOrder.key(true)) + .subAggregation( + histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) + .interval(interval) + .extendedBounds(minRandomValue, maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) + ) + .subAggregation(BucketMetricsPipelineAgg("pipeline_agg", histoName + ">sum")) + ) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get(termsName); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo(termsName)); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get(histoName); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo(histoName)); + List buckets = histo.getBuckets(); + + List notNullBuckets = new ArrayList<>(); + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + if (bucket.getDocCount() != 0) { + notNullBuckets.add(bucket); + } + } + + T pipelineBucket = termsBucket.getAggregations().get("pipeline_agg"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("pipeline_agg")); + + IntToDoubleFunction function = (k) -> { + Sum sum = notNullBuckets.get(k).getAggregations().get("sum"); + assertThat(sum, notNullValue()); + return sum.value(); + }; + assertResult(function, (k) -> notNullBuckets.get(k).getKeyAsString(), notNullBuckets.size(), pipelineBucket); + } + } + + public void testMetricAsSubAggWithInsertZeros() { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + terms(termsName).field("tag") + .order(BucketOrder.key(true)) + .subAggregation( + histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) + .interval(interval) + .extendedBounds(minRandomValue, maxRandomValue) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) + ) + .subAggregation( + BucketMetricsPipelineAgg("pipeline_agg", histoName + ">sum").gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS) + ) + ) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get(termsName); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo(termsName)); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get(histoName); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo(histoName)); + List buckets = histo.getBuckets(); + + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + } + + T pipelineBucket = termsBucket.getAggregations().get("pipeline_agg"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("pipeline_agg")); + + IntToDoubleFunction function = (k) -> { + Sum sum = buckets.get(k).getAggregations().get("sum"); + assertThat(sum, notNullValue()); + return sum.value(); + }; + assertResult(function, (k) -> buckets.get(k).getKeyAsString(), numValueBuckets, pipelineBucket); + } + } + + public void testNoBuckets() { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + terms(termsName).field("tag") + .includeExclude(new IncludeExclude(null, "tag.*")) + .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) + ) + .addAggregation(BucketMetricsPipelineAgg("pipeline_agg", termsName + ">sum")) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get(termsName); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo(termsName)); + List buckets = terms.getBuckets(); + assertThat(buckets.size(), equalTo(0)); + + T pipelineBucket = response.getAggregations().get("pipeline_agg"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("pipeline_agg")); + + assertResult((k) -> 0.0, (k) -> "", 0, pipelineBucket); + } + + public void testNested() { + SearchResponse response = client().prepareSearch("idx") + .addAggregation( + terms(termsName).field("tag") + .order(BucketOrder.key(true)) + .subAggregation( + histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) + .interval(interval) + .extendedBounds(minRandomValue, maxRandomValue) + ) + .subAggregation(BucketMetricsPipelineAgg("nested_histo_bucket", histoName + ">_count")) + ) + .addAggregation(BucketMetricsPipelineAgg("nested_terms_bucket", termsName + ">nested_histo_bucket." + nestedMetric())) + .get(); + + assertSearchResponse(response); + + Terms terms = response.getAggregations().get(termsName); + assertThat(terms, notNullValue()); + assertThat(terms.getName(), equalTo(termsName)); + List termsBuckets = terms.getBuckets(); + assertThat(termsBuckets.size(), equalTo(interval)); + + List allBuckets = new ArrayList<>(); + List nestedTags = new ArrayList<>(); + for (int i = 0; i < interval; ++i) { + Terms.Bucket termsBucket = termsBuckets.get(i); + assertThat(termsBucket, notNullValue()); + assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); + + Histogram histo = termsBucket.getAggregations().get(histoName); + assertThat(histo, notNullValue()); + assertThat(histo.getName(), equalTo(histoName)); + List buckets = histo.getBuckets(); + + for (int j = 0; j < numValueBuckets; ++j) { + Histogram.Bucket bucket = buckets.get(j); + assertThat(bucket, notNullValue()); + assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); + } + + T pipelineBucket = termsBucket.getAggregations().get("nested_histo_bucket"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("nested_histo_bucket")); + + assertResult((k) -> buckets.get(k).getDocCount(), (k) -> buckets.get(k).getKeyAsString(), numValueBuckets, pipelineBucket); + allBuckets.add(pipelineBucket); + nestedTags.add(termsBucket.getKeyAsString()); + } + + T pipelineBucket = response.getAggregations().get("nested_terms_bucket"); + assertThat(pipelineBucket, notNullValue()); + assertThat(pipelineBucket.getName(), equalTo("nested_terms_bucket")); + + assertResult((k) -> getNestedMetric(allBuckets.get(k)), (k) -> nestedTags.get(k), allBuckets.size(), pipelineBucket); + } + + /** + * https://github.com/elastic/elasticsearch/issues/33514 + * + * This bug manifests as the max_bucket agg ("peak") being added to the response twice, because + * the pipeline agg is run twice. This makes invalid JSON and breaks conversion to maps. + * The bug was caused by an UnmappedTerms being the chosen as the first reduction target. UnmappedTerms + * delegated reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then + * execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values. + */ + public void testFieldIsntWrittenOutTwice() throws Exception { + // you need to add an additional index with no fields in order to trigger this (or potentially a shard) + // so that there is an UnmappedTerms in the list to reduce. + createIndex("foo_1"); + // @formatter:off + XContentBuilder builder = jsonBuilder().startObject() + .startObject("properties") + .startObject("@timestamp") + .field("type", "date") + .endObject() + .startObject("license") + .startObject("properties") + .startObject("count") + .field("type", "long") + .endObject() + .startObject("partnumber") + .field("type", "text") + .startObject("fields") + .startObject("keyword") + .field("type", "keyword") + .field("ignore_above", 256) + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + .endObject(); + // @formatter:on + assertAcked(client().admin().indices().prepareCreate("foo_2").addMapping("doc", builder).get()); + // @formatter:off + XContentBuilder docBuilder = jsonBuilder().startObject() + .startObject("license") + .field("partnumber", "foobar") + .field("count", 2) + .endObject() + .field("@timestamp", "2018-07-08T08:07:00.599Z") + .endObject(); + // @formatter:on + client().prepareIndex("foo_2", "doc").setSource(docBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + client().admin().indices().prepareRefresh(); + + TermsAggregationBuilder groupByLicenseAgg = terms("group_by_license_partnumber").field("license.partnumber.keyword"); + + SumAggregationBuilder sumAggBuilder = sum("total_licenses").field("license.count"); + DateHistogramAggregationBuilder licensePerDayBuilder = dateHistogram("licenses_per_day").field("@timestamp") + .fixedInterval(DateHistogramInterval.DAY); + licensePerDayBuilder.subAggregation(sumAggBuilder); + groupByLicenseAgg.subAggregation(licensePerDayBuilder); + groupByLicenseAgg.subAggregation(BucketMetricsPipelineAgg("peak", "licenses_per_day>total_licenses")); + + SearchResponse response = client().prepareSearch("foo_*").setSize(0).addAggregation(groupByLicenseAgg).get(); + BytesReference bytes = XContentHelper.toXContent(response, XContentType.JSON, false); + XContentHelper.convertToMap(bytes, false, XContentType.JSON); + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java index 2036df15f9a37..6f42ab6a89bfd 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/ExtendedStatsBucketIT.java @@ -15,70 +15,72 @@ import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.ExtendedStats.Bounds; -import org.elasticsearch.search.aggregations.metrics.Sum; -import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.test.ESIntegTestCase; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.extendedStatsBucket; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.core.IsNull.notNullValue; -@ESIntegTestCase.SuiteScopeTestCase -public class ExtendedStatsBucketIT extends ESIntegTestCase { +public class ExtendedStatsBucketIT extends BucketMetricsPipeLineAggregationTestCase { - private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; - - static int numDocs; - static int interval; - static int minRandomValue; - static int maxRandomValue; - static int numValueBuckets; - static long[] valueCounts; + @Override + protected ExtendedStatsBucketPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath) { + return extendedStatsBucket(name, bucketsPath); + } @Override - public void setupSuiteScopeCluster() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); - createIndex("idx_unmapped", "idx_gappy"); + protected void assertResult( + IntToDoubleFunction buckets, + Function bucketKeys, + int numBuckets, + ExtendedStatsBucket pipelineBucket + ) { + double sum = 0; + int count = 0; + double min = Double.POSITIVE_INFINITY; + double max = Double.NEGATIVE_INFINITY; + double sumOfSquares = 0; + for (int i = 0; i < numBuckets; ++i) { + double bucketValue = buckets.applyAsDouble(i); + count++; + sum += bucketValue; + min = Math.min(min, bucketValue); + max = Math.max(max, bucketValue); + sumOfSquares += bucketValue * bucketValue; + } + double avgValue = count == 0 ? Double.NaN : (sum / count); + assertThat(pipelineBucket.getAvg(), equalTo(avgValue)); + assertThat(pipelineBucket.getMin(), equalTo(min)); + assertThat(pipelineBucket.getMax(), equalTo(max)); + assertThat(pipelineBucket.getSumOfSquares(), equalTo(sumOfSquares)); + } - numDocs = randomIntBetween(6, 20); - interval = randomIntBetween(2, 5); + @Override + protected String nestedMetric() { + return "avg"; + } - minRandomValue = 0; - maxRandomValue = 20; + @Override + protected double getNestedMetric(ExtendedStatsBucket bucket) { + return bucket.getAvg(); + } - numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; - valueCounts = new long[numValueBuckets]; + @Override + public void setupSuiteScopeCluster() throws Exception { + super.setupSuiteScopeCluster(); List builders = new ArrayList<>(); - for (int i = 0; i < numDocs; i++) { - int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); - builders.add( - client().prepareIndex("idx", "type") - .setSource( - jsonBuilder().startObject() - .field(SINGLE_VALUED_FIELD_NAME, fieldValue) - .field("tag", "tag" + (i % interval)) - .endObject() - ) - ); - final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); - valueCounts[bucket]++; - } - for (int i = 0; i < 6; i++) { // creates 6 documents where the value of the field is 0, 1, 2, 3, // 3, 5 @@ -88,13 +90,6 @@ public void setupSuiteScopeCluster() throws Exception { ); } - assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); - for (int i = 0; i < 2; i++) { - builders.add( - client().prepareIndex("empty_bucket_idx", "type", "" + i) - .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) - ); - } indexRandom(true, builders); ensureSearchable(); } @@ -152,300 +147,6 @@ public void testGappyIndexWithSigma() { assertThat(extendedStatsBucketValue.getStdDeviationBound(Bounds.UPPER), equalTo(avg + (sigma * stdDev))); } - public void testDocCountTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .addAggregation(extendedStatsBucket("extended_stats_bucket", "histo>_count")) - .get(); - - assertSearchResponse(response); - - Histogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat(buckets.size(), equalTo(numValueBuckets)); - - double sum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - double sumOfSquares = 0; - for (int i = 0; i < numValueBuckets; ++i) { - Histogram.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); - assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); - count++; - sum += bucket.getDocCount(); - min = Math.min(min, bucket.getDocCount()); - max = Math.max(max, bucket.getDocCount()); - sumOfSquares += bucket.getDocCount() * bucket.getDocCount(); - } - - double avgValue = count == 0 ? Double.NaN : (sum / count); - ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); - assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); - assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); - } - - public void testDocCountAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(extendedStatsBucket("extended_stats_bucket", "histo>_count")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double sum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - double sumOfSquares = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - count++; - sum += bucket.getDocCount(); - min = Math.min(min, bucket.getDocCount()); - max = Math.max(max, bucket.getDocCount()); - sumOfSquares += bucket.getDocCount() * bucket.getDocCount(); - } - - double avgValue = count == 0 ? Double.NaN : (sum / count); - ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); - assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); - assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); - } - } - - public void testMetricTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(extendedStatsBucket("extended_stats_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(interval)); - - double bucketSum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - double sumOfSquares = 0; - for (int i = 0; i < interval; ++i) { - Terms.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); - assertThat(bucket.getDocCount(), greaterThan(0L)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - count++; - bucketSum += sum.value(); - min = Math.min(min, sum.value()); - max = Math.max(max, sum.value()); - sumOfSquares += sum.value() * sum.value(); - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); - assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); - assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); - } - - public void testMetricAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(extendedStatsBucket("extended_stats_bucket", "histo>sum")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - double sumOfSquares = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - count++; - bucketSum += sum.value(); - min = Math.min(min, sum.value()); - max = Math.max(max, sum.value()); - sumOfSquares += sum.value() * sum.value(); - } - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); - assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); - assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); - } - } - - public void testMetricAsSubAggWithInsertZeros() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(extendedStatsBucket("extended_stats_bucket", "histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - double sumOfSquares = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - - count++; - bucketSum += sum.value(); - min = Math.min(min, sum.value()); - max = Math.max(max, sum.value()); - sumOfSquares += sum.value() * sum.value(); - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("extended_stats_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); - assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); - assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); - } - } - - public void testNoBuckets() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .includeExclude(new IncludeExclude(null, "tag.*")) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .addAggregation(extendedStatsBucket("extended_stats_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(0)); - - ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("extended_stats_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("extended_stats_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(Double.NaN)); - } - public void testBadSigmaAsSubAgg() throws Exception { Exception ex = expectThrows( Exception.class, @@ -476,74 +177,4 @@ public void testBadSigmaAsSubAgg() throws Exception { throw ex; } } - - public void testNested() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(extendedStatsBucket("avg_histo_bucket", "histo>_count")) - ) - .addAggregation(extendedStatsBucket("avg_terms_bucket", "terms>avg_histo_bucket.avg")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - double aggTermsSum = 0; - int aggTermsCount = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - double sumOfSquares = 0; - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double aggHistoSum = 0; - int aggHistoCount = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - - aggHistoCount++; - aggHistoSum += bucket.getDocCount(); - } - - double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount); - ExtendedStatsBucket extendedStatsBucketValue = termsBucket.getAggregations().get("avg_histo_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("avg_histo_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgHistoValue)); - - aggTermsCount++; - aggTermsSum += avgHistoValue; - min = Math.min(min, avgHistoValue); - max = Math.max(max, avgHistoValue); - sumOfSquares += avgHistoValue * avgHistoValue; - } - - double avgTermsValue = aggTermsCount == 0 ? Double.NaN : (aggTermsSum / aggTermsCount); - ExtendedStatsBucket extendedStatsBucketValue = response.getAggregations().get("avg_terms_bucket"); - assertThat(extendedStatsBucketValue, notNullValue()); - assertThat(extendedStatsBucketValue.getName(), equalTo("avg_terms_bucket")); - assertThat(extendedStatsBucketValue.getAvg(), equalTo(avgTermsValue)); - assertThat(extendedStatsBucketValue.getMin(), equalTo(min)); - assertThat(extendedStatsBucketValue.getMax(), equalTo(max)); - assertThat(extendedStatsBucketValue.getSumOfSquares(), equalTo(sumOfSquares)); - } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java index 3a7ec7061b681..4e7e0fba0f0eb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MaxBucketIT.java @@ -8,559 +8,51 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.PipelineAggregatorBuilders; -import org.elasticsearch.search.aggregations.bucket.filter.Filter; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.Sum; -import org.elasticsearch.search.aggregations.metrics.SumAggregationBuilder; -import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.test.ESIntegTestCase; - import java.util.ArrayList; import java.util.List; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.index.query.QueryBuilders.termQuery; -import static org.elasticsearch.search.aggregations.AggregationBuilders.filter; -import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; -import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; -import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.maxBucket; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.core.IsNull.notNullValue; - -@ESIntegTestCase.SuiteScopeTestCase -public class MaxBucketIT extends ESIntegTestCase { - - private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; - static int numDocs; - static int interval; - static int minRandomValue; - static int maxRandomValue; - static int numValueBuckets; - static long[] valueCounts; +public class MaxBucketIT extends BucketMetricsPipeLineAggregationTestCase { @Override - public void setupSuiteScopeCluster() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); - createIndex("idx_unmapped"); - - numDocs = randomIntBetween(6, 20); - interval = randomIntBetween(2, 5); - - minRandomValue = 0; - maxRandomValue = 20; - - numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; - valueCounts = new long[numValueBuckets]; - - List builders = new ArrayList<>(); - - for (int i = 0; i < numDocs; i++) { - int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); - builders.add( - client().prepareIndex("idx", "type") - .setSource( - jsonBuilder().startObject() - .field(SINGLE_VALUED_FIELD_NAME, fieldValue) - .field("tag", "tag" + (i % interval)) - .endObject() - ) - ); - final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); - valueCounts[bucket]++; - } - - assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); - for (int i = 0; i < 2; i++) { - builders.add( - client().prepareIndex("empty_bucket_idx", "type", "" + i) - .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) - ); - } - indexRandom(true, builders); - ensureSearchable(); + protected MaxBucketPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath) { + return maxBucket(name, bucketsPath); } - public void testDocCountTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .addAggregation(maxBucket("max_bucket", "histo>_count")) - .get(); - - assertSearchResponse(response); - - Histogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat(buckets.size(), equalTo(numValueBuckets)); - - List maxKeys = new ArrayList<>(); - double maxValue = Double.NEGATIVE_INFINITY; - for (int i = 0; i < numValueBuckets; ++i) { - Histogram.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); - assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); - if (bucket.getDocCount() > maxValue) { - maxValue = bucket.getDocCount(); - maxKeys = new ArrayList<>(); - maxKeys.add(bucket.getKeyAsString()); - } else if (bucket.getDocCount() == maxValue) { - maxKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue maxBucketValue = response.getAggregations().get("max_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxValue)); - assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()]))); - } - - public void testDocCountAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(maxBucket("max_bucket", "histo>_count")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List maxKeys = new ArrayList<>(); - double maxValue = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() > maxValue) { - maxValue = bucket.getDocCount(); - maxKeys = new ArrayList<>(); - maxKeys.add(bucket.getKeyAsString()); - } else if (bucket.getDocCount() == maxValue) { - maxKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue maxBucketValue = termsBucket.getAggregations().get("max_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxValue)); - assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()]))); - } - } - - public void testMetricTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(maxBucket("max_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(interval)); - + @Override + protected void assertResult( + IntToDoubleFunction bucketValues, + Function bucketKeys, + int numBuckets, + InternalBucketMetricValue pipelineBucket + ) { List maxKeys = new ArrayList<>(); double maxValue = Double.NEGATIVE_INFINITY; - for (int i = 0; i < interval; ++i) { - Terms.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); - assertThat(bucket.getDocCount(), greaterThan(0L)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - if (sum.value() > maxValue) { - maxValue = sum.value(); + for (int i = 0; i < numBuckets; ++i) { + double bucketValue = bucketValues.applyAsDouble(i); + if (bucketValue > maxValue) { + maxValue = bucketValue; maxKeys = new ArrayList<>(); - maxKeys.add(bucket.getKeyAsString()); - } else if (sum.value() == maxValue) { - maxKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue maxBucketValue = response.getAggregations().get("max_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxValue)); - assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()]))); - } - - public void testMetricAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(maxBucket("max_bucket", "histo>sum")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List maxKeys = new ArrayList<>(); - double maxValue = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - if (sum.value() > maxValue) { - maxValue = sum.value(); - maxKeys = new ArrayList<>(); - maxKeys.add(bucket.getKeyAsString()); - } else if (sum.value() == maxValue) { - maxKeys.add(bucket.getKeyAsString()); - } - } - } - - InternalBucketMetricValue maxBucketValue = termsBucket.getAggregations().get("max_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxValue)); - assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()]))); - } - } - - public void testMetricAsSubAggOfSingleBucketAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - filter("filter", termQuery("tag", "tag0")).subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ).subAggregation(maxBucket("max_bucket", "histo>sum")) - ) - .get(); - - assertSearchResponse(response); - - Filter filter = response.getAggregations().get("filter"); - assertThat(filter, notNullValue()); - assertThat(filter.getName(), equalTo("filter")); - Histogram histo = filter.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List maxKeys = new ArrayList<>(); - double maxValue = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - if (sum.value() > maxValue) { - maxValue = sum.value(); - maxKeys = new ArrayList<>(); - maxKeys.add(bucket.getKeyAsString()); - } else if (sum.value() == maxValue) { - maxKeys.add(bucket.getKeyAsString()); - } + maxKeys.add(bucketKeys.apply(i)); + } else if (bucketValue == maxValue) { + maxKeys.add(bucketKeys.apply(i)); } } - - InternalBucketMetricValue maxBucketValue = filter.getAggregations().get("max_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxValue)); - assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()]))); + assertThat(pipelineBucket.value(), equalTo(maxValue)); + assertThat(pipelineBucket.keys(), equalTo(maxKeys.toArray(new String[0]))); } - public void testMetricAsSubAggWithInsertZeros() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(maxBucket("max_bucket", "histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List maxKeys = new ArrayList<>(); - double maxValue = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - if (sum.value() > maxValue) { - maxValue = sum.value(); - maxKeys = new ArrayList<>(); - maxKeys.add(bucket.getKeyAsString()); - } else if (sum.value() == maxValue) { - maxKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue maxBucketValue = termsBucket.getAggregations().get("max_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxValue)); - assertThat(maxBucketValue.keys(), equalTo(maxKeys.toArray(new String[maxKeys.size()]))); - } - } - - public void testNoBuckets() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .includeExclude(new IncludeExclude(null, "tag.*")) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .addAggregation(maxBucket("max_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(0)); - - InternalBucketMetricValue maxBucketValue = response.getAggregations().get("max_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_bucket")); - assertThat(maxBucketValue.value(), equalTo(Double.NEGATIVE_INFINITY)); - assertThat(maxBucketValue.keys(), equalTo(new String[0])); - } - - public void testNested() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(maxBucket("max_histo_bucket", "histo>_count")) - ) - .addAggregation(maxBucket("max_terms_bucket", "terms>max_histo_bucket")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - List maxTermsKeys = new ArrayList<>(); - double maxTermsValue = Double.NEGATIVE_INFINITY; - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List maxHistoKeys = new ArrayList<>(); - double maxHistoValue = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() > maxHistoValue) { - maxHistoValue = bucket.getDocCount(); - maxHistoKeys = new ArrayList<>(); - maxHistoKeys.add(bucket.getKeyAsString()); - } else if (bucket.getDocCount() == maxHistoValue) { - maxHistoKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue maxBucketValue = termsBucket.getAggregations().get("max_histo_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_histo_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxHistoValue)); - assertThat(maxBucketValue.keys(), equalTo(maxHistoKeys.toArray(new String[maxHistoKeys.size()]))); - if (maxHistoValue > maxTermsValue) { - maxTermsValue = maxHistoValue; - maxTermsKeys = new ArrayList<>(); - maxTermsKeys.add(termsBucket.getKeyAsString()); - } else if (maxHistoValue == maxTermsValue) { - maxTermsKeys.add(termsBucket.getKeyAsString()); - } - } - - InternalBucketMetricValue maxBucketValue = response.getAggregations().get("max_terms_bucket"); - assertThat(maxBucketValue, notNullValue()); - assertThat(maxBucketValue.getName(), equalTo("max_terms_bucket")); - assertThat(maxBucketValue.value(), equalTo(maxTermsValue)); - assertThat(maxBucketValue.keys(), equalTo(maxTermsKeys.toArray(new String[maxTermsKeys.size()]))); + @Override + protected String nestedMetric() { + return "value"; } - /** - * https://github.com/elastic/elasticsearch/issues/33514 - * - * This bug manifests as the max_bucket agg ("peak") being added to the response twice, because - * the pipeline agg is run twice. This makes invalid JSON and breaks conversion to maps. - * The bug was caused by an UnmappedTerms being the chosen as the first reduction target. UnmappedTerms - * delegated reduction to the first non-unmapped agg, which would reduce and run pipeline aggs. But then - * execution returns to the UnmappedTerms and _it_ runs pipelines as well, doubling up on the values. - * - * Applies to any pipeline agg, not just max. - */ - public void testFieldIsntWrittenOutTwice() throws Exception { - // you need to add an additional index with no fields in order to trigger this (or potentially a shard) - // so that there is an UnmappedTerms in the list to reduce. - createIndex("foo_1"); - - XContentBuilder builder = jsonBuilder().startObject() - .startObject("properties") - .startObject("@timestamp") - .field("type", "date") - .endObject() - .startObject("license") - .startObject("properties") - .startObject("count") - .field("type", "long") - .endObject() - .startObject("partnumber") - .field("type", "text") - .startObject("fields") - .startObject("keyword") - .field("type", "keyword") - .field("ignore_above", 256) - .endObject() - .endObject() - .endObject() - .endObject() - .endObject() - .endObject() - .endObject(); - assertAcked(client().admin().indices().prepareCreate("foo_2").addMapping("doc", builder).get()); - - XContentBuilder docBuilder = jsonBuilder().startObject() - .startObject("license") - .field("partnumber", "foobar") - .field("count", 2) - .endObject() - .field("@timestamp", "2018-07-08T08:07:00.599Z") - .endObject(); - - client().prepareIndex("foo_2", "doc").setSource(docBuilder).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - - client().admin().indices().prepareRefresh(); - - TermsAggregationBuilder groupByLicenseAgg = AggregationBuilders.terms("group_by_license_partnumber") - .field("license.partnumber.keyword"); - MaxBucketPipelineAggregationBuilder peakPipelineAggBuilder = PipelineAggregatorBuilders.maxBucket( - "peak", - "licenses_per_day>total_licenses" - ); - SumAggregationBuilder sumAggBuilder = AggregationBuilders.sum("total_licenses").field("license.count"); - DateHistogramAggregationBuilder licensePerDayBuilder = AggregationBuilders.dateHistogram("licenses_per_day") - .field("@timestamp") - .dateHistogramInterval(DateHistogramInterval.DAY); - licensePerDayBuilder.subAggregation(sumAggBuilder); - groupByLicenseAgg.subAggregation(licensePerDayBuilder); - groupByLicenseAgg.subAggregation(peakPipelineAggBuilder); - - SearchResponse response = client().prepareSearch("foo_*").setSize(0).addAggregation(groupByLicenseAgg).get(); - BytesReference bytes = XContentHelper.toXContent(response, XContentType.JSON, false); - XContentHelper.convertToMap(bytes, false, XContentType.JSON); + @Override + protected double getNestedMetric(InternalBucketMetricValue bucket) { + return bucket.value(); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MinBucketIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MinBucketIT.java index c2d608ad8869e..0dc3e8c87e819 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MinBucketIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/MinBucketIT.java @@ -8,425 +8,51 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.Sum; -import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.test.ESIntegTestCase; - import java.util.ArrayList; import java.util.List; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; -import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; -import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.minBucket; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.core.IsNull.notNullValue; - -@ESIntegTestCase.SuiteScopeTestCase -public class MinBucketIT extends ESIntegTestCase { - - private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; - static int numDocs; - static int interval; - static int minRandomValue; - static int maxRandomValue; - static int numValueBuckets; - static long[] valueCounts; +public class MinBucketIT extends BucketMetricsPipeLineAggregationTestCase { @Override - public void setupSuiteScopeCluster() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); - createIndex("idx_unmapped"); - - numDocs = randomIntBetween(6, 20); - interval = randomIntBetween(2, 5); - - minRandomValue = 0; - maxRandomValue = 20; - - numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; - valueCounts = new long[numValueBuckets]; - - List builders = new ArrayList<>(); - - for (int i = 0; i < numDocs; i++) { - int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); - builders.add( - client().prepareIndex("idx", "type") - .setSource( - jsonBuilder().startObject() - .field(SINGLE_VALUED_FIELD_NAME, fieldValue) - .field("tag", "tag" + (i % interval)) - .endObject() - ) - ); - final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); - valueCounts[bucket]++; - } - - assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); - for (int i = 0; i < 2; i++) { - builders.add( - client().prepareIndex("empty_bucket_idx", "type", "" + i) - .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) - ); - } - indexRandom(true, builders); - ensureSearchable(); - } - - public void testDocCountTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .addAggregation(minBucket("min_bucket", "histo>_count")) - .get(); - - assertSearchResponse(response); - - Histogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat(buckets.size(), equalTo(numValueBuckets)); - - List minKeys = new ArrayList<>(); - double minValue = Double.POSITIVE_INFINITY; - for (int i = 0; i < numValueBuckets; ++i) { - Histogram.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); - assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); - if (bucket.getDocCount() < minValue) { - minValue = bucket.getDocCount(); - minKeys = new ArrayList<>(); - minKeys.add(bucket.getKeyAsString()); - } else if (bucket.getDocCount() == minValue) { - minKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_bucket")); - assertThat(minBucketValue.value(), equalTo(minValue)); - assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); - } - - public void testDocCountAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(minBucket("min_bucket", "histo>_count")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List minKeys = new ArrayList<>(); - double minValue = Double.POSITIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() < minValue) { - minValue = bucket.getDocCount(); - minKeys = new ArrayList<>(); - minKeys.add(bucket.getKeyAsString()); - } else if (bucket.getDocCount() == minValue) { - minKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_bucket")); - assertThat(minBucketValue.value(), equalTo(minValue)); - assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); - } + protected MinBucketPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath) { + return minBucket(name, bucketsPath); } - public void testMetricTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(minBucket("min_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(interval)); - + @Override + protected void assertResult( + IntToDoubleFunction bucketValues, + Function bucketKeys, + int numBuckets, + InternalBucketMetricValue pipelineBucket + ) { List minKeys = new ArrayList<>(); double minValue = Double.POSITIVE_INFINITY; - for (int i = 0; i < interval; ++i) { - Terms.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); - assertThat(bucket.getDocCount(), greaterThan(0L)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - if (sum.value() < minValue) { - minValue = sum.value(); + for (int i = 0; i < numBuckets; ++i) { + double bucketValue = bucketValues.applyAsDouble(i); + if (bucketValue < minValue) { + minValue = bucketValue; minKeys = new ArrayList<>(); - minKeys.add(bucket.getKeyAsString()); - } else if (sum.value() == minValue) { - minKeys.add(bucket.getKeyAsString()); + minKeys.add(bucketKeys.apply(i)); + } else if (bucketValue == minValue) { + minKeys.add(bucketKeys.apply(i)); } } - - InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_bucket")); - assertThat(minBucketValue.value(), equalTo(minValue)); - assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); - } - - public void testMetricAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(minBucket("min_bucket", "histo>sum")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List minKeys = new ArrayList<>(); - double minValue = Double.POSITIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - if (sum.value() < minValue) { - minValue = sum.value(); - minKeys = new ArrayList<>(); - minKeys.add(bucket.getKeyAsString()); - } else if (sum.value() == minValue) { - minKeys.add(bucket.getKeyAsString()); - } - } - } - - InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_bucket")); - assertThat(minBucketValue.value(), equalTo(minValue)); - assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); - } - } - - public void testMetricAsSubAggWithInsertZeros() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(minBucket("min_bucket", "histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List minKeys = new ArrayList<>(); - double minValue = Double.POSITIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - if (sum.value() < minValue) { - minValue = sum.value(); - minKeys = new ArrayList<>(); - minKeys.add(bucket.getKeyAsString()); - } else if (sum.value() == minValue) { - minKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_bucket")); - assertThat(minBucketValue.value(), equalTo(minValue)); - assertThat(minBucketValue.keys(), equalTo(minKeys.toArray(new String[minKeys.size()]))); - } + assertThat(pipelineBucket.value(), equalTo(minValue)); + assertThat(pipelineBucket.keys(), equalTo(minKeys.toArray(new String[0]))); } - public void testNoBuckets() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .includeExclude(new IncludeExclude(null, "tag.*")) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .addAggregation(minBucket("min_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(0)); - - InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_bucket")); - assertThat(minBucketValue.value(), equalTo(Double.POSITIVE_INFINITY)); - assertThat(minBucketValue.keys(), equalTo(new String[0])); + @Override + protected String nestedMetric() { + return "value"; } - public void testNested() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(minBucket("min_histo_bucket", "histo>_count")) - ) - .addAggregation(minBucket("min_terms_bucket", "terms>min_histo_bucket")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - List minTermsKeys = new ArrayList<>(); - double minTermsValue = Double.POSITIVE_INFINITY; - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List minHistoKeys = new ArrayList<>(); - double minHistoValue = Double.POSITIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() < minHistoValue) { - minHistoValue = bucket.getDocCount(); - minHistoKeys = new ArrayList<>(); - minHistoKeys.add(bucket.getKeyAsString()); - } else if (bucket.getDocCount() == minHistoValue) { - minHistoKeys.add(bucket.getKeyAsString()); - } - } - - InternalBucketMetricValue minBucketValue = termsBucket.getAggregations().get("min_histo_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_histo_bucket")); - assertThat(minBucketValue.value(), equalTo(minHistoValue)); - assertThat(minBucketValue.keys(), equalTo(minHistoKeys.toArray(new String[minHistoKeys.size()]))); - if (minHistoValue < minTermsValue) { - minTermsValue = minHistoValue; - minTermsKeys = new ArrayList<>(); - minTermsKeys.add(termsBucket.getKeyAsString()); - } else if (minHistoValue == minTermsValue) { - minTermsKeys.add(termsBucket.getKeyAsString()); - } - } - - InternalBucketMetricValue minBucketValue = response.getAggregations().get("min_terms_bucket"); - assertThat(minBucketValue, notNullValue()); - assertThat(minBucketValue.getName(), equalTo("min_terms_bucket")); - assertThat(minBucketValue.value(), equalTo(minTermsValue)); - assertThat(minBucketValue.keys(), equalTo(minTermsKeys.toArray(new String[minTermsKeys.size()]))); + @Override + protected double getNestedMetric(InternalBucketMetricValue bucket) { + return bucket.value(); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java index 7fa1a16d41fbc..6c432a13815e4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/PercentilesBucketIT.java @@ -9,7 +9,6 @@ package org.elasticsearch.search.aggregations.pipeline; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.search.aggregations.BucketOrder; @@ -18,203 +17,67 @@ import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.metrics.Percentile; import org.elasticsearch.search.aggregations.metrics.Sum; -import org.elasticsearch.test.ESIntegTestCase; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.percentilesBucket; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.core.IsNull.notNullValue; -@ESIntegTestCase.SuiteScopeTestCase -public class PercentilesBucketIT extends ESIntegTestCase { +public class PercentilesBucketIT extends BucketMetricsPipeLineAggregationTestCase { - private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; private static final double[] PERCENTS = { 0.0, 1.0, 25.0, 50.0, 75.0, 99.0, 100.0 }; - static int numDocs; - static int interval; - static int minRandomValue; - static int maxRandomValue; - static int numValueBuckets; - static long[] valueCounts; @Override - public void setupSuiteScopeCluster() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); - createIndex("idx_unmapped"); - - numDocs = randomIntBetween(6, 20); - interval = randomIntBetween(2, 5); - - minRandomValue = 0; - maxRandomValue = 20; - - numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; - valueCounts = new long[numValueBuckets]; - - List builders = new ArrayList<>(); - - for (int i = 0; i < numDocs; i++) { - int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); - builders.add( - client().prepareIndex("idx", "type") - .setSource( - jsonBuilder().startObject() - .field(SINGLE_VALUED_FIELD_NAME, fieldValue) - .field("tag", "tag" + (i % interval)) - .endObject() - ) - ); - final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); - valueCounts[bucket]++; - } - - assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); - for (int i = 0; i < 2; i++) { - builders.add( - client().prepareIndex("empty_bucket_idx", "type", "" + i) - .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) - ); - } - indexRandom(true, builders); - ensureSearchable(); + protected PercentilesBucketPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath) { + return percentilesBucket(name, bucketsPath).setPercents(PERCENTS); } - public void testDocCountopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .addAggregation(percentilesBucket("percentiles_bucket", "histo>_count").setPercents(PERCENTS)) - .get(); - - assertSearchResponse(response); - - Histogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat(buckets.size(), equalTo(numValueBuckets)); - - double[] values = new double[numValueBuckets]; - for (int i = 0; i < numValueBuckets; ++i) { - Histogram.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); - assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); - values[i] = bucket.getDocCount(); + @Override + protected void assertResult( + IntToDoubleFunction bucketValues, + Function bucketKeys, + int numBuckets, + PercentilesBucket pipelineBucket + ) { + double[] values = new double[numBuckets]; + for (int i = 0; i < numBuckets; ++i) { + values[i] = bucketValues.applyAsDouble(i); } - Arrays.sort(values); - - PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); - assertPercentileBucket(PERCENTS, values, percentilesBucketValue); + assertPercentileBucket(PERCENTS, values, pipelineBucket); } - public void testDocCountAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(percentilesBucket("percentiles_bucket", "histo>_count").setPercents(PERCENTS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double[] values = new double[numValueBuckets]; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - values[j] = bucket.getDocCount(); - } - - Arrays.sort(values); - - PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); - assertPercentileBucket(PERCENTS, values, percentilesBucketValue); - } + @Override + protected String nestedMetric() { + return "50"; } - public void testMetricTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(percentilesBucket("percentiles_bucket", "terms>sum").setPercents(PERCENTS)) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(interval)); - - double[] values = new double[interval]; - for (int i = 0; i < interval; ++i) { - Terms.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); - assertThat(bucket.getDocCount(), greaterThan(0L)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - values[i] = sum.value(); - } - - Arrays.sort(values); - - PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); - assertPercentileBucket(PERCENTS, values, percentilesBucketValue); + @Override + protected double getNestedMetric(PercentilesBucket bucket) { + return bucket.percentile(50); } public void testMetricTopLevelDefaultPercents() throws Exception { SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(percentilesBucket("percentiles_bucket", "terms>sum")) + .addAggregation(terms(termsName).field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket", termsName + ">sum")) .get(); assertSearchResponse(response); - Terms terms = response.getAggregations().get("terms"); + Terms terms = response.getAggregations().get(termsName); assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.getName(), equalTo(termsName)); List buckets = terms.getBuckets(); assertThat(buckets.size(), equalTo(interval)); @@ -237,157 +100,21 @@ public void testMetricTopLevelDefaultPercents() throws Exception { assertPercentileBucket(values, percentilesBucketValue); } - public void testMetricAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(percentilesBucket("percentiles_bucket", "histo>sum").setPercents(PERCENTS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - List values = new ArrayList<>(numValueBuckets); - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - values.add(sum.value()); - } - } - - Collections.sort(values); - - PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); - assertPercentileBucket(PERCENTS, values.stream().mapToDouble(Double::doubleValue).toArray(), percentilesBucketValue); - } - } - - public void testMetricAsSubAggWithInsertZeros() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation( - percentilesBucket("percentiles_bucket", "histo>sum").gapPolicy(BucketHelpers.GapPolicy.INSERT_ZEROS) - .setPercents(PERCENTS) - ) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double[] values = new double[numValueBuckets]; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - - values[j] = sum.value(); - } - - Arrays.sort(values); - - PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentiles_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); - assertPercentileBucket(PERCENTS, values, percentilesBucketValue); - } - } - - public void testNoBuckets() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .includeExclude(new IncludeExclude(null, "tag.*")) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .addAggregation(percentilesBucket("percentiles_bucket", "terms>sum").setPercents(PERCENTS)) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(0)); - - PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentiles_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentiles_bucket")); - for (Double p : PERCENTS) { - assertThat(percentilesBucketValue.percentile(p), equalTo(Double.NaN)); - } - } - public void testWrongPercents() throws Exception { SearchResponse response = client().prepareSearch("idx") .addAggregation( - terms("terms").field("tag") + terms(termsName).field("tag") .includeExclude(new IncludeExclude(null, "tag.*")) .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) ) - .addAggregation(percentilesBucket("percentiles_bucket", "terms>sum").setPercents(PERCENTS)) + .addAggregation(percentilesBucket("percentiles_bucket", termsName + ">sum").setPercents(PERCENTS)) .get(); assertSearchResponse(response); - Terms terms = response.getAggregations().get("terms"); + Terms terms = response.getAggregations().get(termsName); assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.getName(), equalTo(termsName)); List buckets = terms.getBuckets(); assertThat(buckets.size(), equalTo(0)); @@ -408,8 +135,8 @@ public void testBadPercents() throws Exception { try { client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(percentilesBucket("percentiles_bucket", "terms>sum").setPercents(badPercents)) + .addAggregation(terms(termsName).field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) + .addAggregation(percentilesBucket("percentiles_bucket", termsName + ">sum").setPercents(badPercents)) .get(); fail("Illegal percent's were provided but no exception was thrown."); @@ -436,14 +163,14 @@ public void testBadPercents_asSubAgg() throws Exception { try { client().prepareSearch("idx") .addAggregation( - terms("terms").field("tag") + terms(termsName).field("tag") .order(BucketOrder.key(true)) .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) + histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) .interval(interval) .extendedBounds(minRandomValue, maxRandomValue) ) - .subAggregation(percentilesBucket("percentiles_bucket", "histo>_count").setPercents(badPercents)) + .subAggregation(percentilesBucket("percentiles_bucket", histoName + ">_count").setPercents(badPercents)) ) .get(); @@ -465,82 +192,27 @@ public void testBadPercents_asSubAgg() throws Exception { } - public void testNested() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(percentilesBucket("percentile_histo_bucket", "histo>_count").setPercents(PERCENTS)) - ) - .addAggregation(percentilesBucket("percentile_terms_bucket", "terms>percentile_histo_bucket.50").setPercents(PERCENTS)) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - double[] values = new double[termsBuckets.size()]; - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double[] innerValues = new double[numValueBuckets]; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - - innerValues[j] = bucket.getDocCount(); - } - Arrays.sort(innerValues); - - PercentilesBucket percentilesBucketValue = termsBucket.getAggregations().get("percentile_histo_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentile_histo_bucket")); - assertPercentileBucket(PERCENTS, innerValues, percentilesBucketValue); - values[i] = percentilesBucketValue.percentile(50.0); - } - - Arrays.sort(values); - - PercentilesBucket percentilesBucketValue = response.getAggregations().get("percentile_terms_bucket"); - assertThat(percentilesBucketValue, notNullValue()); - assertThat(percentilesBucketValue.getName(), equalTo("percentile_terms_bucket")); - assertPercentileBucket(PERCENTS, values, percentilesBucketValue); - } - public void testNestedWithDecimal() throws Exception { double[] percent = { 99.9 }; SearchResponse response = client().prepareSearch("idx") .addAggregation( - terms("terms").field("tag") + terms(termsName).field("tag") .order(BucketOrder.key(true)) .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) + histogram(histoName).field(SINGLE_VALUED_FIELD_NAME) + .interval(interval) + .extendedBounds(minRandomValue, maxRandomValue) ) - .subAggregation(percentilesBucket("percentile_histo_bucket", "histo>_count").setPercents(percent)) + .subAggregation(percentilesBucket("percentile_histo_bucket", histoName + ">_count").setPercents(percent)) ) - .addAggregation(percentilesBucket("percentile_terms_bucket", "terms>percentile_histo_bucket[99.9]").setPercents(percent)) + .addAggregation(percentilesBucket("percentile_terms_bucket", termsName + ">percentile_histo_bucket[99.9]").setPercents(percent)) .get(); assertSearchResponse(response); - Terms terms = response.getAggregations().get("terms"); + Terms terms = response.getAggregations().get(termsName); assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); + assertThat(terms.getName(), equalTo(termsName)); List termsBuckets = terms.getBuckets(); assertThat(termsBuckets.size(), equalTo(interval)); @@ -550,9 +222,9 @@ public void testNestedWithDecimal() throws Exception { assertThat(termsBucket, notNullValue()); assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - Histogram histo = termsBucket.getAggregations().get("histo"); + Histogram histo = termsBucket.getAggregations().get(histoName); assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); + assertThat(histo.getName(), equalTo(histoName)); List buckets = histo.getBuckets(); double[] innerValues = new double[numValueBuckets]; @@ -586,8 +258,12 @@ public void testNestedWithDecimal() throws Exception { private void assertPercentileBucket(double[] values, PercentilesBucket percentiles) { for (Percentile percentile : percentiles) { assertEquals(percentiles.percentile(percentile.getPercent()), percentile.getValue(), 0d); - int index = (int) Math.round((percentile.getPercent() / 100.0) * (values.length - 1)); - assertThat(percentile.getValue(), equalTo(values[index])); + if (values.length == 0) { + assertThat(percentile.getValue(), equalTo(Double.NaN)); + } else { + int index = (int) Math.round((percentile.getPercent() / 100.0) * (values.length - 1)); + assertThat(percentile.getValue(), equalTo(values[index])); + } } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java index 478ac96544de2..7040f3bf115f3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/StatsBucketIT.java @@ -8,428 +8,51 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.Sum; -import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.test.ESIntegTestCase; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; -import java.util.ArrayList; -import java.util.List; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; -import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; -import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.statsBucket; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.core.IsNull.notNullValue; - -@ESIntegTestCase.SuiteScopeTestCase -public class StatsBucketIT extends ESIntegTestCase { - - private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; - static int numDocs; - static int interval; - static int minRandomValue; - static int maxRandomValue; - static int numValueBuckets; - static long[] valueCounts; +public class StatsBucketIT extends BucketMetricsPipeLineAggregationTestCase { @Override - public void setupSuiteScopeCluster() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); - createIndex("idx_unmapped"); - - numDocs = randomIntBetween(6, 20); - interval = randomIntBetween(2, 5); - - minRandomValue = 0; - maxRandomValue = 20; - - numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; - valueCounts = new long[numValueBuckets]; - - List builders = new ArrayList<>(); - - for (int i = 0; i < numDocs; i++) { - int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); - builders.add( - client().prepareIndex("idx", "type") - .setSource( - jsonBuilder().startObject() - .field(SINGLE_VALUED_FIELD_NAME, fieldValue) - .field("tag", "tag" + (i % interval)) - .endObject() - ) - ); - final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); - valueCounts[bucket]++; - } - - assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); - for (int i = 0; i < 2; i++) { - builders.add( - client().prepareIndex("empty_bucket_idx", "type", "" + i) - .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) - ); - } - indexRandom(true, builders); - ensureSearchable(); + protected StatsBucketPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath) { + return statsBucket(name, bucketsPath); } - public void testDocCountTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .addAggregation(statsBucket("stats_bucket", "histo>_count")) - .get(); - - assertSearchResponse(response); - - Histogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat(buckets.size(), equalTo(numValueBuckets)); - + @Override + protected void assertResult( + IntToDoubleFunction bucketValues, + Function bucketKeys, + int numBuckets, + StatsBucket pipelineBucket + ) { double sum = 0; int count = 0; double min = Double.POSITIVE_INFINITY; double max = Double.NEGATIVE_INFINITY; - for (int i = 0; i < numValueBuckets; ++i) { - Histogram.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); - assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); + for (int i = 0; i < numBuckets; ++i) { + double bucketValue = bucketValues.applyAsDouble(i); count++; - sum += bucket.getDocCount(); - min = Math.min(min, bucket.getDocCount()); - max = Math.max(max, bucket.getDocCount()); + sum += bucketValue; + min = Math.min(min, bucketValue); + max = Math.max(max, bucketValue); } double avgValue = count == 0 ? Double.NaN : (sum / count); - StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(statsBucketValue.getMin(), equalTo(min)); - assertThat(statsBucketValue.getMax(), equalTo(max)); - } - - public void testDocCountAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(statsBucket("stats_bucket", "histo>_count")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double sum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - count++; - sum += bucket.getDocCount(); - min = Math.min(min, bucket.getDocCount()); - max = Math.max(max, bucket.getDocCount()); - } - - double avgValue = count == 0 ? Double.NaN : (sum / count); - StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(statsBucketValue.getMin(), equalTo(min)); - assertThat(statsBucketValue.getMax(), equalTo(max)); - } - } - - public void testMetricTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(statsBucket("stats_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(interval)); - - double bucketSum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - for (int i = 0; i < interval; ++i) { - Terms.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); - assertThat(bucket.getDocCount(), greaterThan(0L)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - count++; - bucketSum += sum.value(); - min = Math.min(min, sum.value()); - max = Math.max(max, sum.value()); - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(statsBucketValue.getMin(), equalTo(min)); - assertThat(statsBucketValue.getMax(), equalTo(max)); + assertThat(pipelineBucket.getAvg(), equalTo(avgValue)); + assertThat(pipelineBucket.getMin(), equalTo(min)); + assertThat(pipelineBucket.getMax(), equalTo(max)); } - public void testMetricAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(statsBucket("stats_bucket", "histo>sum")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - count++; - bucketSum += sum.value(); - min = Math.min(min, sum.value()); - max = Math.max(max, sum.value()); - } - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(statsBucketValue.getMin(), equalTo(min)); - assertThat(statsBucketValue.getMax(), equalTo(max)); - } - } - - public void testMetricAsSubAggWithInsertZeros() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(statsBucket("stats_bucket", "histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - int count = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - - count++; - bucketSum += sum.value(); - min = Math.min(min, sum.value()); - max = Math.max(max, sum.value()); - } - - double avgValue = count == 0 ? Double.NaN : (bucketSum / count); - StatsBucket statsBucketValue = termsBucket.getAggregations().get("stats_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(avgValue)); - assertThat(statsBucketValue.getMin(), equalTo(min)); - assertThat(statsBucketValue.getMax(), equalTo(max)); - } - } - - public void testNoBuckets() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .includeExclude(new IncludeExclude(null, "tag.*")) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .addAggregation(statsBucket("stats_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(0)); - - StatsBucket statsBucketValue = response.getAggregations().get("stats_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("stats_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(Double.NaN)); + @Override + protected String nestedMetric() { + return "avg"; } - public void testNested() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(statsBucket("avg_histo_bucket", "histo>_count")) - ) - .addAggregation(statsBucket("avg_terms_bucket", "terms>avg_histo_bucket.avg")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - double aggTermsSum = 0; - int aggTermsCount = 0; - double min = Double.POSITIVE_INFINITY; - double max = Double.NEGATIVE_INFINITY; - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double aggHistoSum = 0; - int aggHistoCount = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - - aggHistoCount++; - aggHistoSum += bucket.getDocCount(); - } - - double avgHistoValue = aggHistoCount == 0 ? Double.NaN : (aggHistoSum / aggHistoCount); - StatsBucket statsBucketValue = termsBucket.getAggregations().get("avg_histo_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("avg_histo_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(avgHistoValue)); - - aggTermsCount++; - aggTermsSum += avgHistoValue; - min = Math.min(min, avgHistoValue); - max = Math.max(max, avgHistoValue); - } - - double avgTermsValue = aggTermsCount == 0 ? Double.NaN : (aggTermsSum / aggTermsCount); - StatsBucket statsBucketValue = response.getAggregations().get("avg_terms_bucket"); - assertThat(statsBucketValue, notNullValue()); - assertThat(statsBucketValue.getName(), equalTo("avg_terms_bucket")); - assertThat(statsBucketValue.getAvg(), equalTo(avgTermsValue)); - assertThat(statsBucketValue.getMin(), equalTo(min)); - assertThat(statsBucketValue.getMax(), equalTo(max)); + @Override + protected double getNestedMetric(StatsBucket bucket) { + return bucket.getAvg(); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SumBucketIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SumBucketIT.java index d1e93f22919b4..285322ef12355 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SumBucketIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/pipeline/SumBucketIT.java @@ -8,371 +8,40 @@ package org.elasticsearch.search.aggregations.pipeline; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.search.aggregations.BucketOrder; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; -import org.elasticsearch.search.aggregations.bucket.histogram.Histogram.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.IncludeExclude; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.metrics.Sum; -import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; -import org.elasticsearch.test.ESIntegTestCase; +import java.util.function.Function; +import java.util.function.IntToDoubleFunction; -import java.util.ArrayList; -import java.util.List; - -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; -import static org.elasticsearch.search.aggregations.AggregationBuilders.histogram; -import static org.elasticsearch.search.aggregations.AggregationBuilders.sum; -import static org.elasticsearch.search.aggregations.AggregationBuilders.terms; import static org.elasticsearch.search.aggregations.PipelineAggregatorBuilders.sumBucket; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.core.IsNull.notNullValue; - -@ESIntegTestCase.SuiteScopeTestCase -public class SumBucketIT extends ESIntegTestCase { - - private static final String SINGLE_VALUED_FIELD_NAME = "l_value"; - static int numDocs; - static int interval; - static int minRandomValue; - static int maxRandomValue; - static int numValueBuckets; - static long[] valueCounts; +public class SumBucketIT extends BucketMetricsPipeLineAggregationTestCase { @Override - public void setupSuiteScopeCluster() throws Exception { - assertAcked(client().admin().indices().prepareCreate("idx").addMapping("type", "tag", "type=keyword").get()); - createIndex("idx_unmapped"); - - numDocs = randomIntBetween(6, 20); - interval = randomIntBetween(2, 5); - - minRandomValue = 0; - maxRandomValue = 20; - - numValueBuckets = ((maxRandomValue - minRandomValue) / interval) + 1; - valueCounts = new long[numValueBuckets]; - - List builders = new ArrayList<>(); - - for (int i = 0; i < numDocs; i++) { - int fieldValue = randomIntBetween(minRandomValue, maxRandomValue); - builders.add( - client().prepareIndex("idx", "type") - .setSource( - jsonBuilder().startObject() - .field(SINGLE_VALUED_FIELD_NAME, fieldValue) - .field("tag", "tag" + (i % interval)) - .endObject() - ) - ); - final int bucket = (fieldValue / interval); // + (fieldValue < 0 ? -1 : 0) - (minRandomValue / interval - 1); - valueCounts[bucket]++; - } - - assertAcked(prepareCreate("empty_bucket_idx").addMapping("type", SINGLE_VALUED_FIELD_NAME, "type=integer")); - for (int i = 0; i < 2; i++) { - builders.add( - client().prepareIndex("empty_bucket_idx", "type", "" + i) - .setSource(jsonBuilder().startObject().field(SINGLE_VALUED_FIELD_NAME, i * 2).endObject()) - ); - } - indexRandom(true, builders); - ensureSearchable(); + protected SumBucketPipelineAggregationBuilder BucketMetricsPipelineAgg(String name, String bucketsPath) { + return sumBucket(name, bucketsPath); } - public void testDocCountTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .addAggregation(sumBucket("sum_bucket", "histo>_count")) - .get(); - - assertSearchResponse(response); - - Histogram histo = response.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - assertThat(buckets.size(), equalTo(numValueBuckets)); - + @Override + protected void assertResult( + IntToDoubleFunction bucketValues, + Function bucketKeys, + int numBuckets, + InternalSimpleValue pipelineBucket + ) { double sum = 0; - for (int i = 0; i < numValueBuckets; ++i) { - Histogram.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) i * interval)); - assertThat(bucket.getDocCount(), equalTo(valueCounts[i])); - sum += bucket.getDocCount(); - } - - InternalSimpleValue sumBucketValue = response.getAggregations().get("sum_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_bucket")); - assertThat(sumBucketValue.value(), equalTo(sum)); - } - - public void testDocCountAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(sumBucket("sum_bucket", "histo>_count")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double sum = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - sum += bucket.getDocCount(); - } - - InternalSimpleValue sumBucketValue = termsBucket.getAggregations().get("sum_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_bucket")); - assertThat(sumBucketValue.value(), equalTo(sum)); - } - } - - public void testMetricTopLevel() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation(terms("terms").field("tag").subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME))) - .addAggregation(sumBucket("sum_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(interval)); - - double bucketSum = 0; - for (int i = 0; i < interval; ++i) { - Terms.Bucket bucket = buckets.get(i); - assertThat(bucket, notNullValue()); - assertThat((String) bucket.getKey(), equalTo("tag" + (i % interval))); - assertThat(bucket.getDocCount(), greaterThan(0L)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - bucketSum += sum.value(); - } - - InternalSimpleValue sumBucketValue = response.getAggregations().get("sum_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_bucket")); - assertThat(sumBucketValue.value(), equalTo(bucketSum)); - } - - public void testMetricAsSubAgg() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(sumBucket("sum_bucket", "histo>sum")) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - if (bucket.getDocCount() != 0) { - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - bucketSum += sum.value(); - } - } - - InternalSimpleValue sumBucketValue = termsBucket.getAggregations().get("sum_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_bucket")); - assertThat(sumBucketValue.value(), equalTo(bucketSum)); + for (int i = 0; i < numBuckets; ++i) { + sum += bucketValues.applyAsDouble(i); } + assertThat(pipelineBucket.value(), equalTo(sum)); } - public void testMetricAsSubAggWithInsertZeros() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME) - .interval(interval) - .extendedBounds(minRandomValue, maxRandomValue) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .subAggregation(sumBucket("sum_bucket", "histo>sum").gapPolicy(GapPolicy.INSERT_ZEROS)) - ) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double bucketSum = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - Sum sum = bucket.getAggregations().get("sum"); - assertThat(sum, notNullValue()); - - bucketSum += sum.value(); - } - - InternalSimpleValue sumBucketValue = termsBucket.getAggregations().get("sum_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_bucket")); - assertThat(sumBucketValue.value(), equalTo(bucketSum)); - } - } - - public void testNoBuckets() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .includeExclude(new IncludeExclude(null, "tag.*")) - .subAggregation(sum("sum").field(SINGLE_VALUED_FIELD_NAME)) - ) - .addAggregation(sumBucket("sum_bucket", "terms>sum")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List buckets = terms.getBuckets(); - assertThat(buckets.size(), equalTo(0)); - - InternalSimpleValue sumBucketValue = response.getAggregations().get("sum_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_bucket")); - assertThat(sumBucketValue.value(), equalTo(0.0)); + @Override + protected String nestedMetric() { + return "value"; } - public void testNested() throws Exception { - SearchResponse response = client().prepareSearch("idx") - .addAggregation( - terms("terms").field("tag") - .order(BucketOrder.key(true)) - .subAggregation( - histogram("histo").field(SINGLE_VALUED_FIELD_NAME).interval(interval).extendedBounds(minRandomValue, maxRandomValue) - ) - .subAggregation(sumBucket("sum_histo_bucket", "histo>_count")) - ) - .addAggregation(sumBucket("sum_terms_bucket", "terms>sum_histo_bucket")) - .get(); - - assertSearchResponse(response); - - Terms terms = response.getAggregations().get("terms"); - assertThat(terms, notNullValue()); - assertThat(terms.getName(), equalTo("terms")); - List termsBuckets = terms.getBuckets(); - assertThat(termsBuckets.size(), equalTo(interval)); - - double aggTermsSum = 0; - for (int i = 0; i < interval; ++i) { - Terms.Bucket termsBucket = termsBuckets.get(i); - assertThat(termsBucket, notNullValue()); - assertThat((String) termsBucket.getKey(), equalTo("tag" + (i % interval))); - - Histogram histo = termsBucket.getAggregations().get("histo"); - assertThat(histo, notNullValue()); - assertThat(histo.getName(), equalTo("histo")); - List buckets = histo.getBuckets(); - - double aggHistoSum = 0; - for (int j = 0; j < numValueBuckets; ++j) { - Histogram.Bucket bucket = buckets.get(j); - assertThat(bucket, notNullValue()); - assertThat(((Number) bucket.getKey()).longValue(), equalTo((long) j * interval)); - - aggHistoSum += bucket.getDocCount(); - } - - InternalSimpleValue sumBucketValue = termsBucket.getAggregations().get("sum_histo_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_histo_bucket")); - assertThat(sumBucketValue.value(), equalTo(aggHistoSum)); - - aggTermsSum += aggHistoSum; - } - - InternalSimpleValue sumBucketValue = response.getAggregations().get("sum_terms_bucket"); - assertThat(sumBucketValue, notNullValue()); - assertThat(sumBucketValue.getName(), equalTo("sum_terms_bucket")); - assertThat(sumBucketValue.value(), equalTo(aggTermsSum)); + @Override + protected double getNestedMetric(InternalSimpleValue bucket) { + return bucket.value(); } }