Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregations: Add moving average aggregation #10024

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountParser;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeParser;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgParser;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelModule;

import java.util.List;

Expand Down Expand Up @@ -101,6 +103,7 @@ public AggregationModule() {
aggParsers.add(ChildrenParser.class);

reducerParsers.add(DerivativeParser.class);
reducerParsers.add(MovAvgParser.class);
}

/**
Expand Down Expand Up @@ -129,7 +132,7 @@ protected void configure() {

@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new SignificantTermsHeuristicModule());
return ImmutableList.of(new SignificantTermsHeuristicModule(), new MovAvgModelModule());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.elasticsearch.search.aggregations.metrics.valuecount.InternalValueCount;
import org.elasticsearch.search.aggregations.reducers.InternalSimpleValue;
import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeReducer;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgReducer;
import org.elasticsearch.search.aggregations.reducers.movavg.models.TransportMovAvgModelModule;

/**
* A module that registers all the transport streams for the addAggregation
Expand Down Expand Up @@ -108,10 +110,11 @@ protected void configure() {

// Reducers
DerivativeReducer.registerStreams();
MovAvgReducer.registerStreams();
}

@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(new TransportSignificantTermsHeuristicModule());
return ImmutableList.of(new TransportSignificantTermsHeuristicModule(), new TransportMovAvgModelModule());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.search.aggregations.reducers;

import org.elasticsearch.search.aggregations.reducers.derivative.DerivativeBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.MovAvgBuilder;

public final class ReducerBuilders {

Expand All @@ -29,4 +30,8 @@ private ReducerBuilders() {
public static final DerivativeBuilder derivative(String name) {
return new DerivativeBuilder(name);
}

public static final MovAvgBuilder smooth(String name) {
return new MovAvgBuilder(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.reducers.movavg;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.reducers.ReducerBuilder;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelBuilder;

import java.io.IOException;

import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;

/**
* A builder to create MovingAvg reducer aggregations
*/
public class MovAvgBuilder extends ReducerBuilder<MovAvgBuilder> {

private String format;
private GapPolicy gapPolicy;
private MovAvgModelBuilder modelBuilder;
private Integer window;

public MovAvgBuilder(String name) {
super(name, MovAvgReducer.TYPE.name());
}

public MovAvgBuilder format(String format) {
this.format = format;
return this;
}

/**
* Defines what should be done when a gap in the series is discovered
*
* @param gapPolicy A GapPolicy enum defining the selected policy
* @return Returns the builder to continue chaining
*/
public MovAvgBuilder gapPolicy(GapPolicy gapPolicy) {
this.gapPolicy = gapPolicy;
return this;
}

/**
* Sets a MovAvgModelBuilder for the Moving Average. The model builder is used to
* define what type of moving average you want to use on the series
*
* @param modelBuilder A MovAvgModelBuilder which has been prepopulated with settings
* @return Returns the builder to continue chaining
*/
public MovAvgBuilder modelBuilder(MovAvgModelBuilder modelBuilder) {
this.modelBuilder = modelBuilder;
return this;
}

/**
* Sets the window size for the moving average. This window will "slide" across the
* series, and the values inside that window will be used to calculate the moving avg value
*
* @param window Size of window
* @return Returns the builder to continue chaining
*/
public MovAvgBuilder window(int window) {
this.window = window;
return this;
}


@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
if (format != null) {
builder.field(MovAvgParser.FORMAT.getPreferredName(), format);
}
if (gapPolicy != null) {
builder.field(MovAvgParser.GAP_POLICY.getPreferredName(), gapPolicy.getName());
}
if (modelBuilder != null) {
modelBuilder.toXContent(builder, params);
}
if (window != null) {
builder.field(MovAvgParser.WINDOW.getPreferredName(), window);
}
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations.reducers.movavg;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.aggregations.reducers.Reducer;
import org.elasticsearch.search.aggregations.reducers.ReducerFactory;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelParser;
import org.elasticsearch.search.aggregations.reducers.movavg.models.MovAvgModelParserMapper;
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.search.aggregations.reducers.BucketHelpers.GapPolicy;

public class MovAvgParser implements Reducer.Parser {

public static final ParseField FORMAT = new ParseField("format");
public static final ParseField GAP_POLICY = new ParseField("gap_policy");
public static final ParseField MODEL = new ParseField("model");
public static final ParseField WINDOW = new ParseField("window");
public static final ParseField SETTINGS = new ParseField("settings");

private final MovAvgModelParserMapper movAvgModelParserMapper;

@Inject
public MovAvgParser(MovAvgModelParserMapper movAvgModelParserMapper) {
this.movAvgModelParserMapper = movAvgModelParserMapper;
}

@Override
public String type() {
return MovAvgReducer.TYPE.name();
}

@Override
public ReducerFactory parse(String reducerName, XContentParser parser, SearchContext context) throws IOException {
XContentParser.Token token;
String currentFieldName = null;
String[] bucketsPaths = null;
String format = null;
GapPolicy gapPolicy = GapPolicy.IGNORE;
int window = 5;
Map<String, Object> settings = null;
String model = "simple";

while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (WINDOW.match(currentFieldName)) {
window = parser.intValue();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else if (token == XContentParser.Token.VALUE_STRING) {
if (FORMAT.match(currentFieldName)) {
format = parser.text();
} else if (BUCKETS_PATH.match(currentFieldName)) {
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY.match(currentFieldName)) {
gapPolicy = GapPolicy.parse(context, parser.text());
} else if (MODEL.match(currentFieldName)) {
model = parser.text();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_ARRAY) {
if (BUCKETS_PATH.match(currentFieldName)) {
List<String> paths = new ArrayList<>();
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
String path = parser.text();
paths.add(path);
}
bucketsPaths = paths.toArray(new String[paths.size()]);
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (SETTINGS.match(currentFieldName)) {
settings = parser.map();
} else {
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + reducerName + "]: ["
+ currentFieldName + "].");
}
} else {
throw new SearchParseException(context, "Unexpected token " + token + " in [" + reducerName + "].");
}
}

if (bucketsPaths == null) {
throw new SearchParseException(context, "Missing required field [" + BUCKETS_PATH.getPreferredName()
+ "] for smooth aggregation [" + reducerName + "]");
}

ValueFormatter formatter = null;
if (format != null) {
formatter = ValueFormat.Patternable.Number.format(format).formatter();
}

MovAvgModelParser modelParser = movAvgModelParserMapper.get(model);
if (modelParser == null) {
throw new SearchParseException(context, "Unknown model [" + model
+ "] specified. Valid options are:" + movAvgModelParserMapper.getAllNames().toString());
}
MovAvgModel movAvgModel = modelParser.parse(settings);


return new MovAvgReducer.Factory(reducerName, bucketsPaths, formatter, gapPolicy, window, movAvgModel);
}


}
Loading