diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc
index fe8a49d503968..b79175acc5010 100644
--- a/docs/reference/aggregations/pipeline.asciidoc
+++ b/docs/reference/aggregations/pipeline.asciidoc
@@ -288,3 +288,4 @@ include::pipeline/bucket-selector-aggregation.asciidoc[]
 include::pipeline/bucket-sort-aggregation.asciidoc[]
 include::pipeline/serial-diff-aggregation.asciidoc[]
 include::pipeline/moving-percentiles-aggregation.asciidoc[]
+include::pipeline/normalize-aggregation.asciidoc[]
diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
new file mode 100644
index 0000000000000..adeeadaa1b6f9
--- /dev/null
+++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
@@ -0,0 +1,182 @@
+[role="xpack"]
+[testenv="basic"]
+[[search-aggregations-pipeline-normalize-aggregation]]
+=== Normalize Aggregation
+
+A parent pipeline aggregation which calculates the normalized/rescaled value for a specific bucket value.
+Values that cannot be normalized will be skipped using the <<gap-policy,skip gap policy>>.
+
+==== Syntax
+
+A `normalize` aggregation looks like this in isolation:
+
+[source,js]
+--------------------------------------------------
+{
+  "normalize": {
+    "buckets_path": "normalized",
+    "method": "percent_of_sum"
+  }
+}
+--------------------------------------------------
+// NOTCONSOLE
+
+[[normalize_pipeline-params]]
+.`normalize_pipeline` Parameters
+[options="header"]
+|===
+|Parameter Name |Description |Required |Default Value
+|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax>> for more details) |Required |
+|`method` |The specific <<normalize_pipeline-method,method>> to apply |Required |
+|`format` |Format to apply to the output value of this aggregation |Optional |`null`
+|===
+
+[[normalize_pipeline-method]]
+==== Methods
+
+The `normalize` aggregation supports multiple methods to transform the bucket values. Each method definition below uses
+the following original set of bucket values as its example: `[5, 5, 10, 50, 10, 20]`.
+
+_rescale_0_1_::
+  This method rescales the data such that the minimum number is zero, and the maximum number is one, with the rest normalized
+  linearly in-between.
+
+  x' = (x - min_x) / (max_x - min_x)
+
+  [0, 0, .1111, 1, .1111, .3333]
+
+_rescale_0_100_::
+  This method rescales the data such that the minimum number is zero, and the maximum number is 100, with the rest normalized
+  linearly in-between.
+
+  x' = 100 * (x - min_x) / (max_x - min_x)
+
+  [0, 0, 11.11, 100, 11.11, 33.33]
+
+_percent_of_sum_::
+  This method normalizes each value so that it represents its percentage of the total sum.
+
+  x' = x / sum_x
+
+  [5%, 5%, 10%, 50%, 10%, 20%]
+
+_mean_::
+  This method normalizes each value by how much it differs from the average, relative to the range of the data.
+
+  x' = (x - mean_x) / (max_x - min_x)
+
+  [-0.26, -0.26, -0.15, 0.74, -0.15, 0.07]
+
+_z-score_::
+  This method normalizes such that each value represents how far it is from the mean relative to the standard deviation
+  (the population standard deviation, dividing by the number of values).
+
+  x' = (x - mean_x) / stdev_x
+
+  [-0.74, -0.74, -0.42, 2.12, -0.42, 0.21]
+
+_softmax_::
+  This method normalizes such that each value is exponentiated and rescaled relative to the sum of the exponents of the
+  original values.
+
+  x' = e^x / sum_e_x
+
+  [2.862E-20, 2.862E-20, 4.248E-18, 0.999, 4.248E-18, 9.357E-14]
+
+==== Example
+
+The following snippet calculates the percent of total sales for each month:
+
+[source,console]
+--------------------------------------------------
+POST /sales/_search
+{
+  "size": 0,
+  "aggs" : {
+    "sales_per_month" : {
+      "date_histogram" : {
+        "field" : "date",
+        "calendar_interval" : "month"
+      },
+      "aggs": {
+        "sales": {
+          "sum": {
+            "field": "price"
+          }
+        },
+        "percent_of_total_sales": {
+          "normalize": {
+            "buckets_path": "sales",          <1>
+            "method": "percent_of_sum",       <2>
+            "format": "00.00%"                <3>
+          }
+        }
+      }
+    }
+  }
+}
+--------------------------------------------------
+// TEST[setup:sales]
+
+<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling
+<2> `method` sets which rescaling to apply. In this case, `percent_of_sum` calculates the sales value as a percent of all sales
+    in the parent bucket
+<3> `format` influences how to format the metric as a string using Java's `DecimalFormat` pattern. In this case, multiplying by 100
+    and adding a '%'
+
+And the following may be the response:
+
+[source,console-result]
+--------------------------------------------------
+{
+  "took": 11,
+  "timed_out": false,
+  "_shards": ...,
+  "hits": ...,
+  "aggregations": {
+    "sales_per_month": {
+      "buckets": [
+        {
+          "key_as_string": "2015/01/01 00:00:00",
+          "key": 1420070400000,
+          "doc_count": 3,
+          "sales": {
+            "value": 550.0
+          },
+          "percent_of_total_sales": {
+            "value": 0.5583756345177665,
+            "value_as_string": "55.84%"
+          }
+        },
+        {
+          "key_as_string": "2015/02/01 00:00:00",
+          "key": 1422748800000,
+          "doc_count": 2,
+          "sales": {
+            "value": 60.0
+          },
+          "percent_of_total_sales": {
+            "value": 0.06091370558375635,
+            "value_as_string": "06.09%"
+          }
+        },
+        {
+          "key_as_string": "2015/03/01 00:00:00",
+          "key": 1425168000000,
+          "doc_count": 2,
+          "sales": {
+            "value": 375.0
+          },
+          "percent_of_total_sales": {
+            "value": 0.38071065989847713,
+            "value_as_string": "38.07%"
+          }
+        }
+      ]
+    }
+  }
+}
+--------------------------------------------------
+// TESTRESPONSE[s/"took": 11/"took": $body.took/]
+// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
+// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
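
To make the method table above concrete, here is a small self-contained sketch (illustrative only, not part of this change; the class name `NormalizeMethodsDemo` and the `print` helper are invented for the example) that applies each formula to the example series `[5, 5, 10, 50, 10, 20]` and reproduces the outputs quoted in the docs. Note that `z-score` uses the population standard deviation, matching `NormalizePipelineMethods.ZScore` later in this diff:

[source,java]
--------------------------------------------------
import java.util.Arrays;
import java.util.function.DoubleUnaryOperator;

public class NormalizeMethodsDemo {
    public static void main(String[] args) {
        double[] x = {5, 5, 10, 50, 10, 20};
        double min = Arrays.stream(x).min().getAsDouble();  // 5
        double max = Arrays.stream(x).max().getAsDouble();  // 50
        double sum = Arrays.stream(x).sum();                // 100
        double mean = sum / x.length;                       // 16.667
        // population standard deviation, as used by the z-score method
        double stdev = Math.sqrt(Arrays.stream(x).map(v -> (v - mean) * (v - mean)).sum() / x.length);
        double sumExp = Arrays.stream(x).map(Math::exp).sum();

        print("rescale_0_1", x, v -> (v - min) / (max - min));         // [0.0, 0.0, 0.11, 1.0, 0.11, 0.33]
        print("rescale_0_100", x, v -> 100 * (v - min) / (max - min)); // [0.0, 0.0, 11.11, 100.0, 11.11, 33.33]
        print("percent_of_sum", x, v -> v / sum);                      // [0.05, 0.05, 0.1, 0.5, 0.1, 0.2]
        print("mean", x, v -> (v - mean) / (max - min));               // [-0.26, -0.26, -0.15, 0.74, -0.15, 0.07]
        print("z-score", x, v -> (v - mean) / stdev);                  // [-0.74, -0.74, -0.42, 2.12, -0.42, 0.21]
        print("softmax", x, v -> Math.exp(v) / sumExp);                // [2.86E-20, 2.86E-20, 4.25E-18, 0.999, 4.25E-18, 9.36E-14]
    }

    static void print(String name, double[] x, DoubleUnaryOperator f) {
        System.out.println(name + ": " + Arrays.toString(Arrays.stream(x).map(f).toArray()));
    }
}
--------------------------------------------------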
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java
index 71bf9450f03e5..2edfea344476f 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java
@@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.SingleValue {
     public static final String NAME = "simple_value";
     protected final double value;
 
-    InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
+    public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
         super(name, metadata);
         this.format = formatter;
         this.value = value;
diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java
index df458e5abf21e..144ae54347d41 100644
--- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java
+++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.watcher.ResourceWatcherService;
 import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
 import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory;
+import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
 import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
 import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
 import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
@@ -84,6 +85,11 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
             MovingPercentilesPipelineAggregationBuilder::new,
             usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
                 checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
+        pipelineAggs.add(new PipelineAggregationSpec(
+            NormalizePipelineAggregationBuilder.NAME,
+            NormalizePipelineAggregationBuilder::new,
+            usage.track(AnalyticsStatsAction.Item.NORMALIZE,
+                checkLicense(NormalizePipelineAggregationBuilder.PARSER))));
         return pipelineAggs;
     }
 
diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java
new file mode 100644
index 0000000000000..213015b287404
--- /dev/null
+++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.DoubleUnaryOperator; +import java.util.function.Function; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Mean; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Percent; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOne; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOneHundred; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Softmax; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.ZScore; + +public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { + public static final String NAME = "normalize"; + static final ParseField METHOD_FIELD = new ParseField("method"); + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0], + (String) args[1], (List) args[2])); + + static { + PARSER.declareString(optionalConstructorArg(), FORMAT); + PARSER.declareString(constructorArg(), METHOD_FIELD); + PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); + } + + static final Map> NAME_MAP; + + static { + NAME_MAP = new HashMap<>(); + NAME_MAP.put(RescaleZeroToOne.NAME, RescaleZeroToOne::new); + NAME_MAP.put(RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new); + NAME_MAP.put(Mean.NAME, Mean::new); + NAME_MAP.put(ZScore.NAME, ZScore::new); + NAME_MAP.put(Percent.NAME, Percent::new); + NAME_MAP.put(Softmax.NAME, Softmax::new); + } + + static String validateMethodName(String name) { + if (NAME_MAP.containsKey(name)) { + return name; + } + throw new IllegalArgumentException("invalid method [" + name + "]"); + } + + private final String format; + private final String method; + + public NormalizePipelineAggregationBuilder(String name, String format, String method, List bucketsPath) { + super(name, NAME, bucketsPath.toArray(new String[0])); + this.format = format; + this.method = validateMethodName(method); + } + + /** + * Read from a stream. 
+ */ + public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException { + super(in, NAME); + format = in.readOptionalString(); + method = in.readString(); + } + + @Override + protected final void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalString(format); + out.writeString(method); + } + + /** + * Gets the format to use on the output of this aggregation. + */ + public String format() { + return format; + } + + protected DocValueFormat formatter() { + if (format != null) { + return new DocValueFormat.Decimal(format); + } else { + return DocValueFormat.RAW; + } + } + + @Override + protected PipelineAggregator createInternal(Map metadata) { + return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(method), metadata); + } + + @Override + protected void validate(ValidationContext context) { + if (bucketsPaths.length != 1) { + context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]"); + } + context.validateHasParent(NAME, name); + } + + @Override + protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format); + } + builder.field(METHOD_FIELD.getPreferredName(), method); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), format, method); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj; + return Objects.equals(format, other.format) && Objects.equals(method, other.method); + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java new file mode 100644 index 0000000000000..cf18c49e7c81b --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java @@ -0,0 +1,76 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.DoubleUnaryOperator; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; + +public class NormalizePipelineAggregator extends PipelineAggregator { + private final DocValueFormat formatter; + private final Function methodSupplier; + + NormalizePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, + Function methodSupplier, + Map metadata) { + super(name, bucketsPaths, metadata); + this.formatter = formatter; + this.methodSupplier = methodSupplier; + } + + @Override + @SuppressWarnings("unchecked") + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalMultiBucketAggregation originalAgg = + (InternalMultiBucketAggregation) aggregation; + List buckets = originalAgg.getBuckets(); + List newBuckets = new ArrayList<>(buckets.size()); + + double[] values = buckets.stream() + .mapToDouble(bucket -> resolveBucketValue(originalAgg, bucket, bucketsPaths()[0], GapPolicy.SKIP)).toArray(); + + DoubleUnaryOperator method = methodSupplier.apply(values); + + for (int i = 0; i < buckets.size(); i++) { + InternalMultiBucketAggregation.InternalBucket bucket = buckets.get(i); + + final double normalizedBucketValue; + + // Only account for valid values. infite-valued buckets were converted to NaNs by + // the time they reach here. + if (Double.isNaN(values[i])) { + normalizedBucketValue = Double.NaN; + } else { + normalizedBucketValue = method.applyAsDouble(values[i]); + } + + List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) + .map((p) -> (InternalAggregation) p) + .collect(Collectors.toList()); + aggs.add(new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata())); + InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs), bucket); + newBuckets.add(newBucket); + } + + return originalAgg.create(newBuckets); + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineMethods.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineMethods.java new file mode 100644 index 0000000000000..62e79d92fa4a5 --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineMethods.java @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + + +import java.util.function.DoubleUnaryOperator; + +class NormalizePipelineMethods { + + // never to be instantiated + private NormalizePipelineMethods() {} + + static class RescaleZeroToOne extends SinglePassSimpleStatisticsMethod { + static final String NAME = "rescale_0_1"; + + RescaleZeroToOne(double[] values) { + super(values); + } + + @Override + public double applyAsDouble(double value) { + return (value - min) / (max - min); + } + } + + static class RescaleZeroToOneHundred extends SinglePassSimpleStatisticsMethod { + static final String NAME = "rescale_0_100"; + + RescaleZeroToOneHundred(double[] values) { + super(values); + } + + @Override + public double applyAsDouble(double value) { + return 100 * (value - min) / (max - min); + } + } + + static class Mean extends SinglePassSimpleStatisticsMethod { + static final String NAME = "mean"; + + Mean(double[] values) { + super(values); + } + + @Override + public double applyAsDouble(double value) { + return (value - mean) / (max - min); + } + } + + static class Percent extends SinglePassSimpleStatisticsMethod { + static final String NAME = "percent_of_sum"; + + Percent(double[] values) { + super(values); + } + + @Override + public double applyAsDouble(double value) { + return value / sum; + } + } + + static class ZScore extends SinglePassSimpleStatisticsMethod { + static final String NAME = "z-score"; + + private final double stdev; + + ZScore(double[] values) { + super(values); + double variance = 0.0; + for (Double value : values) { + if (value.isNaN() == false) { + variance += Math.pow(value - mean, 2); + } + } + this.stdev = Math.sqrt(variance / count); + } + + @Override + public double applyAsDouble(double value) { + return (value - mean) / stdev; + } + } + + static class Softmax implements DoubleUnaryOperator { + static final String NAME = "softmax"; + + private double sumExp; + + Softmax(double[] values) { + double sumExp = 0.0; + for (Double value : values) { + if (value.isNaN() == false) { + sumExp += Math.exp(value); + } + } + + this.sumExp = sumExp; + } + + @Override + public double applyAsDouble(double value) { + return Math.exp(value) / sumExp; + } + } + + abstract static class SinglePassSimpleStatisticsMethod implements DoubleUnaryOperator { + protected final double max; + protected final double min; + protected final double sum; + protected final double mean; + protected final int count; + + SinglePassSimpleStatisticsMethod(double[] values) { + int count = 0; + double sum = 0.0; + double min = Double.MAX_VALUE; + double max = Double.MIN_VALUE; + + for (double value : values) { + if (Double.isNaN(value) == false) { + count += 1; + min = Math.min(value, min); + max = Math.max(value, max); + sum += value; + } + } + + this.count = count; + this.min = min; + this.max = max; + this.sum = sum; + this.mean = this.count == 0 ? 
diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java
index 16c0b9b1cc800..7f6408ade5bbb 100644
--- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java
+++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java
@@ -44,6 +44,7 @@ public void testItemEnum() {
         assertThat(AnalyticsStatsAction.Item.TOP_METRICS.ordinal(), equalTo(i++));
         assertThat(AnalyticsStatsAction.Item.T_TEST.ordinal(), equalTo(i++));
         assertThat(AnalyticsStatsAction.Item.MOVING_PERCENTILES.ordinal(), equalTo(i++));
+        assertThat(AnalyticsStatsAction.Item.NORMALIZE.ordinal(), equalTo(i++));
         // Please add tests for newly added items here
         assertThat(AnalyticsStatsAction.Item.values().length, equalTo(i));
     }
diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java
new file mode 100644
index 0000000000000..155df380700ce
--- /dev/null
+++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.normalize;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.SortedNumericDocValuesField;
+import org.apache.lucene.document.SortedSetDocValuesField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.RandomIndexWriter;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.time.DateFormatters;
+import org.elasticsearch.index.mapper.DateFieldMapper;
+import org.elasticsearch.index.mapper.KeywordFieldMapper;
+import org.elasticsearch.index.mapper.MappedFieldType;
+import org.elasticsearch.index.mapper.NumberFieldMapper;
+import org.elasticsearch.search.aggregations.AggregatorTestCase;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
+import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder;
+import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
+import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class NormalizeAggregatorTests extends AggregatorTestCase {
+
+    private static final String DATE_FIELD = "date";
+    private static final String TERM_FIELD = "term";
+    private static final String VALUE_FIELD = "value_field";
+
+    private static final List<String> datasetTimes = Arrays.asList(
+        "2017-01-01T01:07:45", //1
+        "2017-01-01T03:43:34", //1
+        "2017-01-03T04:11:00", //3
+        "2017-01-03T05:11:31", //3
+        "2017-01-05T08:24:05", //5
+        "2017-01-05T13:09:32", //5
+        "2017-01-07T13:47:43", //7
+        "2017-01-08T16:14:34", //8
+        "2017-01-09T17:09:50", //9
+        "2017-01-09T22:55:46");//9
+    private static final List<String> datasetTerms = Arrays.asList(
+        "a", //1
+        "a", //1
+        "b", //2
+        "b", //2
+        "c", //3
+        "c", //3
+        "d", //4
+        "e", //5
+        "f", //6
+        "f");//6
+
+    private static final List<Integer> datasetValues = Arrays.asList(1, 1, 42, 6, 5, 0, 2, 8, 30, 13);
+    private static final List<Double> datePercentOfSum = Arrays.asList(0.2, 0.0, 0.2, 0.0, 0.2, 0.0, 0.1, 0.1, 0.2);
+    private static final List<Double> termPercentOfSum = Arrays.asList(0.2, 0.2, 0.2, 0.2, 0.1, 0.1);
+    private static final List<Double> rescaleOneHundred = Arrays.asList(0.0, Double.NaN, 100.0, Double.NaN, 6.521739130434782,
+        Double.NaN, 0.0, 13.043478260869565, 89.1304347826087);
+
+    public void testPercentOfTotalDocCount() throws IOException {
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD);
+        aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "percent_of_sum",
+            Collections.singletonList("_count")));
+
+        testCase(aggBuilder, (agg) -> {
+            assertEquals(9, ((Histogram) agg).getBuckets().size());
+            List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
+            for (int i = 0; i < buckets.size(); i++) {
+                Histogram.Bucket bucket = buckets.get(i);
+                assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(),
+                    equalTo(datePercentOfSum.get(i)));
+            }
+        });
+    }
+
+    public void testValueMean() throws IOException {
+        DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo");
+        aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD);
+        aggBuilder.subAggregation(new StatsAggregationBuilder("stats").field(VALUE_FIELD));
+        aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "rescale_0_100",
+            Collections.singletonList("stats.sum")));
+
+        testCase(aggBuilder, (agg) -> {
+            assertEquals(9, ((Histogram) agg).getBuckets().size());
+            List<? extends Histogram.Bucket> buckets = ((Histogram) agg).getBuckets();
+            for (int i = 0; i < buckets.size(); i++) {
+                Histogram.Bucket bucket = buckets.get(i);
+                assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(),
+                    equalTo(rescaleOneHundred.get(i)));
+            }
+        });
+    }
+
+    public void testTermsAggParent() throws IOException {
+        TermsAggregationBuilder aggBuilder = new TermsAggregationBuilder("terms").field(TERM_FIELD);
+        aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "percent_of_sum",
+            Collections.singletonList("_count")));
+
+        testCase(aggBuilder, (agg) -> {
+            assertEquals(6, ((Terms) agg).getBuckets().size());
+            List<? extends Terms.Bucket> buckets = ((Terms) agg).getBuckets();
+            for (int i = 0; i < buckets.size(); i++) {
+                Terms.Bucket bucket = buckets.get(i);
+                assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(),
+                    equalTo(termPercentOfSum.get(i)));
+            }
+        });
+    }
+
+    private void testCase(ValuesSourceAggregationBuilder<?> aggBuilder,
+                          Consumer<InternalAggregation> aggAssertion) throws IOException {
+        Query query = new MatchAllDocsQuery();
+        // index date data
+        try (Directory directory = newDirectory()) {
+            try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) {
+                Document document = new Document();
+                for (int i = 0; i < datasetValues.size(); i++) {
+                    if (frequently()) {
+                        indexWriter.commit();
+                    }
+                    long instant = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(datasetTimes.get(i)))
+                        .toInstant().toEpochMilli();
+                    document.add(new SortedNumericDocValuesField(DATE_FIELD, instant));
+                    document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(i)));
+                    document.add(new SortedSetDocValuesField(TERM_FIELD, new BytesRef(datasetTerms.get(i))));
+                    indexWriter.addDocument(document);
+                    document.clear();
+                }
+            }
+
+            // setup mapping
+            DateFieldMapper.DateFieldType dateFieldType = new DateFieldMapper.Builder("_name").fieldType();
+            dateFieldType.setHasDocValues(true);
+            dateFieldType.setName(DATE_FIELD);
+            MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
+            valueFieldType.setHasDocValues(true);
+            valueFieldType.setName(VALUE_FIELD);
+            MappedFieldType termFieldType = new KeywordFieldMapper.KeywordFieldType();
+            termFieldType.setName(TERM_FIELD);
+            termFieldType.setHasDocValues(true);
+
+            try (IndexReader indexReader = DirectoryReader.open(directory)) {
+                IndexSearcher indexSearcher = newSearcher(indexReader, true, true);
+                InternalAggregation internalAggregation = searchAndReduce(indexSearcher, query, aggBuilder, dateFieldType,
+                    valueFieldType, termFieldType);
+                aggAssertion.accept(internalAggregation);
+            }
+        }
+    }
+}
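
For readers tracing where the `NaN` entries in `rescaleOneHundred` above come from: the daily histogram spans 2017-01-01 through 2017-01-09, and days 2, 4 and 6 contain no documents, so their `stats.sum` resolves to `NaN` under the skip gap policy and passes through the normalizer untouched. A standalone sketch of the same arithmetic (class name invented for the example):

[source,java]
--------------------------------------------------
public class RescaleExpectations {
    public static void main(String[] args) {
        // per-day sums of value_field from the test dataset; empty days are NaN
        double[] dailySums = {2, Double.NaN, 48, Double.NaN, 5, Double.NaN, 2, 8, 43};
        double min = 2, max = 48; // NaNs are skipped when computing the statistics
        for (double sum : dailySums) {
            // NaN in, NaN out; otherwise rescale into [0, 100]
            System.out.println(Double.isNaN(sum) ? Double.NaN : 100 * (sum - min) / (max - min));
        }
        // prints 0.0, NaN, 100.0, NaN, 6.52..., NaN, 0.0, 13.04..., 89.13...
    }
}
--------------------------------------------------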
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.test.ESTestCase; + +import java.util.function.DoubleUnaryOperator; + +import static org.hamcrest.Matchers.equalTo; + +public class NormalizePipelineMethodsTests extends ESTestCase { + + private static final double[] DATA = + new double[] { 1, 50, Double.NaN, 8, 10, 4, 3, 0, 10, -10, -4}; + private static final int COUNT = 10; + private static final double MIN = -10; + private static final double MAX = 50; + private static final double SUM = 72; + private static final double MEAN = SUM / COUNT; + + public void testRescaleZeroToOne() { + NormalizePipelineMethods.RescaleZeroToOne normalizer = new NormalizePipelineMethods.RescaleZeroToOne(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { 0.18333333333333332, 1.0, Double.NaN, 0.3, 0.3333333333333333, 0.23333333333333334, + 0.21666666666666667, 0.16666666666666666, 0.3333333333333333, 0.0, 0.1 }; + assertNormalized(normalizer, normalized); + } + + public void testRescaleZeroToOneHundred() { + NormalizePipelineMethods.RescaleZeroToOneHundred normalizer = new NormalizePipelineMethods.RescaleZeroToOneHundred(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { 18.333333333333332, 100.0, Double.NaN, 30.0, 33.333333333333336, 23.333333333333332, + 21.666666666666668, 16.666666666666668, 33.333333333333336, 0.0, 10.0 }; + assertNormalized(normalizer, normalized); + } + + public void testMean() { + NormalizePipelineMethods.Mean normalizer = new NormalizePipelineMethods.Mean(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { -0.10333333333333333, 0.7133333333333333, Double.NaN, 0.01333333333333333, + 0.04666666666666666, -0.05333333333333334, -0.07, -0.12000000000000001, 0.04666666666666666, + -0.2866666666666667, -0.18666666666666665 }; + assertNormalized(normalizer, normalized); + } + + public void testZScore() { + NormalizePipelineMethods.ZScore normalizer = new NormalizePipelineMethods.ZScore(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { -0.4012461740749068, 2.7698929436138724, Double.NaN, 0.05177369988063312, + 0.18120794958221595, -0.20709479952253254, -0.27181192437332397, -0.4659632989256982, 0.18120794958221595, + -1.1131345474336123, -0.7248317983288638 }; + assertNormalized(normalizer, normalized); + } + + public void testSoftmax() { + NormalizePipelineMethods.Softmax normalizer = new NormalizePipelineMethods.Softmax(DATA); + double[] normalized = new double[] { 5.242885663363464E-22, 1.0, Double.NaN, 5.74952226429356E-19, 4.24835425529159E-18, + 1.0530617357553813E-20, 3.8739976286871875E-21, 1.928749847963918E-22, 4.24835425529159E-18, 8.756510762696521E-27, + 3.532628572200807E-24 }; + + assertNormalized(normalizer, normalized); + } + + private void assertSinglePassStatistics(NormalizePipelineMethods.SinglePassSimpleStatisticsMethod normalizer) { + assertThat(normalizer.min, equalTo(MIN)); + assertThat(normalizer.max, equalTo(MAX)); + assertThat(normalizer.count, equalTo(COUNT)); + assertThat(normalizer.sum, equalTo(SUM)); + assertThat(normalizer.mean, equalTo(MEAN)); + } + + private void assertNormalized(DoubleUnaryOperator op, double[] normalizedData) { + assertThat(normalizedData.length, equalTo(DATA.length)); + for (int i = 0; i < DATA.length; i++) { + if (Double.isNaN(DATA[i])) { + assertTrue(Double.isNaN(normalizedData[i])); + } else { + assertThat(op.applyAsDouble(DATA[i]), 
+
+    private void assertSinglePassStatistics(NormalizePipelineMethods.SinglePassSimpleStatisticsMethod normalizer) {
+        assertThat(normalizer.min, equalTo(MIN));
+        assertThat(normalizer.max, equalTo(MAX));
+        assertThat(normalizer.count, equalTo(COUNT));
+        assertThat(normalizer.sum, equalTo(SUM));
+        assertThat(normalizer.mean, equalTo(MEAN));
+    }
+
+    private void assertNormalized(DoubleUnaryOperator op, double[] normalizedData) {
+        assertThat(normalizedData.length, equalTo(DATA.length));
+        for (int i = 0; i < DATA.length; i++) {
+            if (Double.isNaN(DATA[i])) {
+                assertTrue(Double.isNaN(normalizedData[i]));
+            } else {
+                assertThat(op.applyAsDouble(DATA[i]), equalTo(normalizedData[i]));
+            }
+        }
+    }
+}
diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java
new file mode 100644
index 0000000000000..e474e8b996001
--- /dev/null
+++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.analytics.normalize;
+
+import org.elasticsearch.plugins.SearchPlugin;
+import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase;
+import org.hamcrest.CoreMatchers;
+
+import java.util.Collections;
+import java.util.List;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.hamcrest.Matchers.equalTo;
+
+public class NormalizeTests extends BasePipelineAggregationTestCase<NormalizePipelineAggregationBuilder> {
+
+    @Override
+    protected List<SearchPlugin> plugins() {
+        return singletonList(new SearchPlugin() {
+            @Override
+            public List<PipelineAggregationSpec> getPipelineAggregations() {
+                return singletonList(new PipelineAggregationSpec(
+                    NormalizePipelineAggregationBuilder.NAME,
+                    NormalizePipelineAggregationBuilder::new,
+                    NormalizePipelineAggregationBuilder.PARSER));
+            }
+        });
+    }
+
+    public void testInvalidNormalizer() {
+        NormalizePipelineAggregationBuilder builder = createTestAggregatorFactory();
+        String invalidNormalizer = randomFrom(NormalizePipelineAggregationBuilder.NAME_MAP.keySet()) + randomAlphaOfLength(10);
+        IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
+            () -> new NormalizePipelineAggregationBuilder(builder.getName(), builder.format(), invalidNormalizer,
+                org.elasticsearch.common.collect.List.of(builder.getBucketsPaths())));
+        assertThat(exception.getMessage(), equalTo("invalid method [" + invalidNormalizer + "]"));
+    }
+
+    public void testHasParentValidation() {
+        NormalizePipelineAggregationBuilder builder = createTestAggregatorFactory();
+        assertThat(validate(emptyList(), builder), CoreMatchers.equalTo(
+            "Validation Failed: 1: normalize aggregation [" + builder.getName() + "] must be declared inside" +
+                " of another aggregation;"));
+    }
+
+    @Override
+    protected NormalizePipelineAggregationBuilder createTestAggregatorFactory() {
+        String name = randomAlphaOfLengthBetween(3, 20);
+        String bucketsPath = randomAlphaOfLengthBetween(3, 20);
+        String format = null;
+        if (randomBoolean()) {
+            format = randomAlphaOfLengthBetween(1, 10);
+        }
+        String normalizer = randomFrom(NormalizePipelineAggregationBuilder.NAME_MAP.keySet());
+        return new NormalizePipelineAggregationBuilder(name, format, normalizer, Collections.singletonList(bucketsPath));
+    }
+}
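
A note on the `AnalyticsStatsAction.Item` change that follows: the item enum is written to the wire by ordinal (which is why `AnalyticsStatsActionNodeResponseTests` earlier asserts each constant's ordinal), so `NORMALIZE` must be appended after the last existing constant rather than inserted alphabetically. A minimal sketch of why order matters for ordinal-based round-trips (the enum here is a cut-down stand-in, not the real `Item`):

[source,java]
--------------------------------------------------
public class OrdinalDemo {
    // mirrors the pattern: write ordinal, read back values()[ordinal]
    enum Item { BOXPLOT, TOP_METRICS, MOVING_PERCENTILES, NORMALIZE }

    public static void main(String[] args) {
        int onTheWire = Item.MOVING_PERCENTILES.ordinal(); // 2
        // a node whose enum had a constant inserted *before* MOVING_PERCENTILES
        // would decode ordinal 2 as a different item; appending keeps old ordinals stable
        System.out.println(Item.values()[onTheWire]); // MOVING_PERCENTILES
    }
}
--------------------------------------------------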
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java
index b062dc2730f33..4ca4b3bbde6ff 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java
@@ -44,7 +44,8 @@ public enum Item {
         STRING_STATS,
         TOP_METRICS,
         T_TEST,
-        MOVING_PERCENTILES;
+        MOVING_PERCENTILES,
+        NORMALIZE;
     }
 
     public static class Request extends BaseNodesRequest<Request> implements ToXContentObject {
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml
new file mode 100644
index 0000000000000..6c7766a75cfae
--- /dev/null
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml
@@ -0,0 +1,79 @@
+setup:
+  - skip:
+      features: headers
+  - do:
+      indices.create:
+        index: foo
+        body:
+          mappings:
+            properties:
+              timestamp:
+                type: date
+              user:
+                type: keyword
+
+
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "a"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "b"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-01T05:00:00Z"
+            user: "c"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-02T05:00:00Z"
+            user: "a"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-02T05:00:00Z"
+            user: "b"
+
+          - index:
+              _index: "foo"
+          - timestamp: "2017-01-03T05:00:00Z"
+            user: "d"
+
+---
+"Basic Search":
+
+  - do:
+      search:
+        index: "foo"
+        body:
+          size: 0
+          aggs:
+            users_by_day:
+              date_histogram:
+                field: "timestamp"
+                calendar_interval: "day"
+              aggs:
+                percent_of_total_users:
+                  normalize:
+                    buckets_path: "_count"
+                    method: "percent_of_sum"
+
+  - length: { aggregations.users_by_day.buckets: 3 }
+  - match: { aggregations.users_by_day.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" }
+  - match: { aggregations.users_by_day.buckets.0.doc_count: 3 }
+  - match: { aggregations.users_by_day.buckets.0.percent_of_total_users.value: 0.5 }
+  - match: { aggregations.users_by_day.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" }
+  - match: { aggregations.users_by_day.buckets.1.doc_count: 2 }
+  - match: { aggregations.users_by_day.buckets.1.percent_of_total_users.value: 0.3333333333333333 }
+  - match: { aggregations.users_by_day.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" }
+  - match: { aggregations.users_by_day.buckets.2.doc_count: 1 }
+  - match: { aggregations.users_by_day.buckets.2.percent_of_total_users.value: 0.16666666666666666 }
diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml
index 61f2763a4c7f6..0118f8545b4cd 100644
--- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml
+++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml
@@ -27,6 +27,7 @@ setup:
   - set: {analytics.stats.t_test_usage: t_test_usage}
   - set: {analytics.stats.string_stats_usage: string_stats_usage}
   - set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
+  - set: {analytics.stats.normalize_usage: normalize_usage}
 
   # use boxplot agg
   - do:
@@ -52,6 +53,7 @@ setup:
   - match: {analytics.stats.t_test_usage: $t_test_usage}
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
+  - match: {analytics.stats.normalize_usage: $normalize_usage}
 
   # use top_metrics agg
   - do:
@@ -80,6 +82,7 @@ setup:
   - match: {analytics.stats.t_test_usage: $t_test_usage}
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
+  - match: {analytics.stats.normalize_usage: $normalize_usage}
 
   # use cumulative_cardinality agg
   - do:
@@ -112,6 +115,7 @@ setup:
   - match: {analytics.stats.t_test_usage: $t_test_usage}
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
+  - match: {analytics.stats.normalize_usage: $normalize_usage}
 
   # use t-test agg
   - do:
@@ -138,6 +142,7 @@ setup:
   - set: {analytics.stats.t_test_usage: t_test_usage}
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
+  - match: {analytics.stats.normalize_usage: $normalize_usage}
 
   - do:
       search:
@@ -160,6 +165,7 @@ setup:
   - gt: { analytics.stats.string_stats_usage: $string_stats_usage }
   - set: {analytics.stats.string_stats_usage: string_stats_usage}
   - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
+  - match: {analytics.stats.normalize_usage: $normalize_usage}
 
   # use moving_percentile agg
   - do:
@@ -193,3 +199,38 @@ setup:
   - match: {analytics.stats.string_stats_usage: $string_stats_usage}
   - gt: { analytics.stats.moving_percentiles_usage: $moving_percentiles_usage }
   - set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage}
+  - match: {analytics.stats.normalize_usage: $normalize_usage}
+
+  # use normalize agg
+  - do:
+      search:
+        index: "test"
+        body:
+          size: 0
+          aggs:
+            histo:
+              date_histogram:
+                field: "timestamp"
+                calendar_interval: "day"
+              aggs:
+                total_users:
+                  sum:
+                    field: "s"
+                percent_of_total_users:
+                  normalize:
+                    buckets_path: "total_users"
+                    method: "percent_of_sum"
+
+  - length: { aggregations.histo.buckets: 1 }
+
+  - do: {xpack.usage: {}}
+  - match: { analytics.available: true }
+  - match: { analytics.enabled: true }
+  - match: {analytics.stats.boxplot_usage: $boxplot_usage}
+  - match: {analytics.stats.top_metrics_usage: $top_metrics_usage}
+  - match: {analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage}
+  - match: {analytics.stats.t_test_usage: $t_test_usage}
+  - match: {analytics.stats.string_stats_usage: $string_stats_usage}
+  - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage}
+  - gt: { analytics.stats.normalize_usage: $normalize_usage }
+  - set: {analytics.stats.normalize_usage: normalize_usage}
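
The new docs and YAML tests only exercise `percent_of_sum`, but every method in the table plugs into the same request shape. For example, assuming the same `sales` index as the docs snippet, swapping in `rescale_0_1` would rank each month's sales between 0 and 1 (illustrative request, not part of this change):

[source,console]
--------------------------------------------------
POST /sales/_search
{
  "size": 0,
  "aggs": {
    "sales_per_month": {
      "date_histogram": { "field": "date", "calendar_interval": "month" },
      "aggs": {
        "sales": { "sum": { "field": "price" } },
        "sales_rank": {
          "normalize": {
            "buckets_path": "sales",
            "method": "rescale_0_1"
          }
        }
      }
    }
  }
}
--------------------------------------------------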