From 5427899ada2304388686d28ce8f3c1cd435dee88 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 29 Apr 2020 22:11:33 -0700 Subject: [PATCH 01/11] Add Normalize Pipeline Aggregation This aggregation will perform normalizations of metrics for a given series of data in the form of bucket values. The aggregations supports the following normalizations - rescale 0-1 - rescale 0-100 - percentage of sum - mean normalization - z-score normalization - softmax normalization To specify which normalization is to be used, it can be specified in the normalize agg's `normalizer` field. For example: ``` { "normalize": { "buckets_path": <>, "normalizer": "percent" } } ``` Closes #51005. --- docs/reference/aggregations/pipeline.asciidoc | 1 + .../pipeline/normalize-aggregation.asciidoc | 119 ++++++++++++++ docs/reference/rest-api/usage.asciidoc | 1 + .../pipeline/InternalSimpleValue.java | 2 +- .../xpack/analytics/AnalyticsPlugin.java | 12 +- .../NormalizePipelineAggregationBuilder.java | 153 ++++++++++++++++++ .../NormalizePipelineAggregator.java | 78 +++++++++ .../NormalizePipelineNormalizer.java | 140 ++++++++++++++++ ...AnalyticsStatsActionNodeResponseTests.java | 1 + .../normalize/NormalizeAggregatorTests.java | 13 ++ .../analytics/normalize/NormalizeTests.java | 52 ++++++ .../action/AnalyticsStatsAction.java | 3 +- .../test/analytics/normalize.yml | 86 ++++++++++ .../rest-api-spec/test/analytics/usage.yml | 34 ++++ 14 files changed, 690 insertions(+), 5 deletions(-) create mode 100644 docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc create mode 100644 x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java create mode 100644 x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java create mode 100644 x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizer.java create mode 100644 
x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java create mode 100644 x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml diff --git a/docs/reference/aggregations/pipeline.asciidoc b/docs/reference/aggregations/pipeline.asciidoc index edaa4c7255913..86b29c7eaaa62 100644 --- a/docs/reference/aggregations/pipeline.asciidoc +++ b/docs/reference/aggregations/pipeline.asciidoc @@ -286,3 +286,4 @@ include::pipeline/bucket-script-aggregation.asciidoc[] include::pipeline/bucket-selector-aggregation.asciidoc[] include::pipeline/bucket-sort-aggregation.asciidoc[] include::pipeline/serial-diff-aggregation.asciidoc[] +include::pipeline/normalize-aggregation.asciidoc[] diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc new file mode 100644 index 0000000000000..631a12dc2c97e --- /dev/null +++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -0,0 +1,119 @@ +[[search-aggregations-pipeline-normalize-aggregation]] +=== Normalize Aggregation + +A parent pipeline aggregation which calculates the specific normalized/rescaled value for a specific bucket value. 
+ +==== Syntax + +A `normalize` aggregation looks like this in isolation: + +[source,js] +-------------------------------------------------- +{ + "normalize": { + "buckets_path": "normalized", + "normalizer": "percent" + } +} +-------------------------------------------------- +// NOTCONSOLE + +[[normalizer_pipeline-params]] +.`normalizer_pipeline` Parameters +[options="header"] +|=== +|Parameter Name |Description |Required |Default Value +|`buckets_path` |The path to the buckets we wish to normalize (see <> for more details) |Required | +|`normalizer` | The specific rescaling to apply | Required | +|`format` |format to apply to the output value of this aggregation |Optional |`null` +|=== + +The following snippet calculates the percent of total sales for each month: + +[source,console] +-------------------------------------------------- +POST /sales/_search +{ + "size": 0, + "aggs" : { + "sales_per_month" : { + "date_histogram" : { + "field" : "date", + "calendar_interval" : "month" + }, + "aggs": { + "sales": { + "sum": { + "field": "price" + } + }, + "percent_of_total_sales": { + "normalize": { + "buckets_path": "sales", <1> + "normalizer": "percent" <2> + } + } + } + } + } +} +-------------------------------------------------- +// TEST[setup:sales] + +<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling +<2> `normalizer` sets which rescaling to apply. 
In this case, `percent` will calculate the sales value as a percent of all sales + in the parent bucket + +And the following may be the response: + +[source,console-result] +-------------------------------------------------- +{ + "took": 11, + "timed_out": false, + "_shards": ..., + "hits": ..., + "aggregations": { + "sales_per_month": { + "buckets": [ + { + "key_as_string": "2015/01/01 00:00:00", + "key": 1420070400000, + "doc_count": 3, + "sales": { + "value": 550.0 + }, + "percent_of_total_sales": { + "value": 0.5583756345177665 + } + }, + { + "key_as_string": "2015/02/01 00:00:00", + "key": 1422748800000, + "doc_count": 2, + "sales": { + "value": 60.0 + }, + "percent_of_total_sales": { + "value": 0.06091370558375635 + } + }, + { + "key_as_string": "2015/03/01 00:00:00", + "key": 1425168000000, + "doc_count": 2, + "sales": { + "value": 375.0 + }, + "percent_of_total_sales": { + "value": 0.38071065989847713 + } + } + ] + } + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"took": 11/"took": $body.took/] +// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/] +// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/] diff --git a/docs/reference/rest-api/usage.asciidoc b/docs/reference/rest-api/usage.asciidoc index f3e88a9199e58..4db1001cff4f6 100644 --- a/docs/reference/rest-api/usage.asciidoc +++ b/docs/reference/rest-api/usage.asciidoc @@ -267,6 +267,7 @@ GET /_xpack/usage "stats": { "boxplot_usage" : 0, "top_metrics_usage" : 0, + "normalize_usage" : 0, "cumulative_cardinality_usage" : 0, "t_test_usage" : 0, "string_stats_usage" : 0 diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java index 71bf9450f03e5..2edfea344476f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/InternalSimpleValue.java @@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl public static final String NAME = "simple_value"; protected final double value; - InternalSimpleValue(String name, double value, DocValueFormat formatter, Map metadata) { + public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map metadata) { super(name, metadata); this.format = formatter; this.value = value; diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java index 3a87bb86173f7..40e58ca610049 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/AnalyticsPlugin.java @@ -32,6 +32,7 @@ import org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction; import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction; import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory; +import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder; import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder; import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot; import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder; @@ -71,13 +72,18 @@ public AnalyticsPlugin() { } @Override public List getPipelineAggregations() { - return singletonList( + return Arrays.asList( new PipelineAggregationSpec( CumulativeCardinalityPipelineAggregationBuilder.NAME, CumulativeCardinalityPipelineAggregationBuilder::new, usage.track(AnalyticsStatsAction.Item.CUMULATIVE_CARDINALITY, - checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER))) 
- ); + checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER))), + new PipelineAggregationSpec( + NormalizePipelineAggregationBuilder.NAME, + NormalizePipelineAggregationBuilder::new, + usage.track(AnalyticsStatsAction.Item.NORMALIZE, + checkLicense(NormalizePipelineAggregationBuilder.PARSER)) + )); } @Override diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java new file mode 100644 index 0000000000000..561c3bf0ffa3d --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Mean; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Percent; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.RescaleZeroToOne; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.RescaleZeroToOneHundred; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Softmax; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.ZScore; + +public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { + public static final String NAME = "normalize"; + static final ParseField NORMALIZER_FIELD = new ParseField("normalizer"); + + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new 
ConstructingObjectParser<>( + NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0], + (String) args[1], (List) args[2])); + + static { + PARSER.declareString(optionalConstructorArg(), FORMAT); + PARSER.declareString(constructorArg(), NORMALIZER_FIELD); + PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); + } + + static final Map, NormalizePipelineNormalizer>> NAME_MAP = Map.of( + RescaleZeroToOne.NAME, RescaleZeroToOne::new, + RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new, + Mean.NAME, Mean::new, + ZScore.NAME, ZScore::new, + Percent.NAME, Percent::new, + Softmax.NAME, Softmax::new + ); + + static String validateNormalizerName(String name) { + if (NAME_MAP.containsKey(name)) { + return name; + } + + throw new IllegalArgumentException("invalid normalizer [" + name + "]"); + } + + private final String format; + private final String normalizer; + + + NormalizePipelineAggregationBuilder(String name, String format, String normalizer, List bucketsPath) { + super(name, NAME, bucketsPath.toArray(new String[0])); + this.format = format; + this.normalizer = validateNormalizerName(normalizer); + } + + NormalizePipelineAggregationBuilder(String name, String format, String normalizer, String bucketsPath) { + super(name, NAME, new String[] { bucketsPath }); + this.format = format; + this.normalizer = validateNormalizerName(normalizer); + } + + /** + * Read from a stream. + */ + public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException { + super(in, NAME); + format = in.readOptionalString(); + normalizer = in.readString(); + } + + @Override + protected final void doWriteTo(StreamOutput out) throws IOException { + out.writeOptionalString(format); + out.writeString(normalizer); + } + + /** + * Gets the format to use on the output of this aggregation. 
+ */ + public String format() { + return format; + } + + protected DocValueFormat formatter() { + if (format != null) { + return new DocValueFormat.Decimal(format); + } else { + return DocValueFormat.RAW; + } + } + + @Override + protected PipelineAggregator createInternal(Map metadata) { + return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(normalizer), metadata); + } + + @Override + protected void validate(ValidationContext context) { + if (bucketsPaths.length != 1) { + context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]"); + } + context.validateParentAggSequentiallyOrdered(NAME, name); + } + + @Override + protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException { + if (format != null) { + builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format); + } + builder.field(NORMALIZER_FIELD.getPreferredName(), normalizer); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), format); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + if (super.equals(obj) == false) return false; + NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj; + return Objects.equals(format, other.format); + } + + @Override + public String getWriteableName() { + return NAME; + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java new file mode 100644 index 0000000000000..bb38bf0b24370 --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; +import org.elasticsearch.search.aggregations.InternalAggregations; +import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; +import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; +import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; +import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; + +public class NormalizePipelineAggregator extends PipelineAggregator { + private final DocValueFormat formatter; + private final Function, NormalizePipelineNormalizer> normalizerSupplier; + + NormalizePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, + Function, NormalizePipelineNormalizer> normalizerSupplier, + Map metadata) { + super(name, bucketsPaths, metadata); + this.formatter = formatter; + this.normalizerSupplier = normalizerSupplier; + } + + @Override + public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { + InternalMultiBucketAggregation + histo = (InternalMultiBucketAggregation) aggregation; + List buckets 
= histo.getBuckets(); + HistogramFactory factory = (HistogramFactory) histo; + List newBuckets = new ArrayList<>(buckets.size()); + + List values = buckets.stream().map(bucket -> resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.SKIP)) + .collect(Collectors.toList()); + + NormalizePipelineNormalizer normalizer = normalizerSupplier.apply(values); + + for (int i = 0; i < buckets.size(); i++) { + InternalMultiBucketAggregation.InternalBucket bucket = buckets.get(i); + Double thisBucketValue = values.get(i); + + final double normalizedBucketValue; + + // Only account for finite values + if (thisBucketValue.isNaN()) { + normalizedBucketValue = Double.NaN; + } else { + normalizedBucketValue = normalizer.normalize(thisBucketValue); + } + + List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) + .map((p) -> (InternalAggregation) p) + .collect(Collectors.toList()); + aggs.add(new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata())); + Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs)); + newBuckets.add(newBucket); + } + + return factory.createAggregation(newBuckets); + } +} diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizer.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizer.java new file mode 100644 index 0000000000000..1fd05c589243c --- /dev/null +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizer.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + + +import java.util.List; + +abstract class NormalizePipelineNormalizer { + + static class RescaleZeroToOne extends SinglePassSimpleStatisticsNormalizer { + static final String NAME = "rescale_0_1"; + + RescaleZeroToOne(List values) { + super(values); + } + + @Override + double normalize(double value) { + return (value - min) / (max - min); + } + } + + static class RescaleZeroToOneHundred extends SinglePassSimpleStatisticsNormalizer { + static final String NAME = "rescale_0_100"; + + RescaleZeroToOneHundred(List values) { + super(values); + } + + @Override + double normalize(double value) { + return 100 * (value - min) / (max - min); + } + } + + static class Mean extends SinglePassSimpleStatisticsNormalizer { + static final String NAME = "mean"; + + Mean(List values) { + super(values); + } + + @Override + double normalize(double value) { + return (value - mean) / (max - min); + } + } + + static class Percent extends SinglePassSimpleStatisticsNormalizer { + static final String NAME = "percent"; + + Percent(List values) { + super(values); + } + + @Override + double normalize(double value) { + return value / sum; + } + } + + static class ZScore extends SinglePassSimpleStatisticsNormalizer { + static final String NAME = "z-score"; + + private final double stdev; + + ZScore(List values) { + super(values); + double variance = 0.0; + for (Double value : values) { + if (value.isNaN() == false) { + variance += Math.pow(value - mean, 2); + } + } + this.stdev = Math.sqrt(variance / count); + } + + @Override + double normalize(double value) { + return (value - mean) / stdev; + } + } + + static class Softmax extends NormalizePipelineNormalizer { + static final String NAME = "softmax"; + + private double sumExp; + + Softmax(List values) { + double sumExp = 0.0; + for (Double value : values) { + if (value.isNaN() == false) { + sumExp += Math.exp(value); + } + } + + this.sumExp = sumExp; + } + + @Override + double 
normalize(double value) { + return Math.exp(value) / sumExp; + } + } + + abstract double normalize(double value); + + abstract static class SinglePassSimpleStatisticsNormalizer extends NormalizePipelineNormalizer { + protected final double max; + protected final double min; + protected final double sum; + protected final double mean; + protected final int count; + + SinglePassSimpleStatisticsNormalizer(List values) { + int count = 0; + double sum = 0.0; + double min = Double.MAX_VALUE; + double max = Double.MIN_VALUE; + for (Double value : values) { + if (value.isNaN() == false) { + count += 1; + min = Math.min(value, min); + max = Math.max(value, max); + sum += value; + } + } + + this.count = count; + this.min = min; + this.max = max; + this.sum = sum; + this.mean = this.count == 0 ? Double.NaN : this.sum / this.count; + } + } +} diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java index d1c03e373a67d..eca2417bb1b90 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/action/AnalyticsStatsActionNodeResponseTests.java @@ -43,6 +43,7 @@ public void testItemEnum() { assertThat(AnalyticsStatsAction.Item.STRING_STATS.ordinal(), equalTo(i++)); assertThat(AnalyticsStatsAction.Item.TOP_METRICS.ordinal(), equalTo(i++)); assertThat(AnalyticsStatsAction.Item.T_TEST.ordinal(), equalTo(i++)); + assertThat(AnalyticsStatsAction.Item.NORMALIZE.ordinal(), equalTo(i++)); // Please add tests for newly added items here assertThat(AnalyticsStatsAction.Item.values().length, equalTo(i)); } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java 
b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java new file mode 100644 index 0000000000000..d6ec78da3412f --- /dev/null +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.search.aggregations.AggregatorTestCase; + +public class NormalizeAggregatorTests extends AggregatorTestCase { + +} diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java new file mode 100644 index 0000000000000..cd0d4a9cb488b --- /dev/null +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.plugins.SearchPlugin; +import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase; + +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.hamcrest.Matchers.equalTo; + +public class NormalizeTests extends BasePipelineAggregationTestCase { + + @Override + protected List plugins() { + return singletonList(new SearchPlugin() { + @Override + public List getPipelineAggregations() { + return singletonList(new PipelineAggregationSpec( + NormalizePipelineAggregationBuilder.NAME, + NormalizePipelineAggregationBuilder::new, + NormalizePipelineAggregationBuilder.PARSER)); + } + }); + } + + public void testInvalidNormalizer() { + NormalizePipelineAggregationBuilder builder = createTestAggregatorFactory(); + String invalidNormalizer = randomFrom(NormalizePipelineAggregationBuilder.NAME_MAP.keySet()) + randomAlphaOfLength(10); + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, + () -> new NormalizePipelineAggregationBuilder(builder.getName(), builder.format(), invalidNormalizer, + List.of(builder.getBucketsPaths()))); + assertThat(exception.getMessage(), equalTo("invalid normalizer [" + invalidNormalizer + "]")); + } + + @Override + protected NormalizePipelineAggregationBuilder createTestAggregatorFactory() { + String name = randomAlphaOfLengthBetween(3, 20); + String bucketsPath = randomAlphaOfLengthBetween(3, 20); + String format = null; + if (randomBoolean()) { + format = randomAlphaOfLengthBetween(1, 10); + } + String normalizer = randomFrom(NormalizePipelineAggregationBuilder.NAME_MAP.keySet()); + return new NormalizePipelineAggregationBuilder(name, format, normalizer, bucketsPath); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java index f0822e17d04cf..305636e7ddb53 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/analytics/action/AnalyticsStatsAction.java @@ -43,7 +43,8 @@ public enum Item { CUMULATIVE_CARDINALITY, STRING_STATS, TOP_METRICS, - T_TEST; + T_TEST, + NORMALIZE; } public static class Request extends BaseNodesRequest implements ToXContentObject { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml new file mode 100644 index 0000000000000..fc4a54e4011a0 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml @@ -0,0 +1,86 @@ +setup: + - skip: + features: headers + - do: + indices.create: + index: foo + body: + mappings: + properties: + timestamp: + type: date + user: + type: keyword + + + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. 
the test setup superuser + bulk: + refresh: true + body: + - index: + _index: "foo" + - timestamp: "2017-01-01T05:00:00Z" + user: "a" + + - index: + _index: "foo" + - timestamp: "2017-01-01T05:00:00Z" + user: "b" + + - index: + _index: "foo" + - timestamp: "2017-01-01T05:00:00Z" + user: "c" + + - index: + _index: "foo" + - timestamp: "2017-01-02T05:00:00Z" + user: "a" + + - index: + _index: "foo" + - timestamp: "2017-01-02T05:00:00Z" + user: "b" + + - index: + _index: "foo" + - timestamp: "2017-01-03T05:00:00Z" + user: "d" + +--- +"Basic Search": + + - do: + search: + index: "foo" + body: + size: 0 + aggs: + histo: + date_histogram: + field: "timestamp" + calendar_interval: "day" + aggs: + sum_users: + sum: + field: "user" + total_users: + cumulative_cardinality: + buckets_path: "sum_users" + + - length: { aggregations.histo.buckets: 3 } + - match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" } + - match: { aggregations.histo.buckets.0.doc_count: 3 } + - match: { aggregations.histo.buckets.0.distinct_users.value: 3 } + - match: { aggregations.histo.buckets.0.total_users.value: 3 } + - match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" } + - match: { aggregations.histo.buckets.1.doc_count: 2 } + - match: { aggregations.histo.buckets.1.distinct_users.value: 2 } + - match: { aggregations.histo.buckets.1.total_users.value: 3 } + - match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" } + - match: { aggregations.histo.buckets.2.doc_count: 1 } + - match: { aggregations.histo.buckets.2.distinct_users.value: 1 } + - match: { aggregations.histo.buckets.2.total_users.value: 4 } + diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml index 64b316fbc716b..f161a3e8965bb 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml +++ 
b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml @@ -26,6 +26,7 @@ setup: - set: {analytics.stats.cumulative_cardinality_usage: cumulative_cardinality_usage} - set: {analytics.stats.t_test_usage: t_test_usage} - set: {analytics.stats.string_stats_usage: string_stats_usage} + - set: {analytics.stats.normalize_usage: normalize_usage} # use boxplot agg - do: @@ -156,3 +157,36 @@ setup: - match: {analytics.stats.t_test_usage: $t_test_usage} - gt: { analytics.stats.string_stats_usage: $string_stats_usage } - set: {analytics.stats.string_stats_usage: string_stats_usage} + + + # use normalize agg + - do: + search: + index: "test" + body: + size: 0 + aggs: + histo: + date_histogram: + field: "timestamp" + calendar_interval: "day" + aggs: + distinct_s: + sum: + field: "s" + total_users: + normalize: + buckets_path: "percentage_s" + + - length: { aggregations.histo.buckets: 1 } + + - do: {xpack.usage: {}} + - match: { analytics.available: true } + - match: { analytics.enabled: true } + - match: {analytics.stats.boxplot_usage: $boxplot_usage} + - match: {analytics.stats.top_metrics_usage: $top_metrics_usage} + - match: {analytics.stats.cumulative_cardinality_usage: $cumulative_cardinality_usage} + - match: {analytics.stats.t_test_usage: $t_test_usage} + - match: {analytics.stats.string_stats_usage: $string_stats_usage} + - gt: { analytics.stats.normalize_usage: $normalize_usage } + - set: {analytics.stats.normalize_usage: normalize_usage} From 9782c2a7c583d075733799eedcae5e5c1faac9f4 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Fri, 8 May 2020 19:39:15 -0700 Subject: [PATCH 02/11] moar --- .../pipeline/normalize-aggregation.asciidoc | 8 ++-- .../xcontent/InstantiatingObjectParser.java | 27 ++++++++++- .../NormalizePipelineAggregationBuilder.java | 44 ++++++++---------- .../NormalizePipelineAggregator.java | 16 +++---- ...java => NormalizePipelineNormalizers.java} | 46 ++++++++++--------- .../analytics/normalize/NormalizeTests.java | 3 +- 
.../test/analytics/normalize.yml | 18 +++----- .../rest-api-spec/test/analytics/usage.yml | 7 +-- 8 files changed, 94 insertions(+), 75 deletions(-) rename x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/{NormalizePipelineNormalizer.java => NormalizePipelineNormalizers.java} (73%) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index 631a12dc2c97e..3432eed3cf833 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -1,3 +1,5 @@ +[role="xpack"] +[testenv="basic"] [[search-aggregations-pipeline-normalize-aggregation]] === Normalize Aggregation @@ -12,7 +14,7 @@ A `normalize` aggregation looks like this in isolation: { "normalize": { "buckets_path": "normalized", - "normalizer": "percent" + "normalizer": "percent_of_sum" } } -------------------------------------------------- @@ -50,7 +52,7 @@ POST /sales/_search "percent_of_total_sales": { "normalize": { "buckets_path": "sales", <1> - "normalizer": "percent" <2> + "normalizer": "percent_of_sum" <2> } } } @@ -61,7 +63,7 @@ POST /sales/_search // TEST[setup:sales] <1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling -<2> `normalizer` sets which rescaling to apply. In this case, `percent` will calculate the sales value as a percent of all sales +<2> `normalizer` sets which rescaling to apply. 
In this case, `percent_of_sum` will calculate the sales value as a percent of all sales in the parent bucket And the following may be the response: diff --git a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java index 3cb1804ef20d9..94593f47ad79d 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java +++ b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java @@ -20,9 +20,12 @@ package org.elasticsearch.common.xcontent; import org.elasticsearch.common.ParseField; +import org.yaml.snakeyaml.util.ArrayUtils; import java.io.IOException; import java.lang.reflect.Constructor; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -64,6 +67,11 @@ public class InstantiatingObjectParser implements BiFunction, ContextParser { + public static Builder builder(String name, boolean ignoreUnknownFields, + Class valueClass, Object... 
constructorArgs) { + return new Builder<>(name, ignoreUnknownFields, valueClass, constructorArgs); + } + public static Builder builder(String name, boolean ignoreUnknownFields, Class valueClass) { return new Builder<>(name, ignoreUnknownFields, valueClass); } @@ -80,6 +88,8 @@ public static class Builder extends AbstractObjectParser constructor; + private final Object[] constructorArgs; + public Builder(String name, Class valueClass) { this(name, false, valueClass); } @@ -87,12 +97,19 @@ public Builder(String name, Class valueClass) { public Builder(String name, boolean ignoreUnknownFields, Class valueClass) { this.constructingObjectParser = new ConstructingObjectParser<>(name, ignoreUnknownFields, this::build); this.valueClass = valueClass; + this.constructorArgs = new Object[] {}; + } + + public Builder(String name, boolean ignoreUnknownFields, Class valueClass, Object... constructorArgs) { + this.constructingObjectParser = new ConstructingObjectParser<>(name, ignoreUnknownFields, this::build); + this.valueClass = valueClass; + this.constructorArgs = constructorArgs; } @SuppressWarnings("unchecked") public InstantiatingObjectParser build() { Constructor constructor = null; - int neededArguments = constructingObjectParser.getNumberOfFields(); + int neededArguments = constructingObjectParser.getNumberOfFields() + constructorArgs.length; // Try to find an annotated constructor for (Constructor c : valueClass.getConstructors()) { if (c.getAnnotation(ParserConstructor.class) != null) { @@ -172,8 +189,14 @@ private Value build(Object[] args) { throw new IllegalArgumentException("InstantiatingObjectParser for type " + valueClass.getName() + " has to be finalized " + "before the first use"); } + Object[] allArgs = args; + if (constructorArgs.length > 0) { + allArgs = new Object[args.length + constructorArgs.length]; + System.arraycopy(constructorArgs, 0, allArgs, 0, constructorArgs.length); + System.arraycopy(args, 0, allArgs, constructorArgs.length, args.length); + } 
try { - return constructor.newInstance(args); + return constructor.newInstance(allArgs); } catch (Exception ex) { throw new IllegalArgumentException("Cannot instantiate an object of " + valueClass.getName(), ex); } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java index 561c3bf0ffa3d..7ed2a6f3b968f 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java @@ -9,7 +9,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.InstantiatingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; @@ -20,34 +20,35 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.DoubleUnaryOperator; import java.util.function.Function; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Mean; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Percent; -import static 
org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.RescaleZeroToOne; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.RescaleZeroToOneHundred; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Softmax; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.ZScore; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Mean; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Percent; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.RescaleZeroToOne; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.RescaleZeroToOneHundred; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Softmax; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.ZScore; public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { public static final String NAME = "normalize"; + public static final InstantiatingObjectParser PARSER; static final ParseField NORMALIZER_FIELD = new ParseField("normalizer"); - @SuppressWarnings("unchecked") - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( - NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0], - (String) args[1], (List) args[2])); static { - PARSER.declareString(optionalConstructorArg(), FORMAT); - PARSER.declareString(constructorArg(), NORMALIZER_FIELD); - PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); + InstantiatingObjectParser.Builder parser = InstantiatingObjectParser.builder( + NAME, false, NormalizePipelineAggregationBuilder.class, NAME); + parser.declareString(optionalConstructorArg(), FORMAT); + parser.declareString(constructorArg(), NORMALIZER_FIELD); + 
parser.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); + PARSER = parser.build(); } - static final Map, NormalizePipelineNormalizer>> NAME_MAP = Map.of( + static final Map> NAME_MAP = Map.of( RescaleZeroToOne.NAME, RescaleZeroToOne::new, RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new, Mean.NAME, Mean::new, @@ -68,18 +69,12 @@ static String validateNormalizerName(String name) { private final String normalizer; - NormalizePipelineAggregationBuilder(String name, String format, String normalizer, List bucketsPath) { + public NormalizePipelineAggregationBuilder(String name, String format, String normalizer, List bucketsPath) { super(name, NAME, bucketsPath.toArray(new String[0])); this.format = format; this.normalizer = validateNormalizerName(normalizer); } - NormalizePipelineAggregationBuilder(String name, String format, String normalizer, String bucketsPath) { - super(name, NAME, new String[] { bucketsPath }); - this.format = format; - this.normalizer = validateNormalizerName(normalizer); - } - /** * Read from a stream. 
*/ @@ -120,7 +115,6 @@ protected void validate(ValidationContext context) { if (bucketsPaths.length != 1) { context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]"); } - context.validateParentAggSequentiallyOrdered(NAME, name); } @Override @@ -134,7 +128,7 @@ protected final XContentBuilder internalXContent(XContentBuilder builder, Params @Override public int hashCode() { - return Objects.hash(super.hashCode(), format); + return Objects.hash(super.hashCode(), format, normalizer); } @Override @@ -143,7 +137,7 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) return false; if (super.equals(obj) == false) return false; NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj; - return Objects.equals(format, other.format); + return Objects.equals(format, other.format) && Objects.equals(normalizer, other.normalizer); } @Override diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java index bb38bf0b24370..fd42c4cc74792 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.DoubleUnaryOperator; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -28,10 +29,10 @@ public class NormalizePipelineAggregator extends PipelineAggregator { private final DocValueFormat formatter; - private final Function, NormalizePipelineNormalizer> normalizerSupplier; + private final Function normalizerSupplier; 
NormalizePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, - Function, NormalizePipelineNormalizer> normalizerSupplier, + Function normalizerSupplier, Map metadata) { super(name, bucketsPaths, metadata); this.formatter = formatter; @@ -47,22 +48,21 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext HistogramFactory factory = (HistogramFactory) histo; List newBuckets = new ArrayList<>(buckets.size()); - List values = buckets.stream().map(bucket -> resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.SKIP)) - .collect(Collectors.toList()); + double[] values = buckets.stream() + .mapToDouble(bucket -> resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.SKIP)).toArray(); - NormalizePipelineNormalizer normalizer = normalizerSupplier.apply(values); + DoubleUnaryOperator normalizer = normalizerSupplier.apply(values); for (int i = 0; i < buckets.size(); i++) { InternalMultiBucketAggregation.InternalBucket bucket = buckets.get(i); - Double thisBucketValue = values.get(i); final double normalizedBucketValue; // Only account for finite values - if (thisBucketValue.isNaN()) { + if (Double.isNaN(values[i])) { normalizedBucketValue = Double.NaN; } else { - normalizedBucketValue = normalizer.normalize(thisBucketValue); + normalizedBucketValue = normalizer.applyAsDouble(values[i]); } List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizer.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java similarity index 73% rename from x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizer.java rename to x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java index 
1fd05c589243c..2282c126425a2 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizer.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java @@ -7,19 +7,22 @@ package org.elasticsearch.xpack.analytics.normalize; -import java.util.List; +import java.util.function.DoubleUnaryOperator; -abstract class NormalizePipelineNormalizer { +class NormalizePipelineNormalizers { + + // never to be instantiated + private NormalizePipelineNormalizers() {} static class RescaleZeroToOne extends SinglePassSimpleStatisticsNormalizer { static final String NAME = "rescale_0_1"; - RescaleZeroToOne(List values) { + RescaleZeroToOne(double[] values) { super(values); } @Override - double normalize(double value) { + public double applyAsDouble(double value) { return (value - min) / (max - min); } } @@ -27,12 +30,12 @@ static class RescaleZeroToOne extends SinglePassSimpleStatisticsNormalizer { static class RescaleZeroToOneHundred extends SinglePassSimpleStatisticsNormalizer { static final String NAME = "rescale_0_100"; - RescaleZeroToOneHundred(List values) { + RescaleZeroToOneHundred(double[] values) { super(values); } @Override - double normalize(double value) { + public double applyAsDouble(double value) { return 100 * (value - min) / (max - min); } } @@ -40,25 +43,25 @@ static class RescaleZeroToOneHundred extends SinglePassSimpleStatisticsNormalize static class Mean extends SinglePassSimpleStatisticsNormalizer { static final String NAME = "mean"; - Mean(List values) { + Mean(double[] values) { super(values); } @Override - double normalize(double value) { + public double applyAsDouble(double value) { return (value - mean) / (max - min); } } static class Percent extends SinglePassSimpleStatisticsNormalizer { - static final String NAME = "percent"; + static final String NAME = "percent_of_sum"; - Percent(List values) { + Percent(double[] values) { 
super(values); } @Override - double normalize(double value) { + public double applyAsDouble(double value) { return value / sum; } } @@ -68,7 +71,7 @@ static class ZScore extends SinglePassSimpleStatisticsNormalizer { private final double stdev; - ZScore(List values) { + ZScore(double[] values) { super(values); double variance = 0.0; for (Double value : values) { @@ -80,17 +83,17 @@ static class ZScore extends SinglePassSimpleStatisticsNormalizer { } @Override - double normalize(double value) { + public double applyAsDouble(double value) { return (value - mean) / stdev; } } - static class Softmax extends NormalizePipelineNormalizer { + static class Softmax implements DoubleUnaryOperator { static final String NAME = "softmax"; private double sumExp; - Softmax(List values) { + Softmax(double[] values) { double sumExp = 0.0; for (Double value : values) { if (value.isNaN() == false) { @@ -102,27 +105,26 @@ static class Softmax extends NormalizePipelineNormalizer { } @Override - double normalize(double value) { + public double applyAsDouble(double value) { return Math.exp(value) / sumExp; } } - abstract double normalize(double value); - - abstract static class SinglePassSimpleStatisticsNormalizer extends NormalizePipelineNormalizer { + abstract static class SinglePassSimpleStatisticsNormalizer implements DoubleUnaryOperator { protected final double max; protected final double min; protected final double sum; protected final double mean; protected final int count; - SinglePassSimpleStatisticsNormalizer(List values) { + SinglePassSimpleStatisticsNormalizer(double[] values) { int count = 0; double sum = 0.0; double min = Double.MAX_VALUE; double max = Double.MIN_VALUE; - for (Double value : values) { - if (value.isNaN() == false) { + + for (double value : values) { + if (Double.isNaN(value)) { count += 1; min = Math.min(value, min); max = Math.max(value, max); diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java 
b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java index cd0d4a9cb488b..4283860fe2b96 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeTests.java @@ -9,6 +9,7 @@ import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.search.aggregations.BasePipelineAggregationTestCase; +import java.util.Collections; import java.util.List; import static java.util.Collections.singletonList; @@ -47,6 +48,6 @@ protected NormalizePipelineAggregationBuilder createTestAggregatorFactory() { format = randomAlphaOfLengthBetween(1, 10); } String normalizer = randomFrom(NormalizePipelineAggregationBuilder.NAME_MAP.keySet()); - return new NormalizePipelineAggregationBuilder(name, format, normalizer, bucketsPath); + return new NormalizePipelineAggregationBuilder(name, format, normalizer, Collections.singletonList(bucketsPath)); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml index fc4a54e4011a0..e689c2a343ef5 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml @@ -14,8 +14,6 @@ setup: - do: - headers: - Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. 
the test setup superuser bulk: refresh: true body: @@ -63,24 +61,22 @@ setup: field: "timestamp" calendar_interval: "day" aggs: - sum_users: - sum: - field: "user" - total_users: - cumulative_cardinality: - buckets_path: "sum_users" + percent_of_total_users: + normalize: + buckets_path: "_count" + normalizer: "percent_of_sum" - length: { aggregations.histo.buckets: 3 } - match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" } - match: { aggregations.histo.buckets.0.doc_count: 3 } - - match: { aggregations.histo.buckets.0.distinct_users.value: 3 } + - match: { aggregations.histo.buckets.0.percent_of_total_users.value: 3 } - match: { aggregations.histo.buckets.0.total_users.value: 3 } - match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" } - match: { aggregations.histo.buckets.1.doc_count: 2 } - - match: { aggregations.histo.buckets.1.distinct_users.value: 2 } + - match: { aggregations.histo.buckets.1.percent_of_total_users.value: 2 } - match: { aggregations.histo.buckets.1.total_users.value: 3 } - match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" } - match: { aggregations.histo.buckets.2.doc_count: 1 } - - match: { aggregations.histo.buckets.2.distinct_users.value: 1 } + - match: { aggregations.histo.buckets.2.percent_of_total_users.value: 1 } - match: { aggregations.histo.buckets.2.total_users.value: 4 } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml index f161a3e8965bb..0dd10eda6d680 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml @@ -171,12 +171,13 @@ setup: field: "timestamp" calendar_interval: "day" aggs: - distinct_s: + total_users: sum: field: "s" - total_users: + percent_of_total_users: normalize: - buckets_path: "percentage_s" + buckets_path: 
"total_users" + normalizer: "percent_of_sum" - length: { aggregations.histo.buckets: 1 } From 2dae977ba99d64e0617c43394fbc82005c0cd686 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 11 May 2020 17:05:23 -0700 Subject: [PATCH 03/11] respond to rev --- .../NormalizePipelineAggregationBuilder.java | 16 +-- .../NormalizePipelineAggregator.java | 3 +- .../NormalizePipelineNormalizers.java | 2 +- .../normalize/NormalizeAggregatorTests.java | 117 ++++++++++++++++++ .../NormalizePipelineNormalizersTests.java | 86 +++++++++++++ .../test/analytics/normalize.yml | 25 ++-- 6 files changed, 225 insertions(+), 24 deletions(-) create mode 100644 x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizersTests.java diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java index 7ed2a6f3b968f..fd956e504d035 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java @@ -9,7 +9,7 @@ import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.InstantiatingObjectParser; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder; @@ -35,17 +35,17 @@ public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { public static final String NAME = "normalize"; - public 
static final InstantiatingObjectParser PARSER; static final ParseField NORMALIZER_FIELD = new ParseField("normalizer"); + @SuppressWarnings("unchecked") + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0], + (String) args[1], (List) args[2])); static { - InstantiatingObjectParser.Builder parser = InstantiatingObjectParser.builder( - NAME, false, NormalizePipelineAggregationBuilder.class, NAME); - parser.declareString(optionalConstructorArg(), FORMAT); - parser.declareString(constructorArg(), NORMALIZER_FIELD); - parser.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); - PARSER = parser.build(); + PARSER.declareString(optionalConstructorArg(), FORMAT); + PARSER.declareString(constructorArg(), NORMALIZER_FIELD); + PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); } static final Map> NAME_MAP = Map.of( diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java index fd42c4cc74792..763b11679c528 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java @@ -58,7 +58,8 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext final double normalizedBucketValue; - // Only account for finite values + // Only account for valid values. Infinite-valued buckets were converted to NaNs by + // the time they reach here. 
if (Double.isNaN(values[i])) { normalizedBucketValue = Double.NaN; } else { diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java index 2282c126425a2..9593d812e81a1 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java @@ -124,7 +124,7 @@ abstract static class SinglePassSimpleStatisticsNormalizer implements DoubleUnar double max = Double.MIN_VALUE; for (double value : values) { - if (Double.isNaN(value)) { + if (Double.isNaN(value) == false) { count += 1; min = Math.min(value, min); max = Math.max(value, max); diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java index d6ec78da3412f..5def7ee01b8c2 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java @@ -6,8 +6,125 @@ package org.elasticsearch.xpack.analytics.normalize; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.RandomIndexWriter; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.store.Directory; +import 
org.elasticsearch.common.time.DateFormatters; +import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregatorTestCase; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; +import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; +import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; +import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; public class NormalizeAggregatorTests extends AggregatorTestCase { + private static final String DATE_FIELD = "date"; + private static final String VALUE_FIELD = "value_field"; + + private static final List datasetTimes = Arrays.asList( + "2017-01-01T01:07:45", //1 + "2017-01-01T03:43:34", //1 + "2017-01-03T04:11:00", //3 + "2017-01-03T05:11:31", //3 + "2017-01-05T08:24:05", //5 + "2017-01-05T13:09:32", //5 + "2017-01-07T13:47:43", //7 + "2017-01-08T16:14:34", //8 + "2017-01-09T17:09:50", //9 + "2017-01-09T22:55:46");//9 + + private static final List datasetValues = Arrays.asList(1,1,42,6,5,0,2,8,30,13); + private static final List percentOfSum = Arrays.asList(0.2,0.0,0.2,0.0,0.2,0.0,0.1,0.1,0.2); + private static final List rescaleOneHundred = Arrays.asList(0.0,Double.NaN,100.0,Double.NaN,6.521739130434782, + Double.NaN,0.0,13.043478260869565,89.1304347826087); + + public void testPercentOfTotalDocCount() throws IOException { + DateHistogramAggregationBuilder aggBuilder = 
new DateHistogramAggregationBuilder("histo"); + aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD); + aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "percent_of_sum", + List.of("_count"))); + + testCase(aggBuilder, (agg) -> { + assertEquals(9, ((Histogram) agg).getBuckets().size()); + List buckets = ((Histogram) agg).getBuckets(); + for (int i = 0; i < buckets.size(); i++) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(), + equalTo(percentOfSum.get(i))); + } + }); + } + + public void testValueMean() throws IOException { + DateHistogramAggregationBuilder aggBuilder = new DateHistogramAggregationBuilder("histo"); + aggBuilder.calendarInterval(DateHistogramInterval.DAY).field(DATE_FIELD); + aggBuilder.subAggregation(new StatsAggregationBuilder("stats").field(VALUE_FIELD)); + aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "rescale_0_100", + List.of("stats.sum"))); + + testCase(aggBuilder, (agg) -> { + assertEquals(9, ((Histogram) agg).getBuckets().size()); + List buckets = ((Histogram) agg).getBuckets(); + for (int i = 0; i < buckets.size(); i++) { + Histogram.Bucket bucket = buckets.get(i); + assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(), + equalTo(rescaleOneHundred.get(i))); + } + }); + } + + private void testCase(ValuesSourceAggregationBuilder aggBuilder, Consumer aggAssertion) throws IOException { + Query query = new MatchAllDocsQuery(); + // index date data + try (Directory directory = newDirectory()) { + try (RandomIndexWriter indexWriter = new RandomIndexWriter(random(), directory)) { + Document document = new Document(); + for (int i = 0; i < datasetValues.size(); i++) { + if (frequently()) { + indexWriter.commit(); + } + long instant = DateFormatters.from(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parse(datasetTimes.get(i))) + 
.toInstant().toEpochMilli(); + document.add(new SortedNumericDocValuesField(DATE_FIELD, instant)); + document.add(new NumericDocValuesField(VALUE_FIELD, datasetValues.get(i))); + indexWriter.addDocument(document); + document.clear(); + } + } + + // setup mapping + DateFieldMapper.DateFieldType dateFieldType = new DateFieldMapper.Builder("_name").fieldType(); + dateFieldType.setHasDocValues(true); + dateFieldType.setName(DATE_FIELD); + MappedFieldType valueFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG); + valueFieldType.setHasDocValues(true); + valueFieldType.setName(VALUE_FIELD); + + try (IndexReader indexReader = DirectoryReader.open(directory)) { + IndexSearcher indexSearcher = newSearcher(indexReader, true, true); + InternalAggregation internalAggregation = searchAndReduce(indexSearcher, query, aggBuilder, dateFieldType, valueFieldType); + aggAssertion.accept(internalAggregation); + } + } + } } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizersTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizersTests.java new file mode 100644 index 0000000000000..bd2e2290da143 --- /dev/null +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizersTests.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +package org.elasticsearch.xpack.analytics.normalize; + +import org.elasticsearch.test.ESTestCase; + +import java.util.function.DoubleUnaryOperator; + +import static org.hamcrest.Matchers.equalTo; + +public class NormalizePipelineNormalizersTests extends ESTestCase { + + private static final double[] DATA = + new double[] { 1, 50, Double.NaN, 8, 10, 4, 3, 0, 10, -10, -4}; + private static final int COUNT = 10; + private static final double MIN = -10; + private static final double MAX = 50; + private static final double SUM = 72; + private static final double MEAN = SUM / COUNT; + + public void testRescaleZeroToOne() { + NormalizePipelineNormalizers.RescaleZeroToOne normalizer = new NormalizePipelineNormalizers.RescaleZeroToOne(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { 0.18333333333333332, 1.0, Double.NaN, 0.3, 0.3333333333333333, 0.23333333333333334, + 0.21666666666666667, 0.16666666666666666, 0.3333333333333333, 0.0, 0.1 }; + assertNormalized(normalizer, normalized); + } + + public void testRescaleZeroToOneHundred() { + NormalizePipelineNormalizers.RescaleZeroToOneHundred normalizer = new NormalizePipelineNormalizers.RescaleZeroToOneHundred(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { 18.333333333333332, 100.0, Double.NaN, 30.0, 33.333333333333336, 23.333333333333332, + 21.666666666666668, 16.666666666666668, 33.333333333333336, 0.0, 10.0 }; + assertNormalized(normalizer, normalized); + } + + public void testMean() { + NormalizePipelineNormalizers.Mean normalizer = new NormalizePipelineNormalizers.Mean(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { -0.10333333333333333, 0.7133333333333333, Double.NaN, 0.01333333333333333, + 0.04666666666666666, -0.05333333333333334, -0.07, -0.12000000000000001, 0.04666666666666666, + -0.2866666666666667, -0.18666666666666665 }; + assertNormalized(normalizer, normalized); + } + + public void 
testZScore() { + NormalizePipelineNormalizers.ZScore normalizer = new NormalizePipelineNormalizers.ZScore(DATA); + assertSinglePassStatistics(normalizer); + double[] normalized = new double[] { -0.4012461740749068, 2.7698929436138724, Double.NaN, 0.05177369988063312, + 0.18120794958221595, -0.20709479952253254, -0.27181192437332397, -0.4659632989256982, 0.18120794958221595, + -1.1131345474336123, -0.7248317983288638 }; + assertNormalized(normalizer, normalized); + } + + public void testSoftmax() { + NormalizePipelineNormalizers.Softmax normalizer = new NormalizePipelineNormalizers.Softmax(DATA); + double[] normalized = new double[] { 5.242885663363464E-22, 1.0, Double.NaN, 5.74952226429356E-19, 4.24835425529159E-18, + 1.0530617357553813E-20, 3.8739976286871875E-21, 1.928749847963918E-22, 4.24835425529159E-18, 8.756510762696521E-27, + 3.532628572200807E-24 }; + + assertNormalized(normalizer, normalized); + } + + private void assertSinglePassStatistics(NormalizePipelineNormalizers.SinglePassSimpleStatisticsNormalizer normalizer) { + assertThat(normalizer.min, equalTo(MIN)); + assertThat(normalizer.max, equalTo(MAX)); + assertThat(normalizer.count, equalTo(COUNT)); + assertThat(normalizer.sum, equalTo(SUM)); + assertThat(normalizer.mean, equalTo(MEAN)); + } + + private void assertNormalized(DoubleUnaryOperator op, double[] normalizedData) { + assertThat(normalizedData.length, equalTo(DATA.length)); + for (int i = 0; i < DATA.length; i++) { + if (Double.isNaN(DATA[i])) { + assertTrue(Double.isNaN(normalizedData[i])); + } else { + assertThat(op.applyAsDouble(DATA[i]), equalTo(normalizedData[i])); + } + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml index e689c2a343ef5..4dbf5aa968810 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml +++ 
b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml @@ -56,7 +56,7 @@ setup: body: size: 0 aggs: - histo: + users_by_day: date_histogram: field: "timestamp" calendar_interval: "day" @@ -66,17 +66,14 @@ setup: buckets_path: "_count" normalizer: "percent_of_sum" - - length: { aggregations.histo.buckets: 3 } - - match: { aggregations.histo.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" } - - match: { aggregations.histo.buckets.0.doc_count: 3 } - - match: { aggregations.histo.buckets.0.percent_of_total_users.value: 3 } - - match: { aggregations.histo.buckets.0.total_users.value: 3 } - - match: { aggregations.histo.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" } - - match: { aggregations.histo.buckets.1.doc_count: 2 } - - match: { aggregations.histo.buckets.1.percent_of_total_users.value: 2 } - - match: { aggregations.histo.buckets.1.total_users.value: 3 } - - match: { aggregations.histo.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" } - - match: { aggregations.histo.buckets.2.doc_count: 1 } - - match: { aggregations.histo.buckets.2.percent_of_total_users.value: 1 } - - match: { aggregations.histo.buckets.2.total_users.value: 4 } + - length: { aggregations.users_by_day.buckets: 3 } + - match: { aggregations.users_by_day.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" } + - match: { aggregations.users_by_day.buckets.0.doc_count: 3 } + - match: { aggregations.users_by_day.buckets.0.percent_of_total_users.value: 0.5 } + - match: { aggregations.users_by_day.buckets.1.key_as_string: "2017-01-02T00:00:00.000Z" } + - match: { aggregations.users_by_day.buckets.1.doc_count: 2 } + - match: { aggregations.users_by_day.buckets.1.percent_of_total_users.value: 0.3333333333333333 } + - match: { aggregations.users_by_day.buckets.2.key_as_string: "2017-01-03T00:00:00.000Z" } + - match: { aggregations.users_by_day.buckets.2.doc_count: 1 } + - match: { aggregations.users_by_day.buckets.2.percent_of_total_users.value: 0.16666666666666666 } From 
9718e01a400b15db41b59bb17d9ee924d5ee0ec1 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Mon, 11 May 2020 22:29:20 -0700 Subject: [PATCH 04/11] revert change --- .../xcontent/InstantiatingObjectParser.java | 27 ++----------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java index 94593f47ad79d..3cb1804ef20d9 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java +++ b/libs/x-content/src/main/java/org/elasticsearch/common/xcontent/InstantiatingObjectParser.java @@ -20,12 +20,9 @@ package org.elasticsearch.common.xcontent; import org.elasticsearch.common.ParseField; -import org.yaml.snakeyaml.util.ArrayUtils; import java.io.IOException; import java.lang.reflect.Constructor; -import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -67,11 +64,6 @@ public class InstantiatingObjectParser implements BiFunction, ContextParser { - public static Builder builder(String name, boolean ignoreUnknownFields, - Class valueClass, Object... 
constructorArgs) { - return new Builder<>(name, ignoreUnknownFields, valueClass, constructorArgs); - } - public static Builder builder(String name, boolean ignoreUnknownFields, Class valueClass) { return new Builder<>(name, ignoreUnknownFields, valueClass); } @@ -88,8 +80,6 @@ public static class Builder extends AbstractObjectParser constructor; - private final Object[] constructorArgs; - public Builder(String name, Class valueClass) { this(name, false, valueClass); } @@ -97,19 +87,12 @@ public Builder(String name, Class valueClass) { public Builder(String name, boolean ignoreUnknownFields, Class valueClass) { this.constructingObjectParser = new ConstructingObjectParser<>(name, ignoreUnknownFields, this::build); this.valueClass = valueClass; - this.constructorArgs = new Object[] {}; - } - - public Builder(String name, boolean ignoreUnknownFields, Class valueClass, Object... constructorArgs) { - this.constructingObjectParser = new ConstructingObjectParser<>(name, ignoreUnknownFields, this::build); - this.valueClass = valueClass; - this.constructorArgs = constructorArgs; } @SuppressWarnings("unchecked") public InstantiatingObjectParser build() { Constructor constructor = null; - int neededArguments = constructingObjectParser.getNumberOfFields() + constructorArgs.length; + int neededArguments = constructingObjectParser.getNumberOfFields(); // Try to find an annotated constructor for (Constructor c : valueClass.getConstructors()) { if (c.getAnnotation(ParserConstructor.class) != null) { @@ -189,14 +172,8 @@ private Value build(Object[] args) { throw new IllegalArgumentException("InstantiatingObjectParser for type " + valueClass.getName() + " has to be finalized " + "before the first use"); } - Object[] allArgs = args; - if (constructorArgs.length > 0) { - allArgs = new Object[args.length + constructorArgs.length]; - System.arraycopy(constructorArgs, 0, allArgs, 0, constructorArgs.length); - System.arraycopy(args, 0, allArgs, constructorArgs.length, args.length); - } 
try { - return constructor.newInstance(allArgs); + return constructor.newInstance(args); } catch (Exception ex) { throw new IllegalArgumentException("Cannot instantiate an object of " + valueClass.getName(), ex); } From 023c34fab289abb5bce094db492a726308ca8a3b Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 13 May 2020 15:43:38 -0700 Subject: [PATCH 05/11] respond to changes --- .../pipeline/normalize-aggregation.asciidoc | 49 +++++++++++++++++-- .../NormalizePipelineAggregationBuilder.java | 41 ++++++++-------- .../NormalizePipelineAggregator.java | 29 +++++------ ...ers.java => NormalizePipelineMethods.java} | 18 +++---- .../normalize/NormalizeAggregatorTests.java | 46 +++++++++++++++-- ...ava => NormalizePipelineMethodsTests.java} | 14 +++--- .../analytics/normalize/NormalizeTests.java | 11 ++++- .../test/analytics/normalize.yml | 2 +- .../rest-api-spec/test/analytics/usage.yml | 9 +++- 9 files changed, 155 insertions(+), 64 deletions(-) rename x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/{NormalizePipelineNormalizers.java => NormalizePipelineMethods.java} (87%) rename x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/{NormalizePipelineNormalizersTests.java => NormalizePipelineMethodsTests.java} (81%) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index 3432eed3cf833..b8b5fd20495aa 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -4,6 +4,7 @@ === Normalize Aggregation A parent pipeline aggregation which calculates the specific normalized/rescaled value for a specific bucket value. +Values that cannot be normalized, will be skipped using the <>. 
==== Syntax @@ -14,7 +15,7 @@ { "normalize": { "buckets_path": "normalized", - "normalizer": "percent_of_sum" + "method": "percent_of_sum" } } -------------------------------------------------- @@ -25,11 +26,49 @@ A `normalize` aggregation looks like this in isolation: [options="header"] |=== |Parameter Name |Description |Required |Default Value -|`buckets_path` |The path to the buckets we wish to normalize (see <> for more details) |Required | -|`normalizer` | The specific rescaling to apply | Required | +|`buckets_path` |The path to the buckets we wish to normalize (see <> for more details) |Required | +|`method` | The specific <> to apply | Required | |`format` |format to apply to the output value of this aggregation |Optional |`null` |=== + +[[normalizer_pipeline-method]] + +The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition will use +the following original set of bucket values as examples: `[5, 5, 10, 50, 10, 20]`. + +_rescale\_0\_1_:: + This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized + linearly in-between. + x' = (x - min_x) / (max_x - min_x) + [0, 0, .1111, 1, .1111, .3333] + +_rescale\_0\_100_:: + This method rescales the data such that the minimum number is zero, and the maximum number is 100, with the rest normalized + linearly in-between. + x' = 100 * (x - min_x) / (max_x - min_x) + [0, 0, 11.11, 100, 11.11, 33.33] + +_percent_of_sum_:: + This method normalizes each value so that it represents a percentage of the total sum it attributes to. + x' = x / sum_x + [5%, 5%, 10%, 50%, 10%, 20%] + + +_mean_:: + This method normalizes such that each value is normalized by how much it differs from the average. 
+ x' = (x - mean_x) / (max_x - min_x) + [-0.26, -0.26, -0.15, 0.74, -0.15, 0.07] + +_zscore_:: + This method normalizes such that each value represents how far it is from the mean relative to the standard deviation + x' = (x - mean_x) / stdev_x + [-0.68, -0.68, -0.39, 1.94, -0.39, 0.19] + +_softmax_:: + This method normalizes such that each value is exponentiated and relative to the sum of the exponents of the original values. + x' = e^x / sum_e_x + [2.862E-20, 2.862E-20, 4.248E-18, 0.999, 4.248E-18, 9.357E-14] + The following snippet calculates the percent of total sales for each month: [source,console] -------------------------------------------------- POST /sales/_search "percent_of_total_sales": { "normalize": { "buckets_path": "sales", <1> - "normalizer": "percent_of_sum" <2> + "method": "percent_of_sum" <2> } } } @@ -63,7 +102,7 @@ POST /sales/_search // TEST[setup:sales] <1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling -<2> `normalizer` sets which rescaling to apply. In this case, `percent_of_sum` will calculate the sales value as a percent of all sales +<2> `method` sets which rescaling to apply. 
In this case, `percent_of_sum` will calculate the sales value as a percent of all sales in the parent bucket And the following may be the response: diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java index fd956e504d035..3de34d9f19de8 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregationBuilder.java @@ -26,16 +26,16 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Mean; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Percent; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.RescaleZeroToOne; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.RescaleZeroToOneHundred; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Softmax; -import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.ZScore; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Mean; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Percent; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOne; +import static 
org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOneHundred; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Softmax; +import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.ZScore; public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder { public static final String NAME = "normalize"; - static final ParseField NORMALIZER_FIELD = new ParseField("normalizer"); + static final ParseField METHOD_FIELD = new ParseField("method"); @SuppressWarnings("unchecked") public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( @@ -44,7 +44,7 @@ public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggrega static { PARSER.declareString(optionalConstructorArg(), FORMAT); - PARSER.declareString(constructorArg(), NORMALIZER_FIELD); + PARSER.declareString(constructorArg(), METHOD_FIELD); PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD); } @@ -57,22 +57,20 @@ public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggrega Softmax.NAME, Softmax::new ); - static String validateNormalizerName(String name) { + static String validateMethodName(String name) { if (NAME_MAP.containsKey(name)) { return name; } - - throw new IllegalArgumentException("invalid normalizer [" + name + "]"); + throw new IllegalArgumentException("invalid method [" + name + "]"); } private final String format; - private final String normalizer; - + private final String method; - public NormalizePipelineAggregationBuilder(String name, String format, String normalizer, List bucketsPath) { + public NormalizePipelineAggregationBuilder(String name, String format, String method, List bucketsPath) { super(name, NAME, bucketsPath.toArray(new String[0])); this.format = format; - this.normalizer = validateNormalizerName(normalizer); + this.method = validateMethodName(method); } /** @@ -81,13 +79,13 @@ public 
NormalizePipelineAggregationBuilder(String name, String format, String no public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException { super(in, NAME); format = in.readOptionalString(); - normalizer = in.readString(); + method = in.readString(); } @Override protected final void doWriteTo(StreamOutput out) throws IOException { out.writeOptionalString(format); - out.writeString(normalizer); + out.writeString(method); } /** @@ -107,7 +105,7 @@ protected DocValueFormat formatter() { @Override protected PipelineAggregator createInternal(Map metadata) { - return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(normalizer), metadata); + return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(method), metadata); } @Override @@ -115,6 +113,7 @@ protected void validate(ValidationContext context) { if (bucketsPaths.length != 1) { context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]"); } + context.validateHasParent(NAME, name); } @Override @@ -122,13 +121,13 @@ protected final XContentBuilder internalXContent(XContentBuilder builder, Params if (format != null) { builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format); } - builder.field(NORMALIZER_FIELD.getPreferredName(), normalizer); + builder.field(METHOD_FIELD.getPreferredName(), method); return builder; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), format, normalizer); + return Objects.hash(super.hashCode(), format, method); } @Override @@ -137,7 +136,7 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) return false; if (super.equals(obj) == false) return false; NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj; - return Objects.equals(format, other.format) && Objects.equals(normalizer, other.normalizer); + return Objects.equals(format, other.format) && Objects.equals(method, other.method); 
} @Override diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java index 763b11679c528..cf18c49e7c81b 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java @@ -11,8 +11,6 @@ import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext; import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation; -import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket; -import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory; import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; @@ -29,29 +27,28 @@ public class NormalizePipelineAggregator extends PipelineAggregator { private final DocValueFormat formatter; - private final Function normalizerSupplier; + private final Function methodSupplier; NormalizePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter, - Function normalizerSupplier, + Function methodSupplier, Map metadata) { super(name, bucketsPaths, metadata); this.formatter = formatter; - this.normalizerSupplier = normalizerSupplier; + this.methodSupplier = methodSupplier; } @Override + @SuppressWarnings("unchecked") public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) { - InternalMultiBucketAggregation - histo = (InternalMultiBucketAggregation) aggregation; - List buckets = histo.getBuckets(); - HistogramFactory 
factory = (HistogramFactory) histo; - List newBuckets = new ArrayList<>(buckets.size()); + InternalMultiBucketAggregation originalAgg = + (InternalMultiBucketAggregation) aggregation; + List buckets = originalAgg.getBuckets(); + List newBuckets = new ArrayList<>(buckets.size()); double[] values = buckets.stream() - .mapToDouble(bucket -> resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.SKIP)).toArray(); + .mapToDouble(bucket -> resolveBucketValue(originalAgg, bucket, bucketsPaths()[0], GapPolicy.SKIP)).toArray(); - DoubleUnaryOperator normalizer = normalizerSupplier.apply(values); + DoubleUnaryOperator method = methodSupplier.apply(values); for (int i = 0; i < buckets.size(); i++) { InternalMultiBucketAggregation.InternalBucket bucket = buckets.get(i); @@ -63,17 +60,17 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext if (Double.isNaN(values[i])) { normalizedBucketValue = Double.NaN; } else { - normalizedBucketValue = normalizer.applyAsDouble(values[i]); + normalizedBucketValue = method.applyAsDouble(values[i]); } List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) .map((p) -> (InternalAggregation) p) .collect(Collectors.toList()); aggs.add(new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata())); - Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs)); + InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(new InternalAggregations(aggs), bucket); newBuckets.add(newBucket); } - return factory.createAggregation(newBuckets); + return originalAgg.create(newBuckets); } } diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineMethods.java similarity index 87% rename from 
x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java rename to x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineMethods.java index 9593d812e81a1..62e79d92fa4a5 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineNormalizers.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineMethods.java @@ -9,12 +9,12 @@ import java.util.function.DoubleUnaryOperator; -class NormalizePipelineNormalizers { +class NormalizePipelineMethods { // never to be instantiated - private NormalizePipelineNormalizers() {} + private NormalizePipelineMethods() {} - static class RescaleZeroToOne extends SinglePassSimpleStatisticsNormalizer { + static class RescaleZeroToOne extends SinglePassSimpleStatisticsMethod { static final String NAME = "rescale_0_1"; RescaleZeroToOne(double[] values) { @@ -27,7 +27,7 @@ public double applyAsDouble(double value) { } } - static class RescaleZeroToOneHundred extends SinglePassSimpleStatisticsNormalizer { + static class RescaleZeroToOneHundred extends SinglePassSimpleStatisticsMethod { static final String NAME = "rescale_0_100"; RescaleZeroToOneHundred(double[] values) { @@ -40,7 +40,7 @@ public double applyAsDouble(double value) { } } - static class Mean extends SinglePassSimpleStatisticsNormalizer { + static class Mean extends SinglePassSimpleStatisticsMethod { static final String NAME = "mean"; Mean(double[] values) { @@ -53,7 +53,7 @@ public double applyAsDouble(double value) { } } - static class Percent extends SinglePassSimpleStatisticsNormalizer { + static class Percent extends SinglePassSimpleStatisticsMethod { static final String NAME = "percent_of_sum"; Percent(double[] values) { @@ -66,7 +66,7 @@ public double applyAsDouble(double value) { } } - static class ZScore extends SinglePassSimpleStatisticsNormalizer { + static 
class ZScore extends SinglePassSimpleStatisticsMethod { static final String NAME = "z-score"; private final double stdev; @@ -110,14 +110,14 @@ public double applyAsDouble(double value) { } } - abstract static class SinglePassSimpleStatisticsNormalizer implements DoubleUnaryOperator { + abstract static class SinglePassSimpleStatisticsMethod implements DoubleUnaryOperator { protected final double max; protected final double min; protected final double sum; protected final double mean; protected final int count; - SinglePassSimpleStatisticsNormalizer(double[] values) { + SinglePassSimpleStatisticsMethod(double[] values) { int count = 0; double sum = 0.0; double min = Double.MAX_VALUE; diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java index 5def7ee01b8c2..e415755aad788 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/normalize/NormalizeAggregatorTests.java @@ -9,6 +9,7 @@ import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.RandomIndexWriter; @@ -16,8 +17,10 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.time.DateFormatters; import org.elasticsearch.index.mapper.DateFieldMapper; +import org.elasticsearch.index.mapper.KeywordFieldMapper; import 
org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.search.aggregations.AggregatorTestCase; @@ -25,6 +28,8 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.StatsAggregationBuilder; import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue; import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder; @@ -39,6 +44,7 @@ public class NormalizeAggregatorTests extends AggregatorTestCase { private static final String DATE_FIELD = "date"; + private static final String TERM_FIELD = "term"; private static final String VALUE_FIELD = "value_field"; private static final List datasetTimes = Arrays.asList( @@ -52,9 +58,21 @@ public class NormalizeAggregatorTests extends AggregatorTestCase { "2017-01-08T16:14:34", //8 "2017-01-09T17:09:50", //9 "2017-01-09T22:55:46");//9 + private static final List datasetTerms = Arrays.asList( + "a", //1 + "a", //1 + "b", //2 + "b", //2 + "c", //3 + "c", //3 + "d", //4 + "e", //5 + "f", //6 + "f");//6 private static final List datasetValues = Arrays.asList(1,1,42,6,5,0,2,8,30,13); - private static final List percentOfSum = Arrays.asList(0.2,0.0,0.2,0.0,0.2,0.0,0.1,0.1,0.2); + private static final List datePercentOfSum = Arrays.asList(0.2,0.0,0.2,0.0,0.2,0.0,0.1,0.1,0.2); + private static final List termPercentOfSum = Arrays.asList(0.2,0.2,0.2,0.2,0.1,0.1); private static final List rescaleOneHundred = Arrays.asList(0.0,Double.NaN,100.0,Double.NaN,6.521739130434782, Double.NaN,0.0,13.043478260869565,89.1304347826087); @@ -70,7 
+88,7 @@ public void testPercentOfTotalDocCount() throws IOException { for (int i = 0; i < buckets.size(); i++) { Histogram.Bucket bucket = buckets.get(i); assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(), - equalTo(percentOfSum.get(i))); + equalTo(datePercentOfSum.get(i))); } }); } @@ -93,6 +111,23 @@ public void testValueMean() throws IOException { }); } + public void testTermsAggParent() throws IOException { + TermsAggregationBuilder aggBuilder = new TermsAggregationBuilder("terms").field(TERM_FIELD); + aggBuilder.subAggregation(new NormalizePipelineAggregationBuilder("normalized", null, "percent_of_sum", + List.of("_count"))); + + testCase(aggBuilder, (agg) -> { + assertEquals(6, ((Terms) agg).getBuckets().size()); + List buckets = ((Terms) agg).getBuckets(); + for (int i = 0; i < buckets.size(); i++) { + Terms.Bucket bucket = buckets.get(i); + assertThat(((InternalSimpleValue) (bucket.getAggregations().get("normalized"))).value(), + equalTo(termPercentOfSum.get(i))); + } + }); + + } + private void testCase(ValuesSourceAggregationBuilder aggBuilder, Consumer aggAssertion) throws IOException { Query query = new MatchAllDocsQuery(); // index date data @@ -107,6 +142,7 @@ private void testCase(ValuesSourceAggregationBuilder aggBuilder, Consumer aggBuilder, Consumer new NormalizePipelineAggregationBuilder(builder.getName(), builder.format(), invalidNormalizer, List.of(builder.getBucketsPaths()))); - assertThat(exception.getMessage(), equalTo("invalid normalizer [" + invalidNormalizer + "]")); + assertThat(exception.getMessage(), equalTo("invalid method [" + invalidNormalizer + "]")); + } + + public void testHasParentValidation() { + NormalizePipelineAggregationBuilder builder = createTestAggregatorFactory(); + assertThat(validate(emptyList(), builder), CoreMatchers.equalTo( + "Validation Failed: 1: normalize aggregation [" + builder.getName() + "] must be declared inside" + + " of another aggregation;")); } @Override diff 
--git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml index 4dbf5aa968810..6c7766a75cfae 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/normalize.yml @@ -64,7 +64,7 @@ setup: percent_of_total_users: normalize: buckets_path: "_count" - normalizer: "percent_of_sum" + method: "percent_of_sum" - length: { aggregations.users_by_day.buckets: 3 } - match: { aggregations.users_by_day.buckets.0.key_as_string: "2017-01-01T00:00:00.000Z" } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml index 9369b9825e2fe..f48f1f75230e4 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/analytics/usage.yml @@ -27,6 +27,7 @@ setup: - set: {analytics.stats.t_test_usage: t_test_usage} - set: {analytics.stats.string_stats_usage: string_stats_usage} - set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage} + - set: { analytics.stats.normalize_usage: normalize_usage } # use boxplot agg - do: @@ -52,6 +53,7 @@ setup: - match: {analytics.stats.t_test_usage: $t_test_usage} - match: {analytics.stats.string_stats_usage: $string_stats_usage} - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage} + - match: { analytics.stats.normalize_usage: $normalize_usage } # use top_metrics agg - do: @@ -80,6 +82,7 @@ setup: - match: {analytics.stats.t_test_usage: $t_test_usage} - match: {analytics.stats.string_stats_usage: $string_stats_usage} - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage} + - match: { analytics.stats.normalize_usage: $normalize_usage } # use cumulative_cardinality agg - do: @@ -112,6 +115,7 @@ setup: - match: 
{analytics.stats.t_test_usage: $t_test_usage} - match: {analytics.stats.string_stats_usage: $string_stats_usage} - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage} + - match: { analytics.stats.normalize_usage: $normalize_usage } # use t-test agg - do: @@ -138,6 +142,7 @@ setup: - set: {analytics.stats.t_test_usage: t_test_usage} - match: {analytics.stats.string_stats_usage: $string_stats_usage} - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage} + - match: { analytics.stats.normalize_usage: $normalize_usage } - do: search: @@ -160,6 +165,7 @@ setup: - gt: { analytics.stats.string_stats_usage: $string_stats_usage } - set: {analytics.stats.string_stats_usage: string_stats_usage} - match: {analytics.stats.moving_percentiles_usage: $moving_percentiles_usage} + - match: { analytics.stats.normalize_usage: $normalize_usage } # use moving_percentile agg - do: @@ -193,6 +199,7 @@ setup: - match: {analytics.stats.string_stats_usage: $string_stats_usage} - gt: { analytics.stats.moving_percentiles_usage: $moving_percentiles_usage } - set: {analytics.stats.moving_percentiles_usage: moving_percentiles_usage} + - match: { analytics.stats.normalize_usage: $normalize_usage } # use normalize agg - do: @@ -212,7 +219,7 @@ setup: percent_of_total_users: normalize: buckets_path: "total_users" - normalizer: "percent_of_sum" + method: "percent_of_sum" - length: { aggregations.histo.buckets: 1 } From 8da4960ed6e3adb4279f391168fe9f25a219067f Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 13 May 2020 16:59:39 -0700 Subject: [PATCH 06/11] update docs --- .../pipeline/normalize-aggregation.asciidoc | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index b8b5fd20495aa..6147171a284d9 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ 
b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -31,42 +31,55 @@ A `normalize` aggregation looks like this in isolation: |`format` |format to apply to the output value of this aggregation |Optional |`null` |=== +==== Methods [[normalizer_pipeline-method]] The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition will use the following original set of bucket values as examples: `[5, 5, 10, 50, 10, 20]`. -_rescale\_0\_1_:: +_rescale_0_1_:: This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized linearly in-between. +

x' = (x - min_x) / (max_x - min_x) +

[0, 0, .1111, 1, .1111, .3333] -_rescale\_0\_100_:: +_rescale_0_100_:: This method rescales the data such that the minimum number is zero, and the maximum number is 100, with the rest normalized linearly in-between. +

x' = 100 * (x - min_x) / (max_x - min_x) +

[0, 0, 11.11, 100, 11.11, 33.33] _percent_of_sum_:: This method normalizes each value so that it represents a percentage of the total sum it attributes to. +

x' = x / sum_x +

[5%, 5%, 10%, 50%, 10%, 20%] _mean_:: This method normalizes such that each value is normalized by how much it differs from the average. +

x' = (x - mean_x) / (max_x - min_x) +

[-0.26, -0.26, -0.15, 0.74, -0.15, 0.07] _zscore_:: This method normalizes such that each value represents how far it is from the mean relative to the standard deviation +

x' = (x - mean_x) / stdev_x +

[-0.68, -0.68, -0.39, 1.94, -0.39, 0.19] _softmax_:: This method normalizes such that each value is exponentiated and relative to the sum of the exponents of the original values. +

x' = e^x / sum_e_x +

[2.862E-20, 2.862E-20, 4.248E-18, 0.999, 9.357E-14, 4.248E-18] The following snippet calculates the percent of total sales for each month: From 5d4737c044ab9ac6802ea2ecab6990fca072b050 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Wed, 13 May 2020 17:09:06 -0700 Subject: [PATCH 07/11] format --- .../pipeline/normalize-aggregation.asciidoc | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index 6147171a284d9..b683b89d9d2f5 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -40,46 +40,46 @@ the following original set of bucket values as examples: `[5, 5, 10, 50, 10, 20] _rescale_0_1_:: This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized linearly in-between. -

+ x' = (x - min_x) / (max_x - min_x) -

+ [0, 0, .1111, 1, .1111, .3333] _rescale_0_100_:: This method rescales the data such that the minimum number is zero, and the maximum number is 100, with the rest normalized linearly in-between. -

+ x' = 100 * (x - min_x) / (max_x - min_x) -

+ [0, 0, 11.11, 100, 11.11, 33.33] _percent_of_sum_:: This method normalizes each value so that it represents a percentage of the total sum it attributes to. -

+ x' = x / sum_x -

+ [5%, 5%, 10%, 50%, 10%, 20%] _mean_:: This method normalizes such that each value is normalized by how much it differs from the average. -

+ x' = (x - mean_x) / (max_x - min_x) -

+ [-0.26, -0.26, -0.15, 0.74, -0.15, 0.07] _zscore_:: This method normalizes such that each value represents how far it is from the mean relative to the standard deviation -

+ x' = (x - mean_x) / stdev_x -

+ [-0.68, -0.68, -0.39, 1.94, -0.39, 0.19] _softmax_:: This method normalizes such that each value is exponentiated and relative to the sum of the exponents of the original values. -

+ x' = e^x / sum_e_x -

+ [2.862E-20, 2.862E-20, 4.248E-18, 0.999, 9.357E-14, 4.248E-18] The following snippet calculates the percent of total sales for each month: From aa9eebca37427677f615c36e58a456d0c60d46dc Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 14 May 2020 09:17:26 -0700 Subject: [PATCH 08/11] touch up --- .../pipeline/normalize-aggregation.asciidoc | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index b683b89d9d2f5..d6eedce728257 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -21,18 +21,18 @@ A `normalize` aggregation looks like this in isolation: -------------------------------------------------- // NOTCONSOLE -[[normalizer_pipeline-params]] -.`normalizer_pipeline` Parameters +[[normalize_pipeline-params]] +.`normalize_pipeline` Parameters [options="header"] |=== |Parameter Name |Description |Required |Default Value |`buckets_path` |The path to the buckets we wish to normalize (see <> for more details) |Required | -|`method` | The specific <> to apply | Required | +|`method` | The specific <> to apply | Required | |`format` |format to apply to the output value of this aggregation |Optional |`null` |=== ==== Methods -[[normalizer_pipeline-method]] +[[normalize_pipeline-method]] The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition will use the following original set of bucket values as examples: `[5, 5, 10, 50, 10, 20]`. 
@@ -82,6 +82,9 @@ _softmax_:: [2.862E-20, 2.862E-20, 4.248E-18, 0.999, 9.357E-14, 4.248E-18] + +==== Example + The following snippet calculates the percent of total sales for each month: [source,console] From 72f29e71fec4421793887ad9e7a967fac53ce301 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 14 May 2020 10:08:21 -0700 Subject: [PATCH 09/11] use format in example --- .../aggregations/pipeline/normalize-aggregation.asciidoc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index d6eedce728257..faf2fed0b7f78 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -108,6 +108,7 @@ POST /sales/_search "normalize": { "buckets_path": "sales", <1> "method": "percent_of_sum" <2> + "format": "00.00%" <3> } } } @@ -120,6 +121,8 @@ POST /sales/_search <1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling <2> `method` sets which rescaling to apply. In this case, `percent_of_sum` will calculate the sales value as a percent of all sales in the parent bucket +<3> `format` influences how to format the metric as a string using Java's `DecimalFormat` pattern. 
In this case, multiplying by 100 + and adding a '%' And the following may be the response: @@ -141,7 +144,7 @@ And the following may be the response: "value": 550.0 }, "percent_of_total_sales": { - "value": 0.5583756345177665 + "value": "55.83%" } }, { @@ -152,7 +155,7 @@ And the following may be the response: "value": 60.0 }, "percent_of_total_sales": { - "value": 0.06091370558375635 + "value": "6.09%" } }, { @@ -163,7 +166,7 @@ And the following may be the response: "value": 375.0 }, "percent_of_total_sales": { - "value": 0.38071065989847713 + "value": "38.07%" } } ] From 1e9f01532080b402d65ace333dbbfe23d8a7ec7c Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 14 May 2020 11:55:58 -0700 Subject: [PATCH 10/11] comma --- .../aggregations/pipeline/normalize-aggregation.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index faf2fed0b7f78..ece3c7c231ae7 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -107,7 +107,7 @@ POST /sales/_search "percent_of_total_sales": { "normalize": { "buckets_path": "sales", <1> - "method": "percent_of_sum" <2> + "method": "percent_of_sum", <2> "format": "00.00%" <3> } } From e6db0f9e9436f7560ad8a881b8461480ee3d0dd7 Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Thu, 14 May 2020 12:43:15 -0700 Subject: [PATCH 11/11] final fix --- .../aggregations/pipeline/normalize-aggregation.asciidoc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc index ece3c7c231ae7..adeeadaa1b6f9 100644 --- a/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc +++ 
b/docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc @@ -144,7 +144,8 @@ And the following may be the response: "value": 550.0 }, "percent_of_total_sales": { - "value": "55.83%" + "value": 0.5583756345177665, + "value_as_string": "55.84%" } }, { @@ -155,7 +156,8 @@ And the following may be the response: "value": 60.0 }, "percent_of_total_sales": { - "value": "6.09%" + "value": 0.06091370558375635, + "value_as_string": "06.09%" } }, { @@ -166,7 +168,8 @@ And the following may be the response: "value": 375.0 }, "percent_of_total_sales": { - "value": "38.07%" + "value": 0.38071065989847713, + "value_as_string": "38.07%" } } ]