Add Normalize Pipeline Aggregation #56399

Merged
merged 15 commits on May 14, 2020
1 change: 1 addition & 0 deletions docs/reference/aggregations/pipeline.asciidoc
@@ -287,3 +287,4 @@ include::pipeline/bucket-selector-aggregation.asciidoc[]
include::pipeline/bucket-sort-aggregation.asciidoc[]
include::pipeline/serial-diff-aggregation.asciidoc[]
include::pipeline/moving-percentiles-aggregation.asciidoc[]
include::pipeline/normalize-aggregation.asciidoc[]
121 changes: 121 additions & 0 deletions docs/reference/aggregations/pipeline/normalize-aggregation.asciidoc
@@ -0,0 +1,121 @@
[role="xpack"]
[testenv="basic"]
[[search-aggregations-pipeline-normalize-aggregation]]
=== Normalize Aggregation

A parent pipeline aggregation which calculates a normalized/rescaled value for each bucket of the parent multi-bucket aggregation.

==== Syntax

A `normalize` aggregation looks like this in isolation:

[source,js]
--------------------------------------------------
{
  "normalize": {
    "buckets_path": "normalized",
    "normalizer": "percent_of_sum"
  }
}
--------------------------------------------------
// NOTCONSOLE

[[normalizer_pipeline-params]]
Contributor: Should we make a note somewhere that this pipeline always uses a skip gap policy?

Contributor Author: good call!
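
For reference, the aggregator in this PR hard-codes the skip gap policy when resolving bucket values, as seen in NormalizePipelineAggregator.reduce() later in this diff:

double[] values = buckets.stream()
    .mapToDouble(bucket -> resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.SKIP)).toArray();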

.`normalizer_pipeline` Parameters
[options="header"]
|===
|Parameter Name |Description |Required |Default Value
|`buckets_path` |The path to the buckets we wish to normalize (see <<buckets-path-syntax>> for more details) |Required |
|`normalizer` | The specific rescaling to apply | Required |
|`format` |Format to apply to the output value of this aggregation |Optional |`null`
|===

The following snippet calculates the percent of total sales for each month:

[source,console]
--------------------------------------------------
POST /sales/_search
{
  "size": 0,
  "aggs" : {
    "sales_per_month" : {
      "date_histogram" : {
        "field" : "date",
        "calendar_interval" : "month"
      },
      "aggs": {
        "sales": {
          "sum": {
            "field": "price"
          }
        },
        "percent_of_total_sales": {
          "normalize": {
            "buckets_path": "sales", <1>
            "normalizer": "percent_of_sum" <2>
          }
        }
      }
    }
  }
}
--------------------------------------------------
// TEST[setup:sales]

<1> `buckets_path` instructs this normalize aggregation to use the output of the `sales` aggregation for rescaling
<2> `normalizer` sets which rescaling to apply. In this case, `percent_of_sum` will calculate the sales value as a percent of all sales
in the parent bucket

And the following may be the response:

[source,console-result]
--------------------------------------------------
{
  "took": 11,
  "timed_out": false,
  "_shards": ...,
  "hits": ...,
  "aggregations": {
    "sales_per_month": {
      "buckets": [
        {
          "key_as_string": "2015/01/01 00:00:00",
          "key": 1420070400000,
          "doc_count": 3,
          "sales": {
            "value": 550.0
          },
          "percent_of_total_sales": {
            "value": 0.5583756345177665
          }
        },
        {
          "key_as_string": "2015/02/01 00:00:00",
          "key": 1422748800000,
          "doc_count": 2,
          "sales": {
            "value": 60.0
          },
          "percent_of_total_sales": {
            "value": 0.06091370558375635
          }
        },
        {
          "key_as_string": "2015/03/01 00:00:00",
          "key": 1425168000000,
          "doc_count": 2,
          "sales": {
            "value": 375.0
          },
          "percent_of_total_sales": {
            "value": 0.38071065989847713
          }
        }
      ]
    }
  }
}
--------------------------------------------------
// TESTRESPONSE[s/"took": 11/"took": $body.took/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": $body._shards/]
// TESTRESPONSE[s/"hits": \.\.\./"hits": $body.hits/]
1 change: 1 addition & 0 deletions docs/reference/rest-api/usage.asciidoc
@@ -267,6 +267,7 @@ GET /_xpack/usage
"stats": {
"boxplot_usage" : 0,
"top_metrics_usage" : 0,
"normalize_usage" : 0,
"cumulative_cardinality_usage" : 0,
"t_test_usage" : 0,
"string_stats_usage" : 0,
@@ -35,7 +35,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
    public static final String NAME = "simple_value";
    protected final double value;

-   InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
+   public InternalSimpleValue(String name, double value, DocValueFormat formatter, Map<String, Object> metadata) {
        super(name, metadata);
        this.format = formatter;
        this.value = value;
@@ -32,6 +32,7 @@
import org.elasticsearch.xpack.analytics.action.AnalyticsUsageTransportAction;
import org.elasticsearch.xpack.analytics.action.TransportAnalyticsStatsAction;
import org.elasticsearch.xpack.analytics.aggregations.metrics.AnalyticsAggregatorFactory;
import org.elasticsearch.xpack.analytics.normalize.NormalizePipelineAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.BoxplotAggregationBuilder;
import org.elasticsearch.xpack.analytics.boxplot.InternalBoxplot;
import org.elasticsearch.xpack.analytics.cumulativecardinality.CumulativeCardinalityPipelineAggregationBuilder;
@@ -84,6 +85,11 @@ public List<PipelineAggregationSpec> getPipelineAggregations() {
            MovingPercentilesPipelineAggregationBuilder::new,
            usage.track(AnalyticsStatsAction.Item.MOVING_PERCENTILES,
                checkLicense(MovingPercentilesPipelineAggregationBuilder.PARSER))));
        pipelineAggs.add(new PipelineAggregationSpec(
            NormalizePipelineAggregationBuilder.NAME,
            NormalizePipelineAggregationBuilder::new,
            usage.track(AnalyticsStatsAction.Item.NORMALIZE,
                checkLicense(NormalizePipelineAggregationBuilder.PARSER))));
        return pipelineAggs;
    }

@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.normalize;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketMetricsParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.Parser.FORMAT;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Mean;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Percent;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.RescaleZeroToOne;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.RescaleZeroToOneHundred;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.Softmax;
import static org.elasticsearch.xpack.analytics.normalize.NormalizePipelineNormalizers.ZScore;

public class NormalizePipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<NormalizePipelineAggregationBuilder> {
    public static final String NAME = "normalize";
    static final ParseField NORMALIZER_FIELD = new ParseField("normalizer");
Contributor: Fine with normalizer, but wanted to also suggest method as a potential param name. No strong opinion though :)

Contributor Author: I was wishy-washy on the naming here as well, and decided not to fret, but I too have leaned towards method earlier, so I am happy to do so here, especially given the overloading of the term across the stack.

Contributor Author: I've updated the naming to be method.


    @SuppressWarnings("unchecked")
    public static final ConstructingObjectParser<NormalizePipelineAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
Member: Check out InstantiatingObjectParser!

Contributor Author: so, I tried changing that parser to work here, but I think it deserves its own change. The InstantiatingObjectParser does not expose the Context in such a way that more constructor arguments can be passed in. I believe this can change, but I'd rather not do that here.

Member: 👍

        NAME, false, (args, name) -> new NormalizePipelineAggregationBuilder(name, (String) args[0],
            (String) args[1], (List<String>) args[2]));

    static {
        PARSER.declareString(optionalConstructorArg(), FORMAT);
        PARSER.declareString(constructorArg(), NORMALIZER_FIELD);
        PARSER.declareStringArray(constructorArg(), BUCKETS_PATH_FIELD);
    }

    static final Map<String, Function<double[], DoubleUnaryOperator>> NAME_MAP = Map.of(
        RescaleZeroToOne.NAME, RescaleZeroToOne::new,
        RescaleZeroToOneHundred.NAME, RescaleZeroToOneHundred::new,
        Mean.NAME, Mean::new,
        ZScore.NAME, ZScore::new,
        Percent.NAME, Percent::new,
        Softmax.NAME, Softmax::new
    );

    static String validateNormalizerName(String name) {
        if (NAME_MAP.containsKey(name)) {
            return name;
        }

        throw new IllegalArgumentException("invalid normalizer [" + name + "]");
    }

    private final String format;
    private final String normalizer;

    public NormalizePipelineAggregationBuilder(String name, String format, String normalizer, List<String> bucketsPath) {
        super(name, NAME, bucketsPath.toArray(new String[0]));
        this.format = format;
        this.normalizer = validateNormalizerName(normalizer);
    }

    /**
     * Read from a stream.
     */
    public NormalizePipelineAggregationBuilder(StreamInput in) throws IOException {
        super(in, NAME);
        format = in.readOptionalString();
        normalizer = in.readString();
    }

    @Override
    protected final void doWriteTo(StreamOutput out) throws IOException {
        out.writeOptionalString(format);
        out.writeString(normalizer);
    }

    /**
     * Gets the format to use on the output of this aggregation.
     */
    public String format() {
        return format;
    }

    protected DocValueFormat formatter() {
        if (format != null) {
            return new DocValueFormat.Decimal(format);
        } else {
            return DocValueFormat.RAW;
        }
    }

    @Override
    protected PipelineAggregator createInternal(Map<String, Object> metadata) {
        return new NormalizePipelineAggregator(name, bucketsPaths, formatter(), NAME_MAP.get(normalizer), metadata);
    }

    @Override
    protected void validate(ValidationContext context) {
        if (bucketsPaths.length != 1) {
            context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
        }
    }
Contributor: Should we also check context.validateHasParent() to make sure this isn't at the top level?

Contributor Author: ah, yes. I wasn't aware of this. thanks for bringing it up

Contributor Author: added a check and a test for this!
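
A sketch of what the updated validate method might look like after that change; the exact ValidationContext call is an assumption based on the reviewer's suggestion and is not shown in this diff:

    @Override
    protected void validate(ValidationContext context) {
        // assumed follow-up per the review: reject a normalize agg declared at the top level
        context.validateHasParent(NAME, name);
        if (bucketsPaths.length != 1) {
            context.addBucketPathValidationError("must contain a single entry for aggregation [" + name + "]");
        }
    }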


    @Override
    protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
        if (format != null) {
            builder.field(BucketMetricsParser.FORMAT.getPreferredName(), format);
        }
        builder.field(NORMALIZER_FIELD.getPreferredName(), normalizer);
        return builder;
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), format, normalizer);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (obj == null || getClass() != obj.getClass()) return false;
        if (super.equals(obj) == false) return false;
        NormalizePipelineAggregationBuilder other = (NormalizePipelineAggregationBuilder) obj;
        return Objects.equals(format, other.format) && Objects.equals(normalizer, other.normalizer);
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }
}
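
Each entry in NAME_MAP above is a factory that takes the array of bucket values and returns a DoubleUnaryOperator. The NormalizePipelineNormalizers classes are not part of this excerpt, so purely as a hypothetical illustration, the percent_of_sum normalizer might look roughly like this:

import java.util.function.DoubleUnaryOperator;

// Hypothetical sketch; the real NormalizePipelineNormalizers.Percent is not shown in this diff.
class Percent implements DoubleUnaryOperator {
    static final String NAME = "percent_of_sum";
    private final double sum;

    Percent(double[] values) {
        double total = 0.0;
        for (double value : values) {
            if (Double.isNaN(value) == false) {
                total += value; // gapped buckets arrive as NaN and are skipped
            }
        }
        this.sum = total;
    }

    @Override
    public double applyAsDouble(double value) {
        return value / sum; // each bucket's share of the total
    }
}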
@@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.analytics.normalize;

import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramFactory;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;

public class NormalizePipelineAggregator extends PipelineAggregator {
    private final DocValueFormat formatter;
    private final Function<double[], DoubleUnaryOperator> normalizerSupplier;

    NormalizePipelineAggregator(String name, String[] bucketsPaths, DocValueFormat formatter,
                                Function<double[], DoubleUnaryOperator> normalizerSupplier,
                                Map<String, Object> metadata) {
        super(name, bucketsPaths, metadata);
        this.formatter = formatter;
        this.normalizerSupplier = normalizerSupplier;
    }

    @Override
    public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
        InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends InternalMultiBucketAggregation.InternalBucket>
            histo = (InternalMultiBucketAggregation<? extends InternalMultiBucketAggregation, ? extends
                InternalMultiBucketAggregation.InternalBucket>) aggregation;
        List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
        HistogramFactory factory = (HistogramFactory) histo;
Contributor: Do we know if this works with a terms agg as the parent? It feels like it should (e.g. it doesn't require any specific ordering of the buckets, unlike something like a moving avg which needs an ordering). If we think it should work with terms we should tweak this to not use a HistogramFactory directly. BucketScriptPipelineAggregator has an example of how to generically build buckets from any InternalMultiBucketAggregation (the internal agg can create buckets too, not just the factory).

Contributor Author: thanks! I was slightly loose in my interpretation of the HistogramFactory's comment:

/** Implemented by histogram aggregations and used by pipeline aggregations to insert buckets. */

Will look at how BucketScript does things and add a test for terms agg!

Member: Yikes! I'm sorry I didn't notice this one!

Contributor Author: thanks, I've updated to include a test for terms and use a more generic way to make new buckets
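
A sketch of the generic bucket-building approach from BucketScriptPipelineAggregator that the follow-up presumably adopts (assumed here; the updated code is not shown in this diff):

    // Let the original multi-bucket aggregation build the replacement bucket
    // and the final result itself, instead of casting to HistogramFactory:
    InternalMultiBucketAggregation.InternalBucket newBucket =
        originalAgg.createBucket(new InternalAggregations(aggs), bucket);
    newBuckets.add(newBucket);
    // ...and once all buckets are rebuilt:
    return originalAgg.create(newBuckets);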

        List<Bucket> newBuckets = new ArrayList<>(buckets.size());

        double[] values = buckets.stream()
            .mapToDouble(bucket -> resolveBucketValue(histo, bucket, bucketsPaths()[0], GapPolicy.SKIP)).toArray();

        DoubleUnaryOperator normalizer = normalizerSupplier.apply(values);

        for (int i = 0; i < buckets.size(); i++) {
            InternalMultiBucketAggregation.InternalBucket bucket = buckets.get(i);

            final double normalizedBucketValue;

            // Only account for valid values. Infinite-valued buckets were converted to NaNs by
            // the time they reach here.
            if (Double.isNaN(values[i])) {
                normalizedBucketValue = Double.NaN;
            } else {
                normalizedBucketValue = normalizer.applyAsDouble(values[i]);
            }

            List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
Member: bucket.getAggregations().copyResults() does this without so much boiler plate.

Contributor Author: unfortunately, that method does not work in this context. I think a more dedicated cleanup for this boilerplate can be tackled outside of this PR.

Member: 👍

                .map((p) -> (InternalAggregation) p)
                .collect(Collectors.toList());
            aggs.add(new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata()));
            Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), new InternalAggregations(aggs));
            newBuckets.add(newBucket);
        }

        return factory.createAggregation(newBuckets);
    }
}