Skip to content

Commit

Permalink
Add Normalize Pipeline Aggregation (elastic#56399)
Browse files Browse the repository at this point in the history
This aggregation will perform normalizations of metrics
for a given series of data in the form of bucket values.

The aggregations supports the following normalizations

- rescale 0-1
- rescale 0-100
- percentage of sum
- mean normalization
- z-score normalization
- softmax normalization

To specify which normalization is to be used, it can be specified
in the normalize agg's `normalizer` field.

For example:

```
{
  "normalize": {
    "buckets_path": <>,
    "normalizer": "percent"
  }
}
```

Closes elastic#51005.
  • Loading branch information
talevy committed May 14, 2020
1 parent 2a943a5 commit 9ef0eea
Show file tree
Hide file tree
Showing 14 changed files with 1,000 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/reference/aggregations/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,4 @@ include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]
include::pipeline/moving-percentiles-aggregation.asciidoc[]
include::pipeline/normalize-aggregation.asciidoc[]
182 changes: 182 additions & 0 deletions docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-normalize-aggregation]]
=== Normalize Aggregation

A parent pipeline aggregation which calculates the specific normalized/rescaled value for a specific bucket value.
Values that cannot be normalized, will be skipped using the <<gap-policy, skip gap policy>>.

==== Syntax

A `normalize` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"normalize": {
"buckets_path": "normalized",
"method": "percent_of_sum"
}
}
--------------------------------------------------
// NOTCONSOLE

[[normalize_pipeline-params]]
.`normalize_pipeline` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax, `buckets_path` syntax>> for more details) |Required |
|`method` | The specific <<normalize_pipeline-method, method>> to apply | Required |
|`format` |format to apply to the output value of this aggregation |Optional |`null`
|===

==== Methods
[[normalize_pipeline-method]]

The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition will use
the following original set of bucket values as examples: `[5, 5, 10, 50, 10, 20]`.

_rescale_0_1_::
This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized
linearly in-between.

x' = (x - min_x) / (max_x - min_x)

[0, 0, .1111, 1, .1111, .3333]

_rescale_0_100_::
This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized
linearly in-between.

x' = 100 * (x - min_x) / (max_x - min_x)

[0, 0, 11.11, 100, 11.11, 33.33]

_percent_of_sum_::
This method normalizes each value so that it represents a percentage of the total sum it attributes to.

x' = x / sum_x

[5%, 5%, 10%, 50%, 10%, 20%]


_mean_::
This method normalizes such that each value is normalized by how much it differs from the average.

x' = (x - mean_x) / (max_x - min_x)

[4.63, 4.63, 9.63, 49.63, 9.63, 9.63, 19.63]

_zscore_::
This method normalizes such that each value represents how far it is from the mean relative to the standard deviation

x' = (x - mean_x) / stdev_x

[-0.68, -0.68, -0.39, 1.94, -0.39, 0.19]

_softmax_::
This method normalizes such that each value is exponentiated and relative to the sum of the exponents of the original values.

x' = e^x / sum_e_x

[2.862E-20, 2.862E-20, 4.248E-18, 0.999, 9.357E-14, 4.248E-18]


==== Example

The following snippet calculates the percent of total sales for each month:

[source,console]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
},
"percent_of_total_sales": {
"normalize": {
"buckets_path": "sales", <1>
"method": "percent_of_sum", <2>
"format": "00.00%" <3>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]

<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling
<2> `method` sets which rescaling to apply. In this case, `percent_of_sum` will calculate the sales value as a percent of all sales
in the parent bucket
<3> `format` influences how to format the metric as a string using Java's `DecimalFormat` pattern. In this case, multiplying by 100
and adding a '%'

And the following may be the response:

[source,console-result]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
},
"percent_of_total_sales": {
"value": 0.5583756345177665,
"value_as_string": "55.84%"
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
},
"percent_of_total_sales": {
"value": 0.06091370558375635,
"value_as_string": "06.09%"
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
},
"percent_of_total_sales": {
"value": 0.38071065989847713,
"value_as_string": "38.07%"
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
public static final String NAME = "simple_value";
protected final double value;

InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, metadata);
this.format = formatter;
this.value = value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory;
import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
Expand Down Expand Up @@ -84,6 +85,11 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
MovingPercentilesPipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
pipelineAggs.add(new PipelineAggregationSpec(
NormalizePipelineAggregationBuilder.NAME,
NormalizePipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.NORMALIZE,
checkLicense(NormalizePipelineAggregationBuilder.PARSER))));
return pipelineAggs;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.normalize;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Mean;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Percent;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOne;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOneHundred;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Softmax;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.ZScore;

public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<NormalizePipelineAggregationBuilder> {
public static final String NAME = "normalize";
static final ParseField METHOD_FIELD = new ParseField("method");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<NormalizePipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0],
(String) args[1], (List<String>) args[2]));

static {
PARSER.declareString(optionalConstructorArg(), FORMAT);
PARSER.declareString(constructorArg(), METHOD_FIELD);
PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD);
}

static final Map<String, Function<double[], DoubleUnaryOperator>> NAME_MAP;

static {
NAME_MAP = new HashMap<>();
NAME_MAP.put(RescaleZeroToOne.NAME, RescaleZeroToOne::new);
NAME_MAP.put(RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new);
NAME_MAP.put(Mean.NAME, Mean::new);
NAME_MAP.put(ZScore.NAME, ZScore::new);
NAME_MAP.put(Percent.NAME, Percent::new);
NAME_MAP.put(Softmax.NAME, Softmax::new);
}

static String validateMethodName(String name) {
if (NAME_MAP.containsKey(name)) {
return name;
}
throw new IllegalArgumentException("invalid method [" + name + "]");
}

private final String format;
private final String method;

public NormalizePipelineAggregationBuilder(String name, String format, String method, List<String> bucketsPath) {
super(name, NAME, bucketsPath.toArray(new String[0]));
this.format = format;
this.method = validateMethodName(method);
}

/**
* Read from a stream.
*/
public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
format = in.readOptionalString();
method = in.readString();
}

@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalString(format);
out.writeString(method);
}

/**
* Gets the format to use on the output of this aggregation.
*/
public String format() {
return format;
}

protected DocValueFormat formatter() {
if (format != null) {
return new DocValueFormat.Decimal(format);
} else {
return DocValueFormat.RAW;
}
}

@Override
protected PipelineAggregator createInternal(Map<String, Object> metadata) {
return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(method), metadata);
}

@Override
protected void validate(ValidationContext context) {
if (bucketsPaths.length != 1) {
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
}
context.validateHasParent(NAME, name);
}

@Override
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
}
builder.field(METHOD_FIELD.getPreferredName(), method);
return builder;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), format, method);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj;
return Objects.equals(format, other.format) && Objects.equals(method, other.method);
}

@Override
public String getWriteableName() {
return NAME;
}
}
Loading

0 comments on commit 9ef0eea

Please sign in to comment.