Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MovingFunction pipeline aggregation #25137

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions core/src/main/java/org/elasticsearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristic;
import org.elasticsearch.search.aggregations.bucket.significant.heuristics.SignificanceHeuristicParser;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel;
import org.elasticsearch.search.fetch.FetchSubPhase;
import org.elasticsearch.search.fetch.subphase.highlight.Highlighter;
import org.elasticsearch.search.suggest.Suggester;
Expand Down Expand Up @@ -84,6 +86,13 @@ default List<SearchExtensionSpec<SignificanceHeuristic, SignificanceHeuristicPar
default List<SearchExtensionSpec<MovAvgModel, MovAvgModel.AbstractModelParser>> getMovingAverageModels() {
return emptyList();
}
/**
* The new {@link MovModel}s defined by this plugin. {@linkplain MovModel}s are used by the {@link MovFunctionPipelineAggregator} to
* execute functions on windows of data.
*/
default List<SearchExtensionSpec<MovModel, MovModel.AbstractModelParser>> getMovingFunctionModels() {
return emptyList();
}
/**
* The new {@link FetchSubPhase}s defined by this plugin.
*/
Expand Down
54 changes: 46 additions & 8 deletions core/src/main/java/org/elasticsearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,21 @@
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.derivative.InternalDerivative;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.EwmaModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltLinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.HoltWintersModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.LinearModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.moving.models.EwmaModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltLinearModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.HoltWintersModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.LinearModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MaxModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MedianModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MinModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.SumModel;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregator;
import org.elasticsearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -269,6 +276,8 @@ public class SearchModule {
"significance_heuristic");
private final ParseFieldRegistry<MovAvgModel.AbstractModelParser> movingAverageModelParserRegistry = new ParseFieldRegistry<>(
"moving_avg_model");
private final ParseFieldRegistry<MovModel.AbstractModelParser> movingFunctionModelParserRegistry = new ParseFieldRegistry<>(
"moving_function_model");

private final List<FetchSubPhase> fetchSubPhases = new ArrayList<>();

Expand All @@ -288,6 +297,7 @@ public SearchModule(Settings settings, boolean transportClient, List<SearchPlugi
registerValueFormats();
registerSignificanceHeuristics(plugins);
registerMovingAverageModels(plugins);
registerMovingFunctionModels(plugins);
registerAggregations(plugins);
registerPipelineAggregations(plugins);
registerFetchSubPhases(plugins);
Expand Down Expand Up @@ -324,6 +334,13 @@ public ParseFieldRegistry<MovAvgModel.AbstractModelParser> getMovingAverageModel
return movingAverageModelParserRegistry;
}

/**
* The registry of {@link MovModel}s.
*/
public ParseFieldRegistry<MovModel.AbstractModelParser> getMovingFunctionModelParserRegistry() {
return movingFunctionModelParserRegistry;
}

private void registerAggregations(List<SearchPlugin> plugins) {
registerAggregation(new AggregationSpec(AvgAggregationBuilder.NAME, AvgAggregationBuilder::new, AvgAggregationBuilder::parse)
.addResultReader(InternalAvg::new));
Expand Down Expand Up @@ -481,6 +498,12 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
MovAvgPipelineAggregator::new,
(n, c) -> MovAvgPipelineAggregationBuilder.parse(movingAverageModelParserRegistry, n, c))
/* Uses InternalHistogram for buckets */);
registerPipelineAggregation(new PipelineAggregationSpec(
MovFunctionPipelineAggregationBuilder.NAME,
MovFunctionPipelineAggregationBuilder::new,
MovFunctionPipelineAggregator::new,
(n, c) -> MovFunctionPipelineAggregationBuilder.parse(movingFunctionModelParserRegistry, n, c))
/* Uses InternalHistogram for buckets */);
registerPipelineAggregation(new PipelineAggregationSpec(
CumulativeSumPipelineAggregationBuilder.NAME,
CumulativeSumPipelineAggregationBuilder::new,
Expand Down Expand Up @@ -660,6 +683,21 @@ private void registerMovingAverageModel(SearchExtensionSpec<MovAvgModel, MovAvgM
new NamedWriteableRegistry.Entry(MovAvgModel.class, movAvgModel.getName().getPreferredName(), movAvgModel.getReader()));
}

private void registerMovingFunctionModels(List<SearchPlugin> plugins) {
registerMovingFunctionModel(new SearchExtensionSpec<>(MaxModel.NAME, MaxModel::new, MaxModel.PARSER));
registerMovingFunctionModel(new SearchExtensionSpec<>(MinModel.NAME, MinModel::new, MinModel.PARSER));
registerMovingFunctionModel(new SearchExtensionSpec<>(MedianModel.NAME, MedianModel::new, MedianModel.PARSER));
registerMovingFunctionModel(new SearchExtensionSpec<>(SumModel.NAME, SumModel::new, SumModel.PARSER));

registerFromPlugin(plugins, SearchPlugin::getMovingFunctionModels, this::registerMovingFunctionModel);
}

private void registerMovingFunctionModel(SearchExtensionSpec<MovModel, MovModel.AbstractModelParser> movModel) {
movingFunctionModelParserRegistry.register(movModel.getParser(), movModel.getName());
namedWriteables.add(
new NamedWriteableRegistry.Entry(MovModel.class, movModel.getName().getPreferredName(), movModel.getReader()));
}

private void registerFetchSubPhases(List<SearchPlugin> plugins) {
registerFetchSubPhase(new ExplainFetchSubPhase());
registerFetchSubPhase(new DocValueFieldsFetchSubPhase());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.cumulativesum.CumulativeSumPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.derivative.DerivativePipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.moving.MovAvgPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.moving.MovFunctionPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.serialdiff.SerialDiffPipelineAggregationBuilder;

import java.util.Map;
Expand Down Expand Up @@ -79,6 +80,10 @@ public static MovAvgPipelineAggregationBuilder movingAvg(String name, String buc
return new MovAvgPipelineAggregationBuilder(name, bucketsPath);
}

public static MovFunctionPipelineAggregationBuilder movingFunction(String name, String bucketsPath, Script script) {
return new MovFunctionPipelineAggregationBuilder(name, bucketsPath, script);
}

public static BucketScriptPipelineAggregationBuilder bucketScript(String name,
Map<String, String> bucketsPathsMap, Script script) {
return new BucketScriptPipelineAggregationBuilder(name, bucketsPathsMap, script);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.search.aggregations.pipeline.movavg;
package org.elasticsearch.search.aggregations.pipeline.moving;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParsingException;
Expand All @@ -35,9 +35,10 @@
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModelBuilder;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.SimpleModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModelBuilder;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.SimpleModel;

import java.io.IOException;
import java.text.ParseException;
Expand All @@ -53,9 +54,6 @@
public class MovAvgPipelineAggregationBuilder extends AbstractPipelineAggregationBuilder<MovAvgPipelineAggregationBuilder> {
public static final String NAME = "moving_avg";

public static final ParseField MODEL = new ParseField("model");
private static final ParseField WINDOW = new ParseField("window");
public static final ParseField SETTINGS = new ParseField("settings");
private static final ParseField PREDICT = new ParseField("predict");
private static final ParseField MINIMIZE = new ParseField("minimize");

Expand Down Expand Up @@ -169,11 +167,11 @@ public int window() {
* @param model
* A MovAvgModel which has been prepopulated with settings
*/
public MovAvgPipelineAggregationBuilder modelBuilder(MovAvgModelBuilder model) {
public MovAvgPipelineAggregationBuilder modelBuilder(MovModelBuilder model) {
if (model == null) {
throw new IllegalArgumentException("[model] must not be null: [" + name + "]");
}
this.model = model.build();
this.model = (MovAvgModel)model.build();
return this;
}

Expand Down Expand Up @@ -292,7 +290,7 @@ protected XContentBuilder internalXContent(XContentBuilder builder, Params param
}
builder.field(GAP_POLICY.getPreferredName(), gapPolicy.getName());
model.toXContent(builder, params);
builder.field(WINDOW.getPreferredName(), window);
builder.field(MovModel.WINDOW.getPreferredName(), window);
if (predict > 0) {
builder.field(PREDICT.getPreferredName(), predict);
}
Expand Down Expand Up @@ -322,7 +320,7 @@ public static MovAvgPipelineAggregationBuilder parse(
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.VALUE_NUMBER) {
if (WINDOW.match(currentFieldName)) {
if (MovModel.WINDOW.match(currentFieldName)) {
window = parser.intValue();
if (window <= 0) {
throw new ParsingException(parser.getTokenLocation(), "[" + currentFieldName + "] value must be a positive, "
Expand All @@ -345,7 +343,7 @@ public static MovAvgPipelineAggregationBuilder parse(
bucketsPaths = new String[] { parser.text() };
} else if (GAP_POLICY.match(currentFieldName)) {
gapPolicy = GapPolicy.parse(context, parser.text(), parser.getTokenLocation());
} else if (MODEL.match(currentFieldName)) {
} else if (MovModel.MODEL.match(currentFieldName)) {
model = parser.text();
} else {
throw new ParsingException(parser.getTokenLocation(),
Expand All @@ -364,7 +362,7 @@ public static MovAvgPipelineAggregationBuilder parse(
"Unknown key for a " + token + " in [" + pipelineAggregatorName + "]: [" + currentFieldName + "].");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (SETTINGS.match(currentFieldName)) {
if (MovModel.SETTINGS.match(currentFieldName)) {
settings = parser.map();
} else {
throw new ParsingException(parser.getTokenLocation(),
Expand Down Expand Up @@ -406,7 +404,7 @@ public static MovAvgPipelineAggregationBuilder parse(
MovAvgModel.AbstractModelParser modelParser = movingAverageMdelParserRegistry.lookup(model, parser.getTokenLocation());
MovAvgModel movAvgModel;
try {
movAvgModel = modelParser.parse(settings, pipelineAggregatorName, factory.window());
movAvgModel = (MovAvgModel) modelParser.parse(settings, pipelineAggregatorName, factory.window());
} catch (ParseException exception) {
throw new ParsingException(parser.getTokenLocation(), "Could not parse settings for model [" + model + "].", exception);
}
Expand Down Expand Up @@ -438,4 +436,4 @@ protected boolean doEquals(Object obj) {
public String getWriteableName() {
return NAME;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.elasticsearch.search.aggregations.pipeline.movavg;
package org.elasticsearch.search.aggregations.pipeline.moving;

import org.elasticsearch.common.collect.EvictingQueue;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -33,7 +33,7 @@
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.pipeline.InternalSimpleValue;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.movavg.models.MovAvgModel;
import org.elasticsearch.search.aggregations.pipeline.moving.models.MovAvgModel;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -124,6 +124,7 @@ public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext

// Some models (e.g. HoltWinters) have certain preconditions that must be met
if (model.hasValue(values.size())) {

double movavg = model.next(values);

List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false).map((p) -> {
Expand Down
Loading