Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge branch 'develop' into use-double-quoted-as-string
Browse files Browse the repository at this point in the history
  • Loading branch information
dai-chen committed Jan 12, 2021
2 parents 354324a + 9a08770 commit 8a4376e
Show file tree
Hide file tree
Showing 66 changed files with 1,764 additions and 291 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Here is a documentation list with features only available in this improved SQL q
* [Aggregations](./docs/user/dql/aggregations.rst): aggregation over expression and more other features
* [Complex queries](./docs/user/dql/complex.rst)
* Improvement on Subqueries in FROM clause
* [Window functions](./docs/user/dql/window.rst): ranking window function support
* [Window functions](./docs/user/dql/window.rst): ranking and aggregate window function support

To avoid impact on your side, normally you won't see any difference in query response. If you want to check if and why your query falls back to be handled by old SQL engine, please explain your query and check Elasticsearch log for "Request is falling back to old SQL engine due to ...".

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@
import com.amazon.opendistroforelasticsearch.sql.expression.DSL;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AggregationState;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.conditional.cases.CaseClause;
import com.amazon.opendistroforelasticsearch.sql.expression.conditional.cases.WhenClause;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionName;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionRepository;
import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionName;
import com.amazon.opendistroforelasticsearch.sql.expression.window.aggregation.AggregateWindowFunction;
import com.amazon.opendistroforelasticsearch.sql.expression.window.ranking.RankingWindowFunction;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -145,9 +148,12 @@ public Expression visitAggregateFunction(AggregateFunction node, AnalysisContext
Optional<BuiltinFunctionName> builtinFunctionName = BuiltinFunctionName.of(node.getFuncName());
if (builtinFunctionName.isPresent()) {
Expression arg = node.getField().accept(this, context);
return (Aggregator)
repository.compile(
Aggregator aggregator = (Aggregator) repository.compile(
builtinFunctionName.get().getName(), Collections.singletonList(arg));
if (node.getCondition() != null) {
aggregator.condition(analyze(node.getCondition(), context));
}
return aggregator;
} else {
throw new SemanticCheckException("Unsupported aggregation function " + node.getFuncName());
}
Expand All @@ -163,9 +169,15 @@ public Expression visitFunction(Function node, AnalysisContext context) {
return (Expression) repository.compile(functionName, arguments);
}

@SuppressWarnings("unchecked")
@Override
public Expression visitWindowFunction(WindowFunction node, AnalysisContext context) {
return visitFunction(node.getFunction(), context);
Expression expr = node.getFunction().accept(this, context);
// Wrap regular aggregator by aggregate window function to adapt window operator use
if (expr instanceof Aggregator) {
return new AggregateWindowFunction((Aggregator<AggregationState>) expr);
}
return expr;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.ExpressionNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.expression.FunctionExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.conditional.cases.CaseClause;
Expand Down Expand Up @@ -89,6 +90,14 @@ public Expression visitAggregator(Aggregator<?> node, AnalysisContext context) {
return expressionMap.getOrDefault(node, node);
}

@Override
public Expression visitNamed(NamedExpression node, AnalysisContext context) {
if (expressionMap.containsKey(node)) {
return expressionMap.get(node);
}
return node.getDelegated().accept(this, context);
}

/**
* Implement this because Case/When is not registered in function repository.
*/
Expand Down Expand Up @@ -145,7 +154,7 @@ public Void visitAggregation(LogicalAggregation plan, Void context) {
public Void visitWindow(LogicalWindow plan, Void context) {
Expression windowFunc = plan.getWindowFunction();
expressionMap.put(windowFunc,
new ReferenceExpression(windowFunc.toString(), windowFunc.type()));
new ReferenceExpression(((NamedExpression) windowFunc).getName(), windowFunc.type()));
return visitNode(plan, context);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ public List<NamedExpression> visitAlias(Alias node, AnalysisContext context) {
private Expression referenceIfSymbolDefined(Alias expr,
AnalysisContext context) {
UnresolvedExpression delegatedExpr = expr.getDelegated();
return optimizer.optimize(delegatedExpr.accept(expressionAnalyzer, context), context);

// Pass named expression because expression like window function loses full name
// (OVER clause) and thus depends on name in alias to be replaced correctly
return optimizer.optimize(
DSL.named(
expr.getName(),
delegatedExpr.accept(expressionAnalyzer, context),
expr.getAlias()),
context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazon.opendistroforelasticsearch.sql.ast.expression.WindowFunction;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort.SortOption;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.window.WindowDefinition;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalSort;
Expand Down Expand Up @@ -68,19 +69,26 @@ public LogicalPlan analyze(UnresolvedExpression projectItem, AnalysisContext con

@Override
public LogicalPlan visitAlias(Alias node, AnalysisContext context) {
return node.getDelegated().accept(this, context);
}
if (!(node.getDelegated() instanceof WindowFunction)) {
return null;
}

WindowFunction unresolved = (WindowFunction) node.getDelegated();
Expression windowFunction = expressionAnalyzer.analyze(unresolved, context);
List<Expression> partitionByList = analyzePartitionList(unresolved, context);
List<Pair<SortOption, Expression>> sortList = analyzeSortList(unresolved, context);

@Override
public LogicalPlan visitWindowFunction(WindowFunction node, AnalysisContext context) {
Expression windowFunction = expressionAnalyzer.analyze(node, context);
List<Expression> partitionByList = analyzePartitionList(node, context);
List<Pair<SortOption, Expression>> sortList = analyzeSortList(node, context);
WindowDefinition windowDefinition = new WindowDefinition(partitionByList, sortList);
NamedExpression namedWindowFunction =
new NamedExpression(node.getName(), windowFunction, node.getAlias());
List<Pair<SortOption, Expression>> allSortItems = windowDefinition.getAllSortItems();

if (allSortItems.isEmpty()) {
return new LogicalWindow(child, namedWindowFunction, windowDefinition);
}
return new LogicalWindow(
new LogicalSort(child, windowDefinition.getAllSortItems()),
windowFunction,
new LogicalSort(child, allSortItems),
namedWindowFunction,
windowDefinition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,11 @@ public static UnresolvedExpression aggregate(
return new AggregateFunction(func, field, Arrays.asList(args));
}

public static UnresolvedExpression filteredAggregate(
String func, UnresolvedExpression field, UnresolvedExpression condition) {
return new AggregateFunction(func, field, condition);
}

public static Function function(String funcName, UnresolvedExpression... funcArgs) {
return new Function(funcName, Arrays.asList(funcArgs));
}
Expand Down Expand Up @@ -231,7 +236,7 @@ public When when(UnresolvedExpression condition, UnresolvedExpression result) {
return new When(condition, result);
}

public UnresolvedExpression window(Function function,
public UnresolvedExpression window(UnresolvedExpression function,
List<UnresolvedExpression> partitionByList,
List<Pair<SortOption, UnresolvedExpression>> sortList) {
return new WindowFunction(function, partitionByList, sortList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class AggregateFunction extends UnresolvedExpression {
private final String funcName;
private final UnresolvedExpression field;
private final List<UnresolvedExpression> argList;
private UnresolvedExpression condition;

/**
* Constructor.
Expand All @@ -46,6 +47,20 @@ public AggregateFunction(String funcName, UnresolvedExpression field) {
this.argList = Collections.emptyList();
}

/**
* Constructor.
* @param funcName function name.
* @param field {@link UnresolvedExpression}.
* @param condition condition in aggregation filter.
*/
public AggregateFunction(String funcName, UnresolvedExpression field,
UnresolvedExpression condition) {
this.funcName = funcName;
this.field = field;
this.argList = Collections.emptyList();
this.condition = condition;
}

@Override
public List<UnresolvedExpression> getChild() {
return Collections.singletonList(field);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.amazon.opendistroforelasticsearch.sql.ast.AbstractNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.ast.Node;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort.SortOption;
import java.util.Collections;
import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
Expand All @@ -35,13 +35,17 @@
@ToString
public class WindowFunction extends UnresolvedExpression {

private final Function function;
private final UnresolvedExpression function;
private List<UnresolvedExpression> partitionByList;
private List<Pair<SortOption, UnresolvedExpression>> sortList;

@Override
public List<? extends Node> getChild() {
return Collections.singletonList(function);
ImmutableList.Builder<UnresolvedExpression> children = ImmutableList.builder();
children.add(function);
children.addAll(partitionByList);
sortList.forEach(pair -> children.add(pair.getRight()));
return children.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.amazon.opendistroforelasticsearch.sql.analysis.ExpressionAnalyzer;
import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils;
import com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType;
import com.amazon.opendistroforelasticsearch.sql.data.type.ExprType;
import com.amazon.opendistroforelasticsearch.sql.exception.ExpressionEvaluationException;
Expand All @@ -30,6 +31,8 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
* Aggregator which will iterate on the {@link BindingTuple}s to aggregate the result.
Expand All @@ -46,20 +49,40 @@ public abstract class Aggregator<S extends AggregationState>
@Getter
private final List<Expression> arguments;
protected final ExprCoreType returnType;
@Setter
@Getter
@Accessors(fluent = true)
protected Expression condition;

/**
* Create an {@link AggregationState} which will be used for aggregation.
*/
public abstract S create();

/**
* Iterate on the {@link BindingTuple}.
* Iterate on {@link ExprValue}.
* @param value {@link ExprValue}
* @param state {@link AggregationState}
* @return {@link AggregationState}
*/
protected abstract S iterate(ExprValue value, S state);

/**
* Let the aggregator iterate on the {@link BindingTuple}
* To filter out ExprValues that are missing, null or cannot satisfy {@link #condition}
* Before the specific aggregator iterating ExprValue in the tuple.
*
* @param tuple {@link BindingTuple}
* @param state {@link AggregationState}
* @return {@link AggregationState}
*/
public abstract S iterate(BindingTuple tuple, S state);
public S iterate(BindingTuple tuple, S state) {
ExprValue value = getArguments().get(0).valueOf(tuple);
if (value.isNull() || value.isMissing() || !conditionValue(tuple)) {
return state;
}
return iterate(value, state);
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
Expand All @@ -77,4 +100,14 @@ public <T, C> T accept(ExpressionNodeVisitor<T, C> visitor, C context) {
return visitor.visitAggregator(this, context);
}

/**
* Util method to get value of condition in aggregation filter.
*/
public boolean conditionValue(BindingTuple tuple) {
if (condition == null) {
return true;
}
return ExprValueUtils.getBooleanValue(condition.valueOf(tuple));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,9 @@ public AvgState create() {
}

@Override
public AvgState iterate(BindingTuple tuple, AvgState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (!(value.isNull() || value.isMissing())) {
state.count++;
state.total += ExprValueUtils.getDoubleValue(value);
}
protected AvgState iterate(ExprValue value, AvgState state) {
state.count++;
state.total += ExprValueUtils.getDoubleValue(value);
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ public CountAggregator.CountState create() {
}

@Override
public CountState iterate(BindingTuple tuple, CountState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (!(value.isNull() || value.isMissing())) {
state.count++;
}
protected CountState iterate(ExprValue value, CountState state) {
state.count++;
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ public MaxState create() {
}

@Override
public MaxState iterate(BindingTuple tuple, MaxState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (!(value.isNull() || value.isMissing())) {
state.max(value);
}
protected MaxState iterate(ExprValue value, MaxState state) {
state.max(value);
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,8 @@ public MinState create() {
}

@Override
public MinState iterate(BindingTuple tuple, MinState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (!(value.isNull() || value.isMissing())) {
state.min(value);
}
protected MinState iterate(ExprValue value, MinState state) {
state.min(value);
return state;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.amazon.opendistroforelasticsearch.sql.expression.aggregation;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.expression.ExpressionNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.storage.bindingtuple.BindingTuple;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -63,8 +64,8 @@ public AggregationState create() {
}

@Override
public AggregationState iterate(BindingTuple tuple, AggregationState state) {
return delegated.iterate(tuple, state);
protected AggregationState iterate(ExprValue value, AggregationState state) {
return delegated.iterate(value, state);
}

/**
Expand Down
Loading

0 comments on commit 8a4376e

Please sign in to comment.