Skip to content

Commit

Permalink
Add Normalize Pipeline Aggregation
Browse files Browse the repository at this point in the history
This aggregation will perform normalizations of metrics
for a given series of data in the form of bucket values.

The aggregation supports the following normalizations:

- rescale 0-1
- rescale 0-100
- percentage of sum
- mean normalization
- z-score normalization
- softmax normalization

The normalization to apply is specified in the normalize
aggregation's `normalizer` field.

For example:

```
{
  "normalize": {
    "buckets_path": <>,
    "normalizer": "percent"
  }
}
```

Closes elastic#51005.
  • Loading branch information
talevy committed May 8, 2020
1 parent d6fbbe9 commit 5427899
Show file tree
Hide file tree
Showing 14 changed files with 690 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/reference/aggregations/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,4 @@ include::pipeline/bucket-script-aggregation.asciidoc[]
include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]
include::pipeline/normalize-aggregation.asciidoc[]
119 changes: 119 additions & 0 deletions docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
[[search-aggregations-pipeline-normalize-aggregation]]
=== Normalize Aggregation

A parent pipeline aggregation which calculates a normalized/rescaled value for each bucket's metric value.

==== Syntax

A `normalize` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"normalize": {
"buckets_path": "normalized",
"normalizer": "percent"
}
}
--------------------------------------------------
// NOTCONSOLE

[[normalizer_pipeline-params]]
.`normalizer_pipeline` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax>> for more details) |Required |
|`normalizer` | The specific rescaling to apply | Required |
|`format` |format to apply to the output value of this aggregation |Optional |`null`
|===

The following snippet calculates the percent of total sales for each month:

[source,console]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
},
"percent_of_total_sales": {
"normalize": {
"buckets_path": "sales", <1>
"normalizer": "percent" <2>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]

<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling
<2> `normalizer` sets which rescaling to apply. In this case, `percent` will calculate the sales value as a percent of all sales
in the parent bucket

And the following may be the response:

[source,console-result]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
},
"percent_of_total_sales": {
"value": 0.5583756345177665
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
},
"percent_of_total_sales": {
"value": 0.06091370558375635
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
},
"percent_of_total_sales": {
"value": 0.38071065989847713
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
1 change: 1 addition & 0 deletions docs/reference/rest-api/usage.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ GET /_xpack/usage
"stats": {
"boxplot_usage" : 0,
"top_metrics_usage" : 0,
"normalize_usage" : 0,
"cumulative_cardinality_usage" : 0,
"t_test_usage" : 0,
"string_stats_usage" : 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
public static final String NAME = "simple_value";
protected final double value;

InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, metadata);
this.format = formatter;
this.value = value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory;
import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
Expand Down Expand Up @@ -71,13 +72,18 @@ public AnalyticsPlugin() { }

@Override
public List<PipelineAggregationSpec> getPipelineAggregations() {
return singletonList(
return Arrays.asList(
new PipelineAggregationSpec(
CumulativeCardinalityPipelineAggregationBuilder.NAME,
CumulativeCardinalityPipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.CUMULATIVE_CARDINALITY,
checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER)))
);
checkLicense(CumulativeCardinalityPipelineAggregationBuilder.PARSER))),
new PipelineAggregationSpec(
NormalizePipelineAggregationBuilder.NAME,
NormalizePipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.NORMALIZE,
checkLicense(NormalizePipelineAggregationBuilder.PARSER))
));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.normalize;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Mean;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Percent;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.RescaleZeroToOne;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.RescaleZeroToOneHundred;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.Softmax;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizer.ZScore;

/**
 * Builder for the {@code normalize} pipeline aggregation, which rescales the values
 * produced by a sibling aggregation across the buckets of a sequentially-ordered
 * parent aggregation (e.g. a date histogram).
 *
 * <p>The normalization method is chosen via the {@code normalizer} field and must be
 * one of the names registered in {@link #NAME_MAP} (rescale_0_1, rescale_0_100, mean,
 * z-score, percent_of_sum, softmax — exact names defined on the normalizer classes).
 */
public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<NormalizePipelineAggregationBuilder> {
    public static final String NAME = "normalize";
    static final ParseField NORMALIZER_FIELD = new ParseField("normalizer");

    @SuppressWarnings("unchecked")
    public static final ConstructingObjectParser<NormalizePipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
        NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0],
            (String) args[1], (List<String>) args[2]));

    static {
        // format is optional; normalizer and buckets_path are required (enforced by the parser).
        PARSER.declareString(optionalConstructorArg(), FORMAT);
        PARSER.declareString(constructorArg(), NORMALIZER_FIELD);
        PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD);
    }

    /** Maps each user-facing normalizer name to a factory for its implementation. */
    static final Map<String, Function<List<Double>, NormalizePipelineNormalizer>> NAME_MAP = Map.of(
        RescaleZeroToOne.NAME, RescaleZeroToOne::new,
        RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new,
        Mean.NAME, Mean::new,
        ZScore.NAME, ZScore::new,
        Percent.NAME, Percent::new,
        Softmax.NAME, Softmax::new
    );

    /**
     * Returns {@code name} unchanged if it is a registered normalizer name.
     *
     * @throws IllegalArgumentException if the name is not a known normalizer
     */
    static String validateNormalizerName(String name) {
        if (NAME_MAP.containsKey(name)) {
            return name;
        }

        throw new IllegalArgumentException("invalid normalizer [" + name + "]");
    }

    // Optional decimal format pattern applied to output values; null means raw output.
    private final String format;
    // Name of the normalization method; always a valid key of NAME_MAP (validated in ctor).
    private final String normalizer;

    NormalizePipelineAggregationBuilder(String name, String format, String normalizer, List<String> bucketsPath) {
        super(name, NAME, bucketsPath.toArray(new String[0]));
        this.format = format;
        this.normalizer = validateNormalizerName(normalizer);
    }

    NormalizePipelineAggregationBuilder(String name, String format, String normalizer, String bucketsPath) {
        super(name, NAME, new String[] { bucketsPath });
        this.format = format;
        this.normalizer = validateNormalizerName(normalizer);
    }

    /**
     * Read from a stream.
     */
    public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException {
        super(in, NAME);
        format = in.readOptionalString();
        normalizer = in.readString();
    }

    @Override
    protected final void doWriteTo(StreamOutput out) throws IOException {
        out.writeOptionalString(format);
        out.writeString(normalizer);
    }

    /**
     * Gets the format to use on the output of this aggregation.
     */
    public String format() {
        return format;
    }

    /** Resolves the configured format pattern into a {@link DocValueFormat}, defaulting to raw. */
    protected DocValueFormat formatter() {
        if (format != null) {
            return new DocValueFormat.Decimal(format);
        } else {
            return DocValueFormat.RAW;
        }
    }

    @Override
    protected PipelineAggregator createInternal(Map<String, Object> metadata) {
        return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(normalizer), metadata);
    }

    @Override
    protected void validate(ValidationContext context) {
        // Exactly one buckets_path is supported, and the parent agg must produce
        // buckets in a well-defined order (e.g. histogram/date_histogram).
        if (bucketsPaths.length != 1) {
            context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
        }
        context.validateParentAggSequentiallyOrdered(NAME, name);
    }

    @Override
    protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
        if (format != null) {
            builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
        }
        builder.field(NORMALIZER_FIELD.getPreferredName(), normalizer);
        return builder;
    }

    @Override
    public int hashCode() {
        // BUGFIX: `normalizer` must participate in hashCode alongside `format`;
        // omitting it made builders differing only in normalization method hash equal.
        return Objects.hash(super.hashCode(), format, normalizer);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        if (super.equals(obj) == false) return false;
        NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj;
        // BUGFIX: compare `normalizer` too — previously two builders with different
        // normalization methods (but the same format) were considered equal.
        return Objects.equals(format, other.format) && Objects.equals(normalizer, other.normalizer);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }
}
Loading

0 comments on commit 5427899

Please sign in to comment.