Add trendline PPL command #3071

Merged
merged 43 commits into from
Dec 12, 2024
Changes from 40 commits
494b0a2
WIP: Add trendline PPL command
jduo Oct 12, 2024
c6ec31f
fix missing newline
jduo Oct 14, 2024
9a7b6a4
Optimize running average calculation and use DSL
jduo Oct 14, 2024
f39bef1
spotless
jduo Oct 14, 2024
92657a1
Make implementation preserve child data
jduo Oct 18, 2024
5732ac4
Implement logging and explain for trendline
jduo Oct 18, 2024
ec89a61
Use List#get(0) instead of getFirst() to support older compilers
jduo Oct 21, 2024
a25303d
Tell the project operator about new fields from trendline computations
jduo Oct 21, 2024
e8b0fe2
Add Trendline parser test
jduo Oct 22, 2024
f96a657
Spotless
jduo Oct 23, 2024
ed1670a
LogicalPlan testing
jduo Oct 23, 2024
cb4ea19
Physical plan tests
jduo Oct 25, 2024
fc3027e
Spotless
jduo Oct 25, 2024
a7354c5
Fix explain test failure
jduo Oct 25, 2024
332f5e6
Add integration tests
jduo Oct 26, 2024
289a74f
Add trendline documentation
jduo Oct 27, 2024
1ec45f0
Make null handling consistent with normal aggregation
jduo Oct 27, 2024
bf03883
Make alias mandatory
jduo Oct 27, 2024
94a2164
Remove weighted moving average stub
jduo Oct 27, 2024
0c9dfb0
Add DefaultImplementor and PhysicalPlanNodeVisitor tests
jduo Oct 27, 2024
a5c455c
Propagate type information
jduo Oct 27, 2024
4abbae7
Tweak math to evaluate lazily and reduce division
jduo Oct 28, 2024
db6a08a
Add support for moving averages on datetime types
jduo Oct 28, 2024
ce2d089
Add missing doc link
jduo Oct 29, 2024
0e11421
Fix code coverage gaps
jduo Oct 29, 2024
68ac7ad
Fix docs typo
jduo Oct 29, 2024
048930c
Add missing OpenSearchExecutionProtectorTest
jduo Oct 29, 2024
1dd720a
Include trendline in docs test
jduo Oct 29, 2024
48a93e4
Fix typo drawing example table
jduo Oct 30, 2024
4e2e2c0
Add explain integration test
jduo Oct 31, 2024
54a8569
Fix trendline explain IT test
jduo Oct 31, 2024
aca31bd
Make the alias optional
jduo Oct 31, 2024
50d590a
Add validation on number of data points
jduo Oct 31, 2024
07c7efb
Add sort functionality to trendline
jduo Oct 31, 2024
b60093e
Make docs more consistent with Spark
jduo Oct 31, 2024
25ccada
Fix docs typo in example
jduo Oct 31, 2024
c7b7cb1
Add missed update to AstBuilderTest for sort option
jduo Oct 31, 2024
23917a5
Add test for checking an invalid number of samples
jduo Nov 7, 2024
b01dded
Add Trendline to KeywordsCanBeId
jduo Nov 27, 2024
8da8255
Fix wrong column name in docs
jduo Dec 4, 2024
9f1684f
Fix rebase error in parse
jduo Dec 10, 2024
09bd6aa
Merge branch 'main' into ppl-trendline
acarbonetto Dec 12, 2024
0ff1653
PPL-Trendline: remove unused grammar; clean doc
acarbonetto Dec 12, 2024
94 changes: 77 additions & 17 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
@@ -10,7 +10,10 @@
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.DESC;
import static org.opensearch.sql.data.type.ExprCoreType.DATE;
import static org.opensearch.sql.data.type.ExprCoreType.STRUCT;
import static org.opensearch.sql.data.type.ExprCoreType.TIME;
import static org.opensearch.sql.data.type.ExprCoreType.TIMESTAMP;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALOUS;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_ANOMALY_GRADE;
import static org.opensearch.sql.utils.MLCommonsConstants.RCF_SCORE;
@@ -22,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -62,6 +66,7 @@
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
@@ -100,6 +105,7 @@
import org.opensearch.sql.planner.logical.LogicalRemove;
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalTrendline;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.physical.datasource.DataSourceTable;
import org.opensearch.sql.storage.Table;
@@ -469,23 +475,7 @@ public LogicalPlan visitParse(Parse node, AnalysisContext context) {
@Override
public LogicalPlan visitSort(Sort node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
- ExpressionReferenceOptimizer optimizer =
- new ExpressionReferenceOptimizer(expressionAnalyzer.getRepository(), child);
-
- List<Pair<SortOption, Expression>> sortList =
- node.getSortList().stream()
- .map(
- sortField -> {
- var analyzed = expressionAnalyzer.analyze(sortField.getField(), context);
- if (analyzed == null) {
- throw new UnsupportedOperationException(
- String.format("Invalid use of expression %s", sortField.getField()));
- }
- Expression expression = optimizer.optimize(analyzed, context);
- return ImmutablePair.of(analyzeSortOption(sortField.getFieldArgs()), expression);
- })
- .collect(Collectors.toList());
- return new LogicalSort(child, sortList);
+ return buildSort(child, context, node.getSortList());
}

/** Build {@link LogicalDedupe}. */
@@ -594,6 +584,55 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
return new LogicalML(child, node.getArguments());
}

/** Build {@link LogicalTrendline} for Trendline command. */
@Override
public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
final LogicalPlan child = node.getChild().get(0).accept(this, context);

final TypeEnvironment currEnv = context.peek();
final List<Trendline.TrendlineComputation> computations = node.getComputations();
final ImmutableList.Builder<Pair<Trendline.TrendlineComputation, ExprCoreType>>
computationsAndTypes = ImmutableList.builder();
computations.forEach(
computation -> {
final Expression resolvedField =
expressionAnalyzer.analyze(computation.getDataField(), context);
final ExprCoreType averageType;
// Duplicate the semantics of AvgAggregator#create():
// - All numerical types have the DOUBLE type for the moving average.
// - All datetime types have the same datetime type for the moving average.
if (ExprCoreType.numberTypes().contains(resolvedField.type())) {
averageType = ExprCoreType.DOUBLE;
} else {
switch (resolvedField.type()) {
case DATE:
case TIME:
case TIMESTAMP:
averageType = (ExprCoreType) resolvedField.type();
break;
default:
throw new SemanticCheckException(
String.format(
"Invalid field used for trendline computation %s. Source field %s had type"
+ " %s but must be a numerical or datetime field.",
computation.getAlias(),
computation.getDataField().getChild().get(0),
resolvedField.type().typeName()));
}
}
currEnv.define(new Symbol(Namespace.FIELD_NAME, computation.getAlias()), averageType);
computationsAndTypes.add(Pair.of(computation, averageType));
});

if (node.getSortByField().isEmpty()) {
return new LogicalTrendline(child, computationsAndTypes.build());
}

return new LogicalTrendline(
buildSort(child, context, Collections.singletonList(node.getSortByField().get())),
computationsAndTypes.build());
}
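
The type rule that `visitTrendline` applies can be sketched in isolation. This is a minimal illustration only, not the analyzer's code: `FieldType` is a hypothetical stand-in for `ExprCoreType` with just a few members, `TrendlineTypeSketch` and `movingAverageType` are made-up names, and `IllegalArgumentException` stands in for `SemanticCheckException`.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for ExprCoreType; only a few members are modelled.
enum FieldType { INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, STRING, BOOLEAN }

final class TrendlineTypeSketch {
  private static final Set<FieldType> NUMERIC =
      EnumSet.of(FieldType.INTEGER, FieldType.LONG, FieldType.FLOAT, FieldType.DOUBLE);
  private static final Set<FieldType> DATETIME =
      EnumSet.of(FieldType.DATE, FieldType.TIME, FieldType.TIMESTAMP);

  // Numeric sources average to DOUBLE; datetime sources keep their own type;
  // anything else is rejected (the real analyzer throws SemanticCheckException).
  static FieldType movingAverageType(String alias, String field, FieldType sourceType) {
    if (NUMERIC.contains(sourceType)) {
      return FieldType.DOUBLE;
    }
    if (DATETIME.contains(sourceType)) {
      return sourceType;
    }
    throw new IllegalArgumentException(
        String.format(
            "Invalid field used for trendline computation %s. Source field %s had type"
                + " %s but must be a numerical or datetime field.",
            alias, field, sourceType.name()));
  }

  public static void main(String[] args) {
    System.out.println(movingAverageType("age_trend", "age", FieldType.INTEGER)); // DOUBLE
    System.out.println(movingAverageType("ts_trend", "ts", FieldType.TIMESTAMP)); // TIMESTAMP
  }
}
```

Resolving the output type during analysis lets the alias be registered in the type environment before later operators reference it.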

@Override
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
LogicalPlan child = paginate.getChild().get(0).accept(this, context);
@@ -612,6 +651,27 @@ public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext con
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
}

private LogicalSort buildSort(
LogicalPlan child, AnalysisContext context, List<Field> sortFields) {
ExpressionReferenceOptimizer optimizer =
new ExpressionReferenceOptimizer(expressionAnalyzer.getRepository(), child);

List<Pair<SortOption, Expression>> sortList =
sortFields.stream()
.map(
sortField -> {
var analyzed = expressionAnalyzer.analyze(sortField.getField(), context);
if (analyzed == null) {
throw new UnsupportedOperationException(
String.format("Invalid use of expression %s", sortField.getField()));
}
Expression expression = optimizer.optimize(analyzed, context);
return ImmutablePair.of(analyzeSortOption(sortField.getFieldArgs()), expression);
})
.collect(Collectors.toList());
return new LogicalSort(child, sortList);
}

/**
* The first argument is always "asc", others are optional. Given nullFirst argument, use its
* value. Otherwise just use DEFAULT_ASC/DESC.
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@
import org.opensearch.sql.ast.tree.Rename;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.Values;

/** AST nodes visitor Defines the traverse path. */
@@ -110,6 +111,14 @@ public T visitFilter(Filter node, C context) {
return visitChildren(node, context);
}

public T visitTrendline(Trendline node, C context) {
return visitChildren(node, context);
}

public T visitTrendlineComputation(Trendline.TrendlineComputation node, C context) {
return visitChildren(node, context);
}

public T visitProject(Project node, C context) {
return visitChildren(node, context);
}
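
The effect of defaulting the new `visitTrendline` hook to `visitChildren` can be shown with a simplified, self-contained sketch. Everything here is hypothetical: the nested `Node`, `Visitor`, `Relation`, and `Trendline` types are pared-down stand-ins (no context argument, no result aggregation), not the project's classes.

```java
import java.util.Collections;
import java.util.List;

final class VisitorDefaultSketch {
  interface Node {
    List<Node> children();

    <T> T accept(Visitor<T> visitor);
  }

  // Base visitor: every visitX hook falls back to visiting the children.
  static class Visitor<T> {
    T visitChildren(Node node) {
      T last = null;
      for (Node child : node.children()) {
        last = child.accept(this);
      }
      return last;
    }

    T visitRelation(Relation node) {
      return visitChildren(node);
    }

    T visitTrendline(Trendline node) {
      return visitChildren(node);
    }
  }

  static final class Relation implements Node {
    public List<Node> children() {
      return Collections.emptyList();
    }

    public <T> T accept(Visitor<T> visitor) {
      return visitor.visitRelation(this);
    }
  }

  static final class Trendline implements Node {
    private final Node child;

    Trendline(Node child) {
      this.child = child;
    }

    public List<Node> children() {
      return Collections.singletonList(child);
    }

    public <T> T accept(Visitor<T> visitor) {
      return visitor.visitTrendline(this);
    }
  }

  // Overrides only visitRelation; trendline nodes are passed through by default.
  static final class RelationFinder extends Visitor<String> {
    @Override
    String visitRelation(Relation node) {
      return "relation";
    }
  }

  static String describe(Node root) {
    return root.accept(new RelationFinder());
  }

  public static void main(String[] args) {
    System.out.println(describe(new Trendline(new Relation()))); // relation
  }
}
```

Existing visitors that know nothing about trendline therefore keep working: the node is traversed transparently unless a visitor chooses to override the hook.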
14 changes: 14 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
@@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -62,6 +63,7 @@
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.TableFunction;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;

@@ -466,6 +468,18 @@ public static Limit limit(UnresolvedPlan input, Integer limit, Integer offset) {
return new Limit(limit, offset).attach(input);
}

public static Trendline trendline(
UnresolvedPlan input,
Optional<Field> sortField,
Trendline.TrendlineComputation... computations) {
return new Trendline(sortField, Arrays.asList(computations)).attach(input);
}

public static Trendline.TrendlineComputation computation(
Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) {
return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type);
}

public static Parse parse(
UnresolvedPlan input,
ParseMethod parseMethod,
72 changes: 72 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Trendline.java
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

@ToString
@Getter
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class Trendline extends UnresolvedPlan {

private UnresolvedPlan child;
private final Optional<Field> sortByField;
private final List<TrendlineComputation> computations;

@Override
public Trendline attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return ImmutableList.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
return visitor.visitTrendline(this, context);
}

@Getter
public static class TrendlineComputation extends UnresolvedExpression {
Review comment (Collaborator): Should we split this into two separate files?

Reply (Contributor Author): This is pulled from the corresponding Spark PR opensearch-spark/#748, though this file has changed in that PR too. I think it makes sense as an inner class here since it is Trendline-specific, though it is used by both LogicalTrendline and TrendlineOperator.

Reply (Collaborator): I just compared the list of classes with other features we added in the past. I don't have a preference there.

private final Integer numberOfDataPoints;
private final Field dataField;
private final String alias;
private final TrendlineType computationType;

public TrendlineComputation(
Integer numberOfDataPoints, Field dataField, String alias, TrendlineType computationType) {
this.numberOfDataPoints = numberOfDataPoints;
this.dataField = dataField;
this.alias = alias;
this.computationType = computationType;
}

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
return nodeVisitor.visitTrendlineComputation(this, context);
}
}

public enum TrendlineType {
SMA,
WMA
}
}
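
For readers unfamiliar with the `SMA` computation type, a simple moving average over `numberOfDataPoints` samples can be sketched as below. This is an illustration under stated assumptions, not the `TrendlineOperator` from this PR (which is not shown in this part of the diff): the class and method names are hypothetical, the input is assumed to contain no nulls, and rows seen before the window fills are assumed to yield `null`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class SimpleMovingAverageSketch {
  // Returns one entry per input value; entries are null until the window is full.
  static List<Double> sma(List<Double> values, int numberOfDataPoints) {
    if (numberOfDataPoints <= 0) {
      throw new IllegalArgumentException("Number of data points must be greater than 0");
    }
    Deque<Double> window = new ArrayDeque<>();
    double runningSum = 0.0;
    List<Double> result = new ArrayList<>();
    for (Double value : values) {
      window.addLast(value);
      runningSum += value;
      // Keep a running sum so each step costs O(1) instead of re-summing the window.
      if (window.size() > numberOfDataPoints) {
        runningSum -= window.removeFirst();
      }
      if (window.size() == numberOfDataPoints) {
        result.add(runningSum / numberOfDataPoints);
      } else {
        result.add(null);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(sma(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2)); // [null, 1.5, 2.5, 3.5]
  }
}
```

Because the result depends on row order, sorting the child plan first (as the optional sort field does) determines which rows fall into each window.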
32 changes: 32 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
@@ -8,12 +8,14 @@
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponseNode;
import org.opensearch.sql.expression.Expression;
@@ -31,6 +33,7 @@
import org.opensearch.sql.planner.physical.RenameOperator;
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.TakeOrderedOperator;
import org.opensearch.sql.planner.physical.TrendlineOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.TableScanOperator;
@@ -211,6 +214,21 @@ public ExplainResponseNode visitNested(NestedOperator node, Object context) {
explanNode -> explanNode.setDescription(ImmutableMap.of("nested", node.getFields())));
}

@Override
public ExplainResponseNode visitTrendline(TrendlineOperator node, Object context) {
return explain(
node,
context,
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"computations",
describeTrendlineComputations(
node.getComputations().stream()
.map(Pair::getKey)
.collect(Collectors.toList())))));
}

protected ExplainResponseNode explain(
PhysicalPlan node, Object context, Consumer<ExplainResponseNode> doExplain) {
ExplainResponseNode explainNode = new ExplainResponseNode(getOperatorName(node));
@@ -245,4 +263,18 @@ private Map<String, Map<String, String>> describeSortList(
"sortOrder", p.getLeft().getSortOrder().toString(),
"nullOrder", p.getLeft().getNullOrder().toString())));
}

private List<Map<String, String>> describeTrendlineComputations(
List<Trendline.TrendlineComputation> computations) {
return computations.stream()
.map(
computation ->
ImmutableMap.of(
"computationType",
computation.getComputationType().name().toLowerCase(Locale.ROOT),
"numberOfDataPoints", computation.getNumberOfDataPoints().toString(),
"dataField", computation.getDataField().getChild().get(0).toString(),
"alias", computation.getAlias()))
.collect(Collectors.toList());
}
}
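
The shape of the description that `describeTrendlineComputations` produces for a single computation can be sketched without the project's classes. The class, enum, and method below are hypothetical, and a `LinkedHashMap` is used in place of Guava's `ImmutableMap` to keep the example dependency-free while preserving key order.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

final class ExplainTrendlineSketch {
  // Hypothetical stand-in for Trendline.TrendlineType.
  enum Type { SMA, WMA }

  // Builds the four string-valued entries reported for one trendline computation.
  static Map<String, String> describe(
      Type type, int numberOfDataPoints, String dataField, String alias) {
    Map<String, String> description = new LinkedHashMap<>();
    // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
    description.put("computationType", type.name().toLowerCase(Locale.ROOT));
    description.put("numberOfDataPoints", Integer.toString(numberOfDataPoints));
    description.put("dataField", dataField);
    description.put("alias", alias);
    return description;
  }

  public static void main(String[] args) {
    // Prints {computationType=sma, numberOfDataPoints=2, dataField=age, alias=age_trend}
    System.out.println(describe(Type.SMA, 2, "age", "age_trend"));
  }
}
```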
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.opensearch.sql.planner.logical.LogicalRemove;
import org.opensearch.sql.planner.logical.LogicalRename;
import org.opensearch.sql.planner.logical.LogicalSort;
import org.opensearch.sql.planner.logical.LogicalTrendline;
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.logical.LogicalWindow;
import org.opensearch.sql.planner.physical.AggregationOperator;
@@ -39,6 +40,7 @@
import org.opensearch.sql.planner.physical.RenameOperator;
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.TakeOrderedOperator;
import org.opensearch.sql.planner.physical.TrendlineOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
@@ -166,6 +168,11 @@ public PhysicalPlan visitCloseCursor(LogicalCloseCursor node, C context) {
return new CursorCloseOperator(visitChild(node, context));
}

@Override
public PhysicalPlan visitTrendline(LogicalTrendline plan, C context) {
return new TrendlineOperator(visitChild(plan, context), plan.getComputations());
}

// Called when paging query requested without `FROM` clause only
@Override
public PhysicalPlan visitPaginate(LogicalPaginate plan, C context) {
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.tree.RareTopN.CommandType;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.ast.tree.Trendline;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.LiteralExpression;
import org.opensearch.sql.expression.NamedExpression;
@@ -130,6 +132,11 @@ public static LogicalPlan rareTopN(
return new LogicalRareTopN(input, commandType, noOfResults, Arrays.asList(fields), groupByList);
}

public static LogicalTrendline trendline(
LogicalPlan input, Pair<Trendline.TrendlineComputation, ExprCoreType>... computations) {
return new LogicalTrendline(input, Arrays.asList(computations));
}

@SafeVarargs
public LogicalPlan values(List<LiteralExpression>... values) {
return new LogicalValues(Arrays.asList(values));
Original file line number Diff line number Diff line change
@@ -104,6 +104,10 @@ public R visitAD(LogicalAD plan, C context) {
return visitNode(plan, context);
}

public R visitTrendline(LogicalTrendline plan, C context) {
return visitNode(plan, context);
}

public R visitPaginate(LogicalPaginate plan, C context) {
return visitNode(plan, context);
}