
Add Normalize Pipeline Aggregation #56399

Merged 15 commits on May 14, 2020
1 change: 1 addition & 0 deletions docs/reference/aggregations/pipeline.asciidoc
@@ -287,3 +287,4 @@ include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]
include::pipeline/moving-percentiles-aggregation.asciidoc[]
include::pipeline/normalize-aggregation.asciidoc[]
176 changes: 176 additions & 0 deletions docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
@@ -0,0 +1,176 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-normalize-aggregation]]
=== Normalize Aggregation

A parent pipeline aggregation which calculates a normalized/rescaled value for each bucket.
Values that cannot be normalized are skipped using the <<gap-policy, skip gap policy>>.

==== Syntax

A `normalize` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
"normalize": {
"buckets_path": "normalized",
"method": "percent_of_sum"
}
}
--------------------------------------------------
// NOTCONSOLE

[[normalize_pipeline-params]]
.`normalize_pipeline` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax, `buckets_path` syntax>> for more details) |Required |
|`method` | The specific <<normalize_pipeline-method, method>> to apply | Required |
|`format` |Format to apply to the output value of this aggregation |Optional |`null`
|===

[[normalize_pipeline-method]]
==== Methods

The Normalize Aggregation supports multiple methods to transform the bucket values. Each method definition below uses
the following set of bucket values as an example: `[5, 5, 10, 50, 10, 20]`.

_rescale_0_1_::
This method rescales the data such that the minimum number is zero, and the maximum number is 1, with the rest normalized
linearly in-between.

x' = (x - min_x) / (max_x - min_x)

[0, 0, .1111, 1, .1111, .3333]

_rescale_0_100_::
This method rescales the data such that the minimum number is zero, and the maximum number is 100, with the rest normalized
linearly in-between.

x' = 100 * (x - min_x) / (max_x - min_x)

[0, 0, 11.11, 100, 11.11, 33.33]

_percent_of_sum_::
This method normalizes each value so that it represents a percentage of the total sum.

x' = x / sum_x

[5%, 5%, 10%, 50%, 10%, 20%]


_mean_::
This method normalizes each value by how much it differs from the average, scaled by the range of the values.

x' = (x - mean_x) / (max_x - min_x)

[-0.26, -0.26, -0.15, 0.74, -0.15, 0.07]

_zscore_::
This method normalizes such that each value represents how far it is from the mean relative to the standard deviation.

x' = (x - mean_x) / stdev_x

[-0.68, -0.68, -0.39, 1.94, -0.39, 0.19]

_softmax_::
This method normalizes such that each value is exponentiated and relative to the sum of the exponents of the original values.

x' = e^x / sum_e_x

[2.862E-20, 2.862E-20, 4.248E-18, 0.999, 4.248E-18, 9.357E-14]
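The formulas above can be checked in isolation. The following is a standalone sketch that applies each documented method to the example values; it mirrors the formulas on this page, not the actual Elasticsearch implementation (`NormalizePipelineMethods` is not shown in this diff), and the class and method names are illustrative only:

```java
import java.util.Arrays;
import java.util.function.DoubleUnaryOperator;

// Sketch of the documented normalize methods, applied to the example values.
// These are the formulas from the docs above, not Elasticsearch's code.
public class NormalizeMethodsSketch {

    static double min(double[] v)  { return Arrays.stream(v).min().getAsDouble(); }
    static double max(double[] v)  { return Arrays.stream(v).max().getAsDouble(); }
    static double sum(double[] v)  { return Arrays.stream(v).sum(); }
    static double mean(double[] v) { return sum(v) / v.length; }

    // Sample standard deviation (n - 1), which reproduces the z-score example
    static double stdev(double[] v) {
        double m = mean(v), ss = 0;
        for (double x : v) ss += (x - m) * (x - m);
        return Math.sqrt(ss / (v.length - 1));
    }

    static double[] map(double[] v, DoubleUnaryOperator op) {
        return Arrays.stream(v).map(op).toArray();
    }

    static double[] rescale01(double[] v) {
        double lo = min(v), range = max(v) - lo;
        return map(v, x -> (x - lo) / range);      // x' = (x - min) / (max - min)
    }

    static double[] percentOfSum(double[] v) {
        double s = sum(v);
        return map(v, x -> x / s);                 // x' = x / sum
    }

    static double[] zscore(double[] v) {
        double m = mean(v), sd = stdev(v);
        return map(v, x -> (x - m) / sd);          // x' = (x - mean) / stdev
    }

    static double[] softmax(double[] v) {
        double total = Arrays.stream(v).map(Math::exp).sum();
        return map(v, x -> Math.exp(x) / total);   // x' = e^x / sum(e^x)
    }

    public static void main(String[] args) {
        double[] values = {5, 5, 10, 50, 10, 20};
        System.out.println(Arrays.toString(rescale01(values)));
        System.out.println(Arrays.toString(zscore(values)));
    }
}
```

Running `main` reproduces the `rescale_0_1` and `zscore` example lists (up to rounding).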


==== Example

The following snippet calculates the percent of total sales for each month:

[source,console]
--------------------------------------------------
POST /sales/_search
{
"size": 0,
"aggs" : {
"sales_per_month" : {
"date_histogram" : {
"field" : "date",
"calendar_interval" : "month"
},
"aggs": {
"sales": {
"sum": {
"field": "price"
}
},
"percent_of_total_sales": {
"normalize": {
"buckets_path": "sales", <1>
"method": "percent_of_sum" <2>
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]

<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling.
<2> `method` sets which rescaling to apply. In this case, `percent_of_sum` calculates each sales value as a percent of all sales
in the parent bucket.

And the following may be the response:

[source,console-result]
--------------------------------------------------
{
"took": 11,
"timed_out": false,
"_shards": ...,
"hits": ...,
"aggregations": {
"sales_per_month": {
"buckets": [
{
"key_as_string": "2015/01/01 00:00:00",
"key": 1420070400000,
"doc_count": 3,
"sales": {
"value": 550.0
},
"percent_of_total_sales": {
"value": 0.5583756345177665
}
},
{
"key_as_string": "2015/02/01 00:00:00",
"key": 1422748800000,
"doc_count": 2,
"sales": {
"value": 60.0
},
"percent_of_total_sales": {
"value": 0.06091370558375635
}
},
{
"key_as_string": "2015/03/01 00:00:00",
"key": 1425168000000,
"doc_count": 2,
"sales": {
"value": 375.0
},
"percent_of_total_sales": {
"value": 0.38071065989847713
}
}
]
}
}
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
1 change: 1 addition & 0 deletions docs/reference/rest-api/usage.asciidoc
@@ -267,6 +267,7 @@ GET /_xpack/usage
"stats": {
"boxplot_usage" : 0,
"top_metrics_usage" : 0,
"normalize_usage" : 0,
"cumulative_cardinality_usage" : 0,
"t_test_usage" : 0,
"string_stats_usage" : 0,
@@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
public static final String NAME = "simple_value";
protected final double value;

InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
super(name, metadata);
this.format = formatter;
this.value = value;
@@ -32,6 +32,7 @@
import org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory;
import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
@@ -84,6 +85,11 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
MovingPercentilesPipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
pipelineAggs.add(new PipelineAggregationSpec(
NormalizePipelineAggregationBuilder.NAME,
NormalizePipelineAggregationBuilder::new,
usage.track(AnalyticsStatsAction.Item.NORMALIZE,
checkLicense(NormalizePipelineAggregationBuilder.PARSER))));
return pipelineAggs;
}

@@ -0,0 +1,146 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.normalize;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Mean;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Percent;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOne;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.RescaleZeroToOneHundred;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.Softmax;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineMethods.ZScore;

public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<NormalizePipelineAggregationBuilder> {
public static final String NAME = "normalize";
static final ParseField METHOD_FIELD = new ParseField("method");

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<NormalizePipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
Member: Check out InstantiatingObjectParser!

Contributor (author): so, I tried changing that parser to work here, but I think it deserves its own change. The InstantiatingObjectParser does not expose the Context in such a way that more constructor arguments can be passed in. I believe this can change, but I'd rather not do that here.

Member: 👍

NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0],
(String) args[1], (List<String>) args[2]));

static {
PARSER.declareString(optionalConstructorArg(), FORMAT);
PARSER.declareString(constructorArg(), METHOD_FIELD);
PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD);
}

static final Map<String, Function<double[], DoubleUnaryOperator>> NAME_MAP = Map.of(
RescaleZeroToOne.NAME, RescaleZeroToOne::new,
RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new,
Mean.NAME, Mean::new,
ZScore.NAME, ZScore::new,
Percent.NAME, Percent::new,
Softmax.NAME, Softmax::new
);

static String validateMethodName(String name) {
if (NAME_MAP.containsKey(name)) {
return name;
}
throw new IllegalArgumentException("invalid method [" + name + "]");
}
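The map-backed lookup above doubles as validation: an unknown method name fails fast at parse time instead of surfacing later during reduction. A standalone sketch of the same pattern follows; the method names here are taken from the documentation headings in this PR and are illustrative, since the authoritative registered names live in `NormalizePipelineMethods`:

```java
import java.util.Set;

// Fail-fast name validation mirroring validateMethodName above.
// The names are illustrative placeholders, not the authoritative registry.
public class MethodNameValidation {
    static final Set<String> METHODS = Set.of(
        "rescale_0_1", "rescale_0_100", "mean", "zscore", "percent_of_sum", "softmax");

    static String validateMethodName(String name) {
        if (METHODS.contains(name)) {
            return name;  // valid: return unchanged so callers can assign inline
        }
        throw new IllegalArgumentException("invalid method [" + name + "]");
    }
}
```

Returning the validated name lets the constructor write `this.method = validateMethodName(method);` in a single expression, as the builder does.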

private final String format;
private final String method;

public NormalizePipelineAggregationBuilder(String name, String format, String method, List<String> bucketsPath) {
super(name, NAME, bucketsPath.toArray(new String[0]));
this.format = format;
this.method = validateMethodName(method);
}

/**
* Read from a stream.
*/
public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException {
super(in, NAME);
format = in.readOptionalString();
method = in.readString();
}

@Override
protected final void doWriteTo(StreamOutput out) throws IOException {
out.writeOptionalString(format);
out.writeString(method);
}
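The stream constructor and `doWriteTo` must read and write fields in the same order (optional `format`, then `method`). A plain `java.io` sketch of that round-trip discipline, independent of Elasticsearch's `StreamInput`/`StreamOutput` (the boolean presence flag for the optional string is an assumption for illustration, not ES's wire format):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Round-trip sketch: fields are written and read in the same order,
// mirroring the format-then-method ordering of the builder above.
public class RoundTrip {
    final String format;  // may be null, so a presence flag is written first
    final String method;

    RoundTrip(String format, String method) {
        this.format = format;
        this.method = method;
    }

    void writeTo(DataOutputStream out) throws IOException {
        out.writeBoolean(format != null);           // optional-string encoding
        if (format != null) out.writeUTF(format);
        out.writeUTF(method);
    }

    static RoundTrip readFrom(DataInputStream in) throws IOException {
        String format = in.readBoolean() ? in.readUTF() : null;  // same order as writeTo
        String method = in.readUTF();
        return new RoundTrip(format, method);
    }

    // Serialize then deserialize, as ES serialization tests do with copies.
    static RoundTrip copy(RoundTrip original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.writeTo(new DataOutputStream(bytes));
            return readFrom(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Swapping the read order of the two fields would corrupt every deserialized instance, which is why the constructor-and-`doWriteTo` pair are conventionally kept adjacent, as in this class.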

/**
* Gets the format to use on the output of this aggregation.
*/
public String format() {
return format;
}

protected DocValueFormat formatter() {
if (format != null) {
return new DocValueFormat.Decimal(format);
} else {
return DocValueFormat.RAW;
}
}

@Override
protected PipelineAggregator createInternal(Map<String, Object> metadata) {
return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(method), metadata);
}

@Override
protected void validate(ValidationContext context) {
if (bucketsPaths.length != 1) {
context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
}
context.validateHasParent(NAME, name);
}
Contributor: Should we also check context.validateHasParent() to make sure this isn't at the top level?

Contributor (author): ah, yes. I wasn't aware of this. thanks for bringing it up.

Contributor (author): added a check and a test for this!


@Override
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
}
builder.field(METHOD_FIELD.getPreferredName(), method);
return builder;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), format, method);
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj;
return Objects.equals(format, other.format) && Objects.equals(method, other.method);
}

@Override
public String getWriteableName() {
return NAME;
}
}