From c377026ee5e56ec535bfeec93f21ef753ce851ff Mon Sep 17 00:00:00 2001 From: Harold Wang <74381974+harold-wang@users.noreply.github.com> Date: Tue, 15 Dec 2020 10:10:00 -0800 Subject: [PATCH 1/3] Enable Date type input in function Count() (#931) * Enable count(Date) Add IT Add Comparsion Test * Enable count(Date) Add IT * Add comparsion test file 916.txt * Consolidate count function to accept all field type --- .../aggregation/AggregatorFunction.java | 34 +++++++------------ .../aggregation/CountAggregatorTest.java | 22 ++++++++++++ .../resources/correctness/bugfixes/916.txt | 1 + .../correctness/queries/aggregation.txt | 1 + 4 files changed, 37 insertions(+), 21 deletions(-) create mode 100644 integ-test/src/test/resources/correctness/bugfixes/916.txt diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/AggregatorFunction.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/AggregatorFunction.java index a09a2c6833..e467c38585 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/AggregatorFunction.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/AggregatorFunction.java @@ -28,6 +28,8 @@ import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.TIME; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.TIMESTAMP; +import com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType; +import com.amazon.opendistroforelasticsearch.sql.data.type.ExprType; import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionName; import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionRepository; import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionBuilder; @@ -35,7 +37,13 @@ import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionResolver; import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionSignature; import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + import lombok.experimental.UtilityClass; /** @@ -73,27 +81,11 @@ private static FunctionResolver avg() { private static FunctionResolver count() { FunctionName functionName = BuiltinFunctionName.COUNT.getName(); - return new FunctionResolver( - functionName, - new ImmutableMap.Builder() - .put(new FunctionSignature(functionName, Collections.singletonList(INTEGER)), - arguments -> new CountAggregator(arguments, INTEGER)) - .put(new FunctionSignature(functionName, Collections.singletonList(LONG)), - arguments -> new CountAggregator(arguments, INTEGER)) - .put(new FunctionSignature(functionName, Collections.singletonList(FLOAT)), - arguments -> new CountAggregator(arguments, INTEGER)) - .put(new FunctionSignature(functionName, Collections.singletonList(DOUBLE)), - arguments -> new CountAggregator(arguments, INTEGER)) - .put(new FunctionSignature(functionName, Collections.singletonList(STRING)), - arguments -> new CountAggregator(arguments, INTEGER)) - .put(new FunctionSignature(functionName, Collections.singletonList(STRUCT)), - arguments -> new CountAggregator(arguments, INTEGER)) - .put(new FunctionSignature(functionName, Collections.singletonList(ARRAY)), - arguments -> new CountAggregator(arguments, INTEGER)) - .put(new FunctionSignature(functionName, Collections.singletonList(BOOLEAN)), - arguments -> new CountAggregator(arguments, INTEGER)) - .build() - ); + FunctionResolver functionResolver = new FunctionResolver(functionName, + ExprCoreType.coreTypes().stream().collect(Collectors.toMap( + type -> new FunctionSignature(functionName, Collections.singletonList(type)), + type -> arguments -> new CountAggregator(arguments, INTEGER)))); + return functionResolver; } private static FunctionResolver sum() { diff --git a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/CountAggregatorTest.java b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/CountAggregatorTest.java index 4f42bec8fa..1190cc01df 100644 --- a/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/CountAggregatorTest.java +++ b/core/src/test/java/com/amazon/opendistroforelasticsearch/sql/expression/aggregation/CountAggregatorTest.java @@ -17,12 +17,16 @@ import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.ARRAY; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.BOOLEAN; +import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.DATE; +import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.DATETIME; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.DOUBLE; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.FLOAT; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.INTEGER; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.LONG; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.STRING; import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.STRUCT; +import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.TIMESTAMP; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -58,6 +62,24 @@ public void count_double_field_expression() { assertEquals(4, result.value()); } + @Test + public void count_date_field_expression() { + ExprValue result = aggregation(dsl.count(DSL.ref("date_value", DATE)), tuples); + assertEquals(4, result.value()); + } + + @Test + public void count_timestamp_field_expression() { + ExprValue result = aggregation(dsl.count(DSL.ref("timestamp_value", TIMESTAMP)), tuples); + assertEquals(4, result.value()); + } + + @Test + public void count_datetime_field_expression() { + ExprValue result = aggregation(dsl.count(DSL.ref("datetime_value", DATETIME)), tuples); + assertEquals(4, result.value()); + } + @Test public void count_arithmetic_expression() { ExprValue result = aggregation(dsl.count( diff --git a/integ-test/src/test/resources/correctness/bugfixes/916.txt b/integ-test/src/test/resources/correctness/bugfixes/916.txt new file mode 100644 index 0000000000..44a715f0eb --- /dev/null +++ b/integ-test/src/test/resources/correctness/bugfixes/916.txt @@ -0,0 +1 @@ +SELECT COUNT(timestamp) FROM kibana_sample_data_flights diff --git a/integ-test/src/test/resources/correctness/queries/aggregation.txt b/integ-test/src/test/resources/correctness/queries/aggregation.txt index 3a2081d9a8..e3878be86c 100644 --- a/integ-test/src/test/resources/correctness/queries/aggregation.txt +++ b/integ-test/src/test/resources/correctness/queries/aggregation.txt @@ -1,4 +1,5 @@ SELECT COUNT(AvgTicketPrice) FROM kibana_sample_data_flights +SELECT count(timestamp) from kibana_sample_data_flights SELECT AVG(AvgTicketPrice) FROM kibana_sample_data_flights SELECT SUM(AvgTicketPrice) FROM kibana_sample_data_flights SELECT MAX(AvgTicketPrice) FROM kibana_sample_data_flights From c70024929c2023c2520966ebf9f790af5c44974b Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 15 Dec 2020 15:36:40 -0800 Subject: [PATCH 2/3] Project operator pushdown (#933) * init * update * update * update doc --- .../sql/expression/ExpressionNodeVisitor.java | 2 +- docs/user/optimization/optimization.rst | 35 +++++- .../ElasticsearchLogicalIndexScan.java | 15 ++- ...sticsearchLogicalPlanOptimizerFactory.java | 8 +- .../logical/rule/OptimizationRuleUtils.java | 39 +++++++ .../logical/rule/PushProjectAndIndexScan.java | 73 ++++++++++++ .../logical/rule/PushProjectAndRelation.java | 77 +++++++++++++ .../storage/ElasticsearchIndex.java | 4 + .../storage/ElasticsearchIndexScan.java | 13 +++ .../ElasticsearchLogicOptimizerTest.java | 108 ++++++++++++++++-- .../ElasticsearchLogicalIndexScanTest.java | 35 ++++++ .../storage/ElasticsearchIndexScanTest.java | 3 +- .../storage/ElasticsearchIndexTest.java | 35 +++++- .../sql/elasticsearch/utils/Utils.java | 48 +++++++- .../sql/ppl/ExplainIT.java | 7 +- .../ppl/explain_filter_push.json | 4 +- .../expectedOutput/ppl/explain_sort_push.json | 4 +- 17 files changed, 476 insertions(+), 34 deletions(-) create mode 100644 elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndIndexScan.java create mode 100644 elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndRelation.java create mode 100644 elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScanTest.java diff --git a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/ExpressionNodeVisitor.java b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/ExpressionNodeVisitor.java index b3aec09ee2..6b593d5acc 100644 --- a/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/ExpressionNodeVisitor.java +++ b/core/src/main/java/com/amazon/opendistroforelasticsearch/sql/expression/ExpressionNodeVisitor.java @@ -62,7 +62,7 @@ public T visitLiteral(LiteralExpression node, C context) { } public T visitNamed(NamedExpression node, C context) { - return visitNode(node, context); + return node.getDelegated().accept(this, context); } public T visitReference(ReferenceExpression node, C context) { diff --git a/docs/user/optimization/optimization.rst b/docs/user/optimization/optimization.rst index 9acbe1ff8c..c4aab0e3aa 100644 --- a/docs/user/optimization/optimization.rst +++ b/docs/user/optimization/optimization.rst @@ -44,7 +44,7 @@ The consecutive Filter operator will be merged as one Filter operator:: { "name": "ElasticsearchIndexScan", "description": { - "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)" + "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)" }, "children": [] } @@ -71,7 +71,7 @@ The Filter operator should be push down under Sort operator:: { "name": "ElasticsearchIndexScan", "description": { - "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)" + "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)" }, "children": [] } @@ -85,6 +85,31 @@ Elasticsearch Specific Optimization The Elasticsearch `Query DSL `_ and `Aggregation `_ also enabling the storage engine specific optimization. +Push Project Into Query DSL +--------------------------- +The Project list will push down to Query DSL to `filter the source `_:: + + sh$ curl -sS -H 'Content-Type: application/json' \ + ... -X POST localhost:9200/_opendistro/_sql/_explain \ + ... -d '{"query" : "SELECT age FROM accounts"}' + { + "root": { + "name": "ProjectOperator", + "description": { + "fields": "[age]" + }, + "children": [ + { + "name": "ElasticsearchIndexScan", + "description": { + "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)" + }, + "children": [] + } + ] + } + } + Filter Merge Into Query DSL --------------------------- @@ -103,7 +128,7 @@ The Filter operator will merge into Elasticsearch Query DSL:: { "name": "ElasticsearchIndexScan", "description": { - "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)" + "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)" }, "children": [] } @@ -129,7 +154,7 @@ The Sort operator will merge into Elasticsearch Query DSL:: { "name": "ElasticsearchIndexScan", "description": { - "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)" + "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)" }, "children": [] } @@ -191,7 +216,7 @@ The Limit operator will merge in Elasticsearch Query DSL:: { "name": "ElasticsearchIndexScan", "description": { - "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\"}, searchDone=false)" + "request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)" }, "children": [] } diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScan.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScan.java index 72305dc30b..ade0e695a4 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScan.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScan.java @@ -20,10 +20,12 @@ import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort; import com.amazon.opendistroforelasticsearch.sql.expression.Expression; import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression; +import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanNodeVisitor; import com.google.common.collect.ImmutableList; import java.util.List; +import java.util.Set; import lombok.Builder; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -54,7 +56,7 @@ public class ElasticsearchLogicalIndexScan extends LogicalPlan { * Projection List. */ @Setter - private List projectList; + private Set projectList; /** * Sort List. @@ -75,7 +77,7 @@ public class ElasticsearchLogicalIndexScan extends LogicalPlan { public ElasticsearchLogicalIndexScan( String relationName, Expression filter, - List projectList, + Set projectList, List> sortList, Integer limit, Integer offset) { super(ImmutableList.of()); @@ -95,4 +97,13 @@ public R accept(LogicalPlanNodeVisitor visitor, C context) { public boolean hasLimit() { return limit != null; } + + /** + * Test has projects or not. + * + * @return true for has projects, otherwise false. + */ + public boolean hasProjects() { + return projectList != null && !projectList.isEmpty(); + } } diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalPlanOptimizerFactory.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalPlanOptimizerFactory.java index 7ad6f96ea6..b5ec7614cb 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalPlanOptimizerFactory.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalPlanOptimizerFactory.java @@ -25,6 +25,8 @@ import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndIndexAgg; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndIndexScan; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndRelation; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.PushProjectAndIndexScan; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.PushProjectAndRelation; import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.LogicalPlanOptimizer; import java.util.Arrays; import lombok.experimental.UtilityClass; @@ -48,7 +50,9 @@ public static LogicalPlanOptimizer create() { new MergeSortAndIndexAgg(), new MergeSortAndIndexScan(), new MergeLimitAndRelation(), - new - MergeLimitAndIndexScan())); + new MergeLimitAndIndexScan(), + new PushProjectAndRelation(), + new PushProjectAndIndexScan() + )); } } diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/OptimizationRuleUtils.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/OptimizationRuleUtils.java index 9326df86cb..b2d3ae746e 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/OptimizationRuleUtils.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/OptimizationRuleUtils.java @@ -18,8 +18,14 @@ package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort; +import com.amazon.opendistroforelasticsearch.sql.expression.ExpressionNodeVisitor; +import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression; import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalSort; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import lombok.experimental.UtilityClass; @UtilityClass @@ -50,4 +56,37 @@ public static boolean sortByDefaultOptionOnly(LogicalSort logicalSort) { || Sort.SortOption.DEFAULT_DESC.equals(sort.getLeft())) .reduce(true, Boolean::logicalAnd); } + + /** + * Find reference expression from expression. + * @param expressions a list of expression. + * + * @return a list of ReferenceExpression + */ + public static Set findReferenceExpressions( + List expressions) { + Set projectList = new HashSet<>(); + for (NamedExpression namedExpression : expressions) { + projectList.addAll(findReferenceExpression(namedExpression)); + } + return projectList; + } + + /** + * Find reference expression from expression. + * @param expression expression. + * + * @return a list of ReferenceExpression + */ + public static List findReferenceExpression( + NamedExpression expression) { + List results = new ArrayList<>(); + expression.accept(new ExpressionNodeVisitor() { + @Override + public Object visitReference(ReferenceExpression node, Object context) { + return results.add(node); + } + }, null); + return results; + } } diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndIndexScan.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndIndexScan.java new file mode 100644 index 0000000000..5a1d4b9300 --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndIndexScan.java @@ -0,0 +1,73 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule; + +import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.OptimizationRuleUtils.findReferenceExpressions; +import static com.amazon.opendistroforelasticsearch.sql.planner.optimizer.pattern.Patterns.source; +import static com.facebook.presto.matching.Pattern.typeOf; + +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.ElasticsearchLogicalIndexScan; +import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalProject; +import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.Rule; +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import java.util.Set; + +/** + * Push Project list into ElasticsearchLogicalIndexScan. + */ +public class PushProjectAndIndexScan implements Rule { + + private final Capture indexScanCapture; + + private final Pattern pattern; + + private Set pushDownProjects; + + /** + * Constructor of MergeProjectAndIndexScan. + */ + public PushProjectAndIndexScan() { + this.indexScanCapture = Capture.newCapture(); + this.pattern = typeOf(LogicalProject.class).matching( + project -> { + pushDownProjects = findReferenceExpressions(project.getProjectList()); + return !pushDownProjects.isEmpty(); + }).with(source() + .matching(typeOf(ElasticsearchLogicalIndexScan.class) + .matching(indexScan -> !indexScan.hasProjects()) + .capturedAs(indexScanCapture))); + + } + + @Override + public Pattern pattern() { + return pattern; + } + + @Override + public LogicalPlan apply(LogicalProject project, + Captures captures) { + ElasticsearchLogicalIndexScan indexScan = captures.get(indexScanCapture); + indexScan.setProjectList(pushDownProjects); + return new LogicalProject(indexScan, project.getProjectList()); + } +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndRelation.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndRelation.java new file mode 100644 index 0000000000..f8960de02b --- /dev/null +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/rule/PushProjectAndRelation.java @@ -0,0 +1,77 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule; + +import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.OptimizationRuleUtils.findReferenceExpressions; +import static com.amazon.opendistroforelasticsearch.sql.planner.optimizer.pattern.Patterns.source; +import static com.facebook.presto.matching.Pattern.typeOf; + +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.ElasticsearchLogicalIndexScan; +import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalProject; +import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalRelation; +import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.Rule; +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import java.util.Set; + +/** + * Push Project list into Relation. The transformed plan is Project - IndexScan + */ +public class PushProjectAndRelation implements Rule { + + private final Capture relationCapture; + + private final Pattern pattern; + + private Set pushDownProjects; + + /** + * Constructor of MergeProjectAndRelation. + */ + public PushProjectAndRelation() { + this.relationCapture = Capture.newCapture(); + this.pattern = typeOf(LogicalProject.class) + .matching(project -> { + pushDownProjects = findReferenceExpressions(project.getProjectList()); + return !pushDownProjects.isEmpty(); + }) + .with(source().matching(typeOf(LogicalRelation.class).capturedAs(relationCapture))); + } + + @Override + public Pattern pattern() { + return pattern; + } + + @Override + public LogicalPlan apply(LogicalProject project, + Captures captures) { + LogicalRelation relation = captures.get(relationCapture); + return new LogicalProject( + ElasticsearchLogicalIndexScan + .builder() + .relationName(relation.getRelationName()) + .projectList(findReferenceExpressions(project.getProjectList())) + .build(), + project.getProjectList() + ); + } +} diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java index 093d182f9f..accfff5285 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndex.java @@ -124,6 +124,10 @@ public PhysicalPlan visitIndexScan(ElasticsearchLogicalIndexScan node, if (node.getLimit() != null) { context.pushDownLimit(node.getLimit(), node.getOffset()); } + + if (node.hasProjects()) { + context.pushDownProjects(node.getProjectList()); + } return indexScan; } diff --git a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScan.java b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScan.java index 34ff157e34..56de50e414 100644 --- a/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScan.java +++ b/elasticsearch/src/main/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScan.java @@ -27,12 +27,15 @@ import com.amazon.opendistroforelasticsearch.sql.elasticsearch.request.ElasticsearchQueryRequest; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.request.ElasticsearchRequest; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.response.ElasticsearchResponse; +import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; import com.amazon.opendistroforelasticsearch.sql.storage.TableScanOperator; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.ToString; @@ -152,6 +155,16 @@ public void pushDownLimit(Integer limit, Integer offset) { sourceBuilder.from(offset).size(limit); } + /** + * Push down project list to DSL requets. + */ + public void pushDownProjects(Set projects) { + SearchSourceBuilder sourceBuilder = request.getSourceBuilder(); + final Set projectsSet = + projects.stream().map(ReferenceExpression::getAttr).collect(Collectors.toSet()); + sourceBuilder.fetchSource(projectsSet.toArray(new String[0]), new String[0]); + } + public void pushTypeMapping(Map typeMapping) { request.getExprValueFactory().setTypeMapping(typeMapping); } diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicOptimizerTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicOptimizerTest.java index 8cca66daae..6176c91c03 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicOptimizerTest.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicOptimizerTest.java @@ -24,6 +24,8 @@ import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.STRING; import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.indexScan; import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.indexScanAgg; +import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.noProjects; +import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.projects; import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.aggregation; import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.filter; import static com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL.limit; @@ -33,12 +35,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort; +import com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils; import com.amazon.opendistroforelasticsearch.sql.expression.DSL; import com.amazon.opendistroforelasticsearch.sql.expression.config.ExpressionConfig; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.LogicalPlanOptimizer; import com.google.common.collect.ImmutableList; -import java.util.Collections; +import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -56,7 +59,8 @@ void project_filter_merge_with_relation() { assertEquals( project( indexScan("schema", - dsl.equal(DSL.ref("intV", INTEGER), DSL.literal(integerValue(1)))), + dsl.equal(DSL.ref("intV", INTEGER), DSL.literal(integerValue(1))), + ImmutableSet.of(DSL.ref("intV", INTEGER))), DSL.named("i", DSL.ref("intV", INTEGER)) ), optimize( @@ -136,7 +140,7 @@ void aggregation_cant_merge_indexScan_with_project() { aggregation( ElasticsearchLogicalIndexScan.builder().relationName("schema") .filter(dsl.equal(DSL.ref("intV", INTEGER), DSL.literal(integerValue(1)))) - .projectList(Collections.singletonList(DSL.named("i", DSL.ref("intV", INTEGER)))) + .projectList(ImmutableSet.of(DSL.ref("intV", INTEGER))) .build(), ImmutableList .of(DSL.named("AVG(intV)", @@ -148,7 +152,7 @@ void aggregation_cant_merge_indexScan_with_project() { ElasticsearchLogicalIndexScan.builder().relationName("schema") .filter(dsl.equal(DSL.ref("intV", INTEGER), DSL.literal(integerValue(1)))) .projectList( - Collections.singletonList(DSL.named("i", DSL.ref("intV", INTEGER)))) + ImmutableSet.of(DSL.ref("intV", INTEGER))) .build(), ImmutableList .of(DSL.named("AVG(intV)", @@ -341,7 +345,7 @@ void sort_with_customized_option_should_not_merge_with_indexAgg() { void limit_merge_with_relation() { assertEquals( project( - indexScan("schema", 1, 1), + indexScan("schema", 1, 1, projects(DSL.ref("intV", INTEGER))), DSL.named("intV", DSL.ref("intV", INTEGER)) ), optimize( @@ -362,7 +366,8 @@ void limit_merge_with_index_scan() { project( indexScan("schema", dsl.equal(DSL.ref("intV", INTEGER), DSL.literal(integerValue(1))), - 1, 1 + 1, 1, + projects(DSL.ref("intV", INTEGER)) ), DSL.named("intV", DSL.ref("intV", INTEGER)) ), @@ -386,7 +391,8 @@ void limit_merge_with_index_scan_sort() { indexScan("schema", dsl.equal(DSL.ref("intV", INTEGER), DSL.literal(integerValue(1))), 1, 1, - Pair.of(Sort.SortOption.DEFAULT_ASC, DSL.ref("longV", LONG)) + Utils.sort(DSL.ref("longV", LONG), Sort.SortOption.DEFAULT_ASC), + projects(DSL.ref("intV", INTEGER)) ), DSL.named("intV", DSL.ref("intV", INTEGER)) ), @@ -412,7 +418,7 @@ void aggregation_cant_merge_index_scan_with_limit() { assertEquals( project( aggregation( - indexScan("schema", 10, 0), + indexScan("schema", 10, 0, noProjects()), ImmutableList .of(DSL.named("AVG(intV)", dsl.avg(DSL.ref("intV", INTEGER)))), @@ -422,7 +428,7 @@ void aggregation_cant_merge_index_scan_with_limit() { optimize( project( aggregation( - indexScan("schema", 10, 0), + indexScan("schema", 10, 0, noProjects()), ImmutableList .of(DSL.named("AVG(intV)", dsl.avg(DSL.ref("intV", INTEGER)))), @@ -431,6 +437,90 @@ void aggregation_cant_merge_index_scan_with_limit() { DSL.named("AVG(intV)", DSL.ref("AVG(intV)", DOUBLE))))); } + @Test + void push_down_projectList_to_relation() { + assertEquals( + project( + indexScan("schema", projects(DSL.ref("intV", INTEGER))), + DSL.named("i", DSL.ref("intV", INTEGER)) + ), + optimize( + project( + relation("schema"), + DSL.named("i", DSL.ref("intV", INTEGER))) + ) + ); + } + + /** + * Project(intV, abs(intV)) -> Relation. + * -- will be optimized as + * Project(intV, abs(intV)) -> Relation(project=intV). + */ + @Test + void push_down_should_handle_duplication() { + assertEquals( + project( + indexScan("schema", projects(DSL.ref("intV", INTEGER))), + DSL.named("i", DSL.ref("intV", INTEGER)), + DSL.named("absi", dsl.abs(DSL.ref("intV", INTEGER))) + ), + optimize( + project( + relation("schema"), + DSL.named("i", DSL.ref("intV", INTEGER)), + DSL.named("absi", dsl.abs(DSL.ref("intV", INTEGER)))) + ) + ); + } + + /** + * Project(ListA) -> Project(ListB) -> Relation. + * -- will be optimized as + * Project(ListA) -> Project(ListB) -> Relation(project=ListB). + */ + @Test + void only_one_project_should_be_push() { + assertEquals( + project( + project( + indexScan("schema", + projects(DSL.ref("intV", INTEGER), DSL.ref("stringV", STRING)) + ), + DSL.named("i", DSL.ref("intV", INTEGER)), + DSL.named("s", DSL.ref("stringV", STRING)) + ), + DSL.named("i", DSL.ref("intV", INTEGER)) + ), + optimize( + project( + project( + relation("schema"), + DSL.named("i", DSL.ref("intV", INTEGER)), + DSL.named("s", DSL.ref("stringV", STRING)) + ), + DSL.named("i", DSL.ref("intV", INTEGER)) + ) + ) + ); + } + + @Test + void project_literal_no_push() { + assertEquals( + project( + relation("schema"), + DSL.named("i", DSL.literal("str")) + ), + optimize( + project( + relation("schema"), + DSL.named("i", DSL.literal("str")) + ) + ) + ); + } + private LogicalPlan optimize(LogicalPlan plan) { final LogicalPlanOptimizer optimizer = ElasticsearchLogicalPlanOptimizerFactory.create(); final LogicalPlan optimize = optimizer.optimize(plan); diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScanTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScanTest.java new file mode 100644 index 0000000000..8db3abbadb --- /dev/null +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/planner/logical/ElasticsearchLogicalIndexScanTest.java @@ -0,0 +1,35 @@ +/* + * + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical; + +import static org.junit.jupiter.api.Assertions.assertFalse; + +import com.google.common.collect.ImmutableSet; +import org.junit.jupiter.api.Test; + +class ElasticsearchLogicalIndexScanTest { + + @Test + void has_projects() { + assertFalse(ElasticsearchLogicalIndexScan.builder() + .projectList(ImmutableSet.of()).build() + .hasProjects()); + + assertFalse(ElasticsearchLogicalIndexScan.builder().build().hasProjects()); + } +} \ No newline at end of file diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScanTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScanTest.java index ebb41d27ed..1b49dd55e5 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScanTest.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexScanTest.java @@ -36,8 +36,10 @@ import com.amazon.opendistroforelasticsearch.sql.elasticsearch.request.ElasticsearchQueryRequest; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.request.ElasticsearchRequest; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.response.ElasticsearchResponse; +import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.Set; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -155,7 +157,6 @@ PushDownAssertion shouldQuery(QueryBuilder expected) { indexScan.open(); return this; } - } private void mockResponse(ExprValue[]... searchHitBatches) { diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java index 6a71d1f603..8775fecd18 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/storage/ElasticsearchIndexTest.java @@ -21,6 +21,8 @@ import static com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType.STRING; import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.indexScan; import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.indexScanAgg; +import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.noProjects; +import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.utils.Utils.projects; import static com.amazon.opendistroforelasticsearch.sql.expression.DSL.literal; import static com.amazon.opendistroforelasticsearch.sql.expression.DSL.named; import static com.amazon.opendistroforelasticsearch.sql.expression.DSL.ref; @@ -36,6 +38,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.hasEntry; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -57,7 +61,6 @@ import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator; import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.NamedAggregator; import com.amazon.opendistroforelasticsearch.sql.expression.config.ExpressionConfig; -import com.amazon.opendistroforelasticsearch.sql.expression.function.FunctionDSL; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanDSL; import com.amazon.opendistroforelasticsearch.sql.planner.physical.AggregationOperator; @@ -66,7 +69,6 @@ import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlanDSL; import com.amazon.opendistroforelasticsearch.sql.planner.physical.ProjectOperator; -import com.amazon.opendistroforelasticsearch.sql.planner.physical.SortOperator; import com.amazon.opendistroforelasticsearch.sql.storage.Table; import com.google.common.collect.ImmutableMap; import java.util.Arrays; @@ -74,6 +76,7 @@ import java.util.Map; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -359,7 +362,7 @@ void shouldImplIndexScanWithLimit() { project( indexScan( indexName, - 1, 1 + 1, 1, noProjects() ), named)); @@ -382,7 +385,8 @@ void shouldImplIndexScanWithSortAndLimit() { indexScan( indexName, sortExpr, - 1, 1 + 1, 1, + noProjects() ), named)); @@ -413,4 +417,27 @@ void shouldNotPushDownLimitFarFromRelationButUpdateScanSize() { assertTrue(plan instanceof ProjectOperator); assertTrue(((ProjectOperator) plan).getInput() instanceof LimitOperator); } + + @Test + void shouldPushDownProjects() { + when(settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT)).thenReturn(200); + + String indexName = "test"; + ElasticsearchIndex index = new ElasticsearchIndex(client, settings, indexName); + PhysicalPlan plan = index.implement( + project( + indexScan( + indexName, projects(ref("intV", INTEGER)) + ), + named("i", ref("intV", INTEGER)))); + + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof ElasticsearchIndexScan); + + final FetchSourceContext fetchSource = + ((ElasticsearchIndexScan) ((ProjectOperator) plan).getInput()).getRequest() + .getSourceBuilder().fetchSource(); + assertThat(fetchSource.includes(), arrayContaining("intV")); + assertThat(fetchSource.excludes(), emptyArray()); + } } diff --git a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/utils/Utils.java b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/utils/Utils.java index 5cf2a1fe11..bde6c054ac 100644 --- a/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/utils/Utils.java +++ b/elasticsearch/src/test/java/com/amazon/opendistroforelasticsearch/sql/elasticsearch/utils/Utils.java @@ -23,12 +23,15 @@ import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.ElasticsearchLogicalIndexScan; import com.amazon.opendistroforelasticsearch.sql.expression.Expression; import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression; +import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression; import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.AvgAggregator; import com.amazon.opendistroforelasticsearch.sql.expression.aggregation.NamedAggregator; import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan; +import com.google.common.collect.ImmutableSet; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import lombok.experimental.UtilityClass; import org.apache.commons.lang3.tuple.Pair; @@ -67,10 +70,12 @@ public static LogicalPlan indexScan(String tableName, /** * Build ElasticsearchLogicalIndexScan. */ - public static LogicalPlan indexScan(String tableName, Integer offset, Integer limit) { + public static LogicalPlan indexScan(String tableName, Integer offset, Integer limit, + Set projectList) { return ElasticsearchLogicalIndexScan.builder().relationName(tableName) .offset(offset) .limit(limit) + .projectList(projectList) .build(); } @@ -79,11 +84,13 @@ public static LogicalPlan indexScan(String tableName, Integer offset, Integer li */ public static LogicalPlan indexScan(String tableName, Expression filter, - Integer offset, Integer limit) { + Integer offset, Integer limit, + Set projectList) { return ElasticsearchLogicalIndexScan.builder().relationName(tableName) .filter(filter) .offset(offset) .limit(limit) + .projectList(projectList) .build(); } @@ -93,12 +100,37 @@ public static LogicalPlan indexScan(String tableName, public static LogicalPlan indexScan(String tableName, Expression filter, Integer offset, Integer limit, - Pair... sorts) { + List> sorts, + Set projectList) { return ElasticsearchLogicalIndexScan.builder().relationName(tableName) .filter(filter) - .sortList(Arrays.asList(sorts)) + .sortList(sorts) .offset(offset) .limit(limit) + .projectList(projectList) + .build(); + } + + /** + * Build ElasticsearchLogicalIndexScan. + */ + public static LogicalPlan indexScan(String tableName, + Set projects) { + return ElasticsearchLogicalIndexScan.builder() + .relationName(tableName) + .projectList(projects) + .build(); + } + + /** + * Build ElasticsearchLogicalIndexScan. + */ + public static LogicalPlan indexScan(String tableName, Expression filter, + Set projects) { + return ElasticsearchLogicalIndexScan.builder() + .relationName(tableName) + .filter(filter) + .projectList(projects) .build(); } @@ -155,4 +187,12 @@ public static List> sort(Expression expr1, Sort.SortOption option2) { return Arrays.asList(Pair.of(option1, expr1), Pair.of(option2, expr2)); } + + public static Set projects(ReferenceExpression... expressions) { + return ImmutableSet.copyOf(expressions); + } + + public static Set noProjects() { + return null; + } } diff --git a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/ExplainIT.java b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/ExplainIT.java index 220a882bec..06d1c314d0 100644 --- a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/ExplainIT.java +++ b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/ppl/ExplainIT.java @@ -60,7 +60,8 @@ public void testFilterPushDownExplain() throws Exception { "source=elasticsearch-sql_test_index_account" + "| where age > 30 " + "| where age < 40 " - + "| where balance > 10000 ") + + "| where balance > 10000 " + + "| fields age") ); } @@ -80,12 +81,14 @@ public void testFilterAndAggPushDownExplain() throws Exception { @Test public void testSortPushDownExplain() throws Exception { String expected = loadFromFile("expectedOutput/ppl/explain_sort_push.json"); + assertJsonEquals( expected, explainQueryToString( "source=elasticsearch-sql_test_index_account" + "| sort age " - + "| where age > 30") + + "| where age > 30" + + "| fields age") ); } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json index 8e7949098e..1e5b36322b 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_filter_push.json @@ -2,13 +2,13 @@ "root": { "name": "ProjectOperator", "description": { - "fields": "[account_number, firstname, address, gender, city, lastname, balance, employer, state, age, email]" + "fields": "[age]" }, "children": [ { "name": "ElasticsearchIndexScan", "description": { - "request": "ElasticsearchQueryRequest(indexName\u003delasticsearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse)" + "request": "ElasticsearchQueryRequest(indexName\u003delasticsearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"bool\":{\"filter\":[{\"range\":{\"balance\":{\"from\":10000,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":null,\"to\":40,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone\u003dfalse)" }, "children": [] } diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json index b5a9827521..1d62ac9f91 100644 --- a/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_sort_push.json @@ -2,13 +2,13 @@ "root": { "name": "ProjectOperator", "description": { - "fields": "[account_number, firstname, address, gender, city, lastname, balance, employer, state, age, email]" + "fields": "[age]" }, "children": [ { "name": "ElasticsearchIndexScan", "description": { - "request": "ElasticsearchQueryRequest(indexName\u003delasticsearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse)" + "request": "ElasticsearchQueryRequest(indexName\u003delasticsearch-sql_test_index_account, sourceBuilder\u003d{\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone\u003dfalse)" }, "children": [] } From ca9f578b09ef70e8b084de231974a5a99181e317 Mon Sep 17 00:00:00 2001 From: Chloe Date: Tue, 15 Dec 2020 16:19:09 -0800 Subject: [PATCH 3/3] Added metrics for SQL query requests in new engine (#905) * added metrics in sql new engine query action when errors occur during query execution * addressed comments * update * take all errors from new query engine as server errors --- .../sql/sql/MetricsIT.java | 78 +++++++++++++++++++ .../sql/legacy/plugin/RestSQLQueryAction.java | 22 ++++++ 2 files changed, 100 insertions(+) create mode 100644 integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/sql/MetricsIT.java diff --git a/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/sql/MetricsIT.java b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/sql/MetricsIT.java new file mode 100644 index 0000000000..473fe4402b --- /dev/null +++ b/integ-test/src/test/java/com/amazon/opendistroforelasticsearch/sql/sql/MetricsIT.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.sql.sql; + +import static com.amazon.opendistroforelasticsearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; + +import com.amazon.opendistroforelasticsearch.sql.legacy.SQLIntegTestCase; +import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.MetricName; +import com.amazon.opendistroforelasticsearch.sql.util.TestUtils; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.Test; + +public class MetricsIT extends SQLIntegTestCase { + + @Override + protected void init() throws Exception { + loadIndex(Index.BANK); + TestUtils.enableNewQueryEngine(client()); + } + + @Test + public void requestCount() throws IOException, InterruptedException { + int beforeQueries = requestTotal(); + executeQuery(String.format(Locale.ROOT, "select age from %s", TEST_INDEX_BANK)); + TimeUnit.SECONDS.sleep(2L); + + assertEquals(beforeQueries + 1, requestTotal()); + } + + private Request makeStatRequest() { + return new Request( + "GET", "/_opendistro/_sql/stats" + ); + } + + private int requestTotal() throws IOException { + JSONObject jsonObject = new JSONObject(executeStatRequest(makeStatRequest())); + return jsonObject.getInt(MetricName.REQ_TOTAL.getName()); + } + + private String executeStatRequest(final Request request) throws IOException { + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + + InputStream is = response.getEntity().getContent(); + StringBuilder sb = new StringBuilder(); + try (BufferedReader br = new BufferedReader(new InputStreamReader(is))) { + String line; + while ((line = br.readLine()) != null) { + sb.append(line); + } + } + return sb.toString(); + } +} diff --git a/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java b/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java index 3e9809fead..5d0fe66221 100644 --- a/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java +++ b/legacy/src/main/java/com/amazon/opendistroforelasticsearch/sql/legacy/plugin/RestSQLQueryAction.java @@ -18,14 +18,28 @@ import static com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine.QueryResponse; import static com.amazon.opendistroforelasticsearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; +import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.elasticsearch.rest.RestStatus.OK; +import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE; + +import com.alibaba.druid.sql.parser.ParserException; import com.amazon.opendistroforelasticsearch.sql.common.antlr.SyntaxCheckException; import com.amazon.opendistroforelasticsearch.sql.common.response.ResponseListener; import com.amazon.opendistroforelasticsearch.sql.common.setting.Settings; import com.amazon.opendistroforelasticsearch.sql.elasticsearch.security.SecurityAccess; +import com.amazon.opendistroforelasticsearch.sql.exception.QueryEngineException; +import com.amazon.opendistroforelasticsearch.sql.exception.SemanticCheckException; import com.amazon.opendistroforelasticsearch.sql.executor.ExecutionEngine.ExplainResponse; +import com.amazon.opendistroforelasticsearch.sql.legacy.antlr.SqlAnalysisException; +import com.amazon.opendistroforelasticsearch.sql.legacy.exception.SQLFeatureDisabledException; +import com.amazon.opendistroforelasticsearch.sql.legacy.exception.SqlParseException; +import com.amazon.opendistroforelasticsearch.sql.legacy.executor.format.ErrorMessageFactory; +import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.MetricName; +import com.amazon.opendistroforelasticsearch.sql.legacy.metrics.Metrics; +import com.amazon.opendistroforelasticsearch.sql.legacy.rewriter.matchtoterm.VerificationException; +import com.amazon.opendistroforelasticsearch.sql.legacy.utils.LogUtils; import com.amazon.opendistroforelasticsearch.sql.planner.physical.PhysicalPlan; import com.amazon.opendistroforelasticsearch.sql.protocol.response.QueryResult; import com.amazon.opendistroforelasticsearch.sql.protocol.response.format.JdbcResponseFormatter; @@ -38,11 +52,13 @@ import com.amazon.opendistroforelasticsearch.sql.sql.domain.SQLQueryRequest; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.sql.SQLFeatureNotSupportedException; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; @@ -153,6 +169,7 @@ protected Object buildJsonObject(ExplainResponse response) { @Override public void onFailure(Exception e) { LOG.error("Error happened during explain", e); + logAndPublishMetrics(e); sendResponse(channel, INTERNAL_SERVER_ERROR, "Failed to explain the query due to error: " + e.getMessage()); } @@ -177,6 +194,7 @@ public void onResponse(QueryResponse response) { @Override public void onFailure(Exception e) { LOG.error("Error happened during query handling", e); + logAndPublishMetrics(e); sendResponse(channel, INTERNAL_SERVER_ERROR, formatter.format(e)); } }; @@ -195,4 +213,8 @@ private void sendResponse(RestChannel channel, RestStatus status, String content status, "application/json; charset=UTF-8", content)); } + private static void logAndPublishMetrics(Exception e) { + LOG.error(LogUtils.getRequestId() + " Server side error during query execution", e); + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + } }