This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

[PPL] Support eval command #477

Merged
1 change: 1 addition & 0 deletions core/build.gradle
@@ -14,6 +14,7 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: '23.0'
compile group: 'org.springframework', name: 'spring-context', version: '5.2.5.RELEASE'
compile group: 'org.springframework', name: 'spring-beans', version: '5.2.5.RELEASE'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testCompile group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
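The added commons-lang3 dependency supplies the Pair/ImmutablePair tuple types that the eval analyzer below uses to couple each assigned field with its analyzed expression. A minimal standalone sketch of that pairing, with a made-up class name and placeholder string values:

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

public class PairUsageSketch {
  public static void main(String[] args) {
    // The analyzer keeps (target field, analyzed expression) couples; plain strings stand in here.
    Pair<String, String> assignment = ImmutablePair.of("ageInMonths", "age * 12");
    System.out.println(assignment.getLeft() + " = " + assignment.getRight());
  }
}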
Analyzer.java
@@ -18,8 +18,10 @@
import com.amazon.opendistroforelasticsearch.sql.ast.AbstractNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Argument;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Field;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Let;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedExpression;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Aggregation;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Eval;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Filter;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Project;
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Relation;
@@ -32,6 +34,7 @@
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalAggregation;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalEval;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalFilter;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalProject;
@@ -41,104 +44,125 @@
import com.amazon.opendistroforelasticsearch.sql.storage.StorageEngine;
import com.amazon.opendistroforelasticsearch.sql.storage.Table;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

/**
 * Analyze the {@link UnresolvedPlan} in the {@link AnalysisContext} to construct the {@link
 * LogicalPlan}
 */
@RequiredArgsConstructor
public class Analyzer extends AbstractNodeVisitor<LogicalPlan, AnalysisContext> {
  private final ExpressionAnalyzer expressionAnalyzer;
  private final StorageEngine storageEngine;

  public LogicalPlan analyze(UnresolvedPlan unresolved, AnalysisContext context) {
    return unresolved.accept(this, context);
  }

  @Override
  public LogicalPlan visitRelation(Relation node, AnalysisContext context) {
    context.push();
    TypeEnvironment curEnv = context.peek();
    Table table = storageEngine.getTable(node.getTableName());
    table.getFieldTypes().forEach((k, v) -> curEnv.define(DSL.ref(k), v));
    return new LogicalRelation(node.getTableName());
  }

  @Override
  public LogicalPlan visitFilter(Filter node, AnalysisContext context) {
    LogicalPlan child = node.getChild().get(0).accept(this, context);
    Expression condition = expressionAnalyzer.analyze(node.getCondition(), context);
    return new LogicalFilter(child, condition);
  }

  /** Build {@link LogicalRename} */
  @Override
  public LogicalPlan visitRename(Rename node, AnalysisContext context) {
    LogicalPlan child = node.getChild().get(0).accept(this, context);
    ImmutableMap.Builder<ReferenceExpression, ReferenceExpression> renameMapBuilder =
        new ImmutableMap.Builder<>();
    for (com.amazon.opendistroforelasticsearch.sql.ast.expression.Map renameMap :
        node.getRenameList()) {
      Expression origin = expressionAnalyzer.analyze(renameMap.getOrigin(), context);
      // Define the new target field in the context instead of analyzing it.
      if (renameMap.getTarget() instanceof Field) {
        ReferenceExpression target =
            new ReferenceExpression(((Field) renameMap.getTarget()).getField().toString());
        context.peek().define(target, origin.type(context.peek()));
        renameMapBuilder.put(DSL.ref(origin.toString()), target);
      } else {
        throw new SemanticCheckException(
            String.format("the target expected to be field, but is %s", renameMap.getTarget()));
      }
    }
    return new LogicalRename(child, renameMapBuilder.build());
  }

  /** Build {@link LogicalAggregation} */
  @Override
  public LogicalPlan visitAggregation(Aggregation node, AnalysisContext context) {
    LogicalPlan child = node.getChild().get(0).accept(this, context);
    ImmutableList.Builder<Aggregator> aggregatorBuilder = new ImmutableList.Builder<>();
    for (UnresolvedExpression uExpr : node.getAggExprList()) {
      aggregatorBuilder.add((Aggregator) expressionAnalyzer.analyze(uExpr, context));
    }

    ImmutableList.Builder<Expression> groupbyBuilder = new ImmutableList.Builder<>();
    for (UnresolvedExpression uExpr : node.getGroupExprList()) {
      groupbyBuilder.add(expressionAnalyzer.analyze(uExpr, context));
    }
    return new LogicalAggregation(child, aggregatorBuilder.build(), groupbyBuilder.build());
  }

  /**
   * Build {@link LogicalProject} or {@link LogicalRemove} from {@link Field}.
   *
   * <p>Todo: the include/exclude fields should change the env definition. The downside of the
   * current implementation is that no {@link SemanticCheckException} is thrown even if the query
   * references a field that has been excluded by the fields command. Instead, during runtime
   * evaluation the missing field resolves to {@link ExprMissingValue}, which does not affect
   * correctness.
   *
   * <p>Postpone the implementation until more use cases are found.
   */
  @Override
  public LogicalPlan visitProject(Project node, AnalysisContext context) {
    LogicalPlan child = node.getChild().get(0).accept(this, context);
    List<ReferenceExpression> referenceExpressions =
        node.getProjectList().stream()
            .map(expr -> (ReferenceExpression) expressionAnalyzer.analyze(expr, context))
            .collect(Collectors.toList());
    Argument argument = node.getArgExprList().get(0);
    Boolean exclude = (Boolean) argument.getValue().getValue();
    if (exclude) {
      return new LogicalRemove(child, ImmutableSet.copyOf(referenceExpressions));
    } else {
      return new LogicalProject(child, referenceExpressions);
    }
  }

  /** Build {@link LogicalEval} */
  @Override
  public LogicalPlan visitEval(Eval node, AnalysisContext context) {
    LogicalPlan child = node.getChild().get(0).accept(this, context);
    ImmutableList.Builder<Pair<ReferenceExpression, Expression>> expressionsBuilder =
        new Builder<>();
    for (Let let : node.getExpressionList()) {
      ReferenceExpression ref = DSL.ref(let.getVar().getField().toString());
      Expression expression = expressionAnalyzer.analyze(let.getExpression(), context);
      expressionsBuilder.add(ImmutablePair.of(ref, expression));
      TypeEnvironment typeEnvironment = context.peek();
      // Define the new reference in the type environment.
      typeEnvironment.define(ref, expression.type(typeEnvironment));
    }
    return new LogicalEval(child, expressionsBuilder.build());
  }
}
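The new visitEval above is the analysis step behind a PPL query along the lines of source=accounts | eval ageInMonths = age * 12 | fields ageInMonths: each assigned variable is registered in the current TypeEnvironment before the LogicalEval node is emitted, which also lets later assignments in the same eval command reference earlier ones. Below is a minimal sketch of the per-assignment work, not part of this change: the helper class, the method name, the field name ageInMonths, and the already-analyzed inputs (child, valueExpression, env) are illustrative, and import paths not visible in this diff (TypeEnvironment, DSL, Expression) are assumed from the surrounding code.

import com.amazon.opendistroforelasticsearch.sql.analysis.TypeEnvironment;
import com.amazon.opendistroforelasticsearch.sql.expression.DSL;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalEval;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

class EvalAnalysisSketch {
  /**
   * Mirrors the per-assignment body of Analyzer#visitEval. child stands in for the already
   * analyzed plan of the previous command, valueExpression for the analyzed right-hand side
   * (e.g. the expression behind "age * 12"), and env for context.peek().
   */
  static LogicalPlan analyzeSingleAssignment(
      LogicalPlan child, Expression valueExpression, TypeEnvironment env) {
    ReferenceExpression ageInMonths = DSL.ref("ageInMonths");
    // Defining the new reference is what makes the eval-ed field resolvable by later commands.
    env.define(ageInMonths, valueExpression.type(env));
    ImmutableList<Pair<ReferenceExpression, Expression>> assignments =
        ImmutableList.of(ImmutablePair.of(ageInMonths, valueExpression));
    return new LogicalEval(child, assignments);
  }
}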
ExpressionAnalyzer.java
@@ -20,6 +20,7 @@
import com.amazon.opendistroforelasticsearch.sql.ast.expression.And;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.EqualTo;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Field;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Function;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.Literal;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedAttribute;
import com.amazon.opendistroforelasticsearch.sql.ast.expression.UnresolvedExpression;
@@ -31,71 +32,85 @@
import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.Aggregator;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionName;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionRepository;
import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionName;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;

/**
 * Analyze the {@link UnresolvedExpression} in the {@link AnalysisContext} to construct the {@link
 * Expression}
 */
@RequiredArgsConstructor
public class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, AnalysisContext> {
  private final DSL dsl;
  private final BuiltinFunctionRepository repository;

  public Expression analyze(UnresolvedExpression unresolved, AnalysisContext context) {
    return unresolved.accept(this, context);
  }

  @Override
  public Expression visitUnresolvedAttribute(UnresolvedAttribute node, AnalysisContext context) {
    TypeEnvironment typeEnv = context.peek();
    ReferenceExpression ref = DSL.ref(node.getAttr());
    typeEnv.resolve(ref);
    return ref;
  }

  @Override
  public Expression visitEqualTo(EqualTo node, AnalysisContext context) {
    Expression left = node.getLeft().accept(this, context);
    Expression right = node.getRight().accept(this, context);

    return dsl.equal(context.peek(), left, right);
  }

  @Override
  public Expression visitLiteral(Literal node, AnalysisContext context) {
    return DSL.literal(ExprValueUtils.fromObjectValue(node.getValue()));
  }

  @Override
  public Expression visitAnd(And node, AnalysisContext context) {
    Expression left = node.getLeft().accept(this, context);
    Expression right = node.getRight().accept(this, context);

    return dsl.and(context.peek(), left, right);
  }

  @Override
  public Expression visitAggregateFunction(AggregateFunction node, AnalysisContext context) {
    Optional<BuiltinFunctionName> builtinFunctionName = BuiltinFunctionName.of(node.getFuncName());
    if (builtinFunctionName.isPresent()) {
      Expression arg = node.getField().accept(this, context);
      return (Aggregator)
          repository.compile(
              builtinFunctionName.get().getName(), Collections.singletonList(arg), context.peek());
    } else {
      throw new SemanticCheckException("Unsupported aggregation function " + node.getFuncName());
    }
  }

  @Override
  public Expression visitFunction(Function node, AnalysisContext context) {
    FunctionName functionName = FunctionName.of(node.getFuncName());
    List<Expression> arguments =
        node.getFuncArgs().stream()
            .map(unresolvedExpression -> analyze(unresolvedExpression, context))
            .collect(Collectors.toList());
    return (Expression) repository.compile(functionName, arguments, context.peek());
  }

  @Override
  public Expression visitField(Field node, AnalysisContext context) {
    String attr = node.getField().toString();
    TypeEnvironment typeEnv = context.peek();
    ReferenceExpression ref = DSL.ref(attr);
    typeEnv.resolve(ref);
    return ref;
  }
}
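The new visitFunction is what allows the right-hand side of an eval assignment to be an arbitrary scalar function call: the function is resolved by name through the BuiltinFunctionRepository and compiled against the current type environment. Note that, unlike visitAggregateFunction, there is no BuiltinFunctionName pre-check here, so an unsupported name is left to the repository to reject. A minimal sketch of that resolution path follows; the helper names, the field name age, the assumption that an abs function is registered in the repository, and the import paths not visible in this diff are all illustrative.

import com.amazon.opendistroforelasticsearch.sql.analysis.TypeEnvironment;
import com.amazon.opendistroforelasticsearch.sql.expression.DSL;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionRepository;
import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionName;
import java.util.Collections;

class FunctionResolutionSketch {
  /**
   * Mirrors ExpressionAnalyzer#visitFunction for a single-argument call such as abs(age): the
   * argument is already analyzed to a reference, and the repository compiles the function by
   * name against the current type environment (context.peek() in the analyzer).
   */
  static Expression compileAbs(BuiltinFunctionRepository repository, TypeEnvironment env) {
    Expression age = DSL.ref("age");
    return (Expression) repository.compile(
        FunctionName.of("abs"), Collections.singletonList(age), env);
  }
}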