From 4a735ea9bca6313a06616a99a944b8e512baeb66 Mon Sep 17 00:00:00 2001 From: qianheng Date: Fri, 9 Aug 2024 12:08:48 +0800 Subject: [PATCH] Push down limit through eval (#2876) --- .../optimizer/LogicalPlanOptimizer.java | 2 + .../planner/optimizer/pattern/Patterns.java | 5 ++ .../planner/optimizer/rule/EvalPushDown.java | 82 +++++++++++++++++++ .../optimizer/LogicalPlanOptimizerTest.java | 23 ++++++ .../org/opensearch/sql/ppl/ExplainIT.java | 13 +++ .../ppl/explain_limit_push.json | 27 ++++++ 6 files changed, 152 insertions(+) create mode 100644 core/src/main/java/org/opensearch/sql/planner/optimizer/rule/EvalPushDown.java create mode 100644 integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java index 5c115f0db8..e805b0dea5 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java @@ -12,6 +12,7 @@ import java.util.List; import java.util.stream.Collectors; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.optimizer.rule.EvalPushDown; import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter; import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort; import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder; @@ -46,6 +47,7 @@ public static LogicalPlanOptimizer create() { */ new MergeFilterAndFilter(), new PushFilterUnderSort(), + EvalPushDown.PUSH_DOWN_LIMIT, /* * Phase 2: Transformations that rely on data source push down capability */ diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java index ee4e9a20cc..ef2607e018 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java @@ -12,6 +12,7 @@ import java.util.Optional; import lombok.experimental.UtilityClass; import org.opensearch.sql.planner.logical.LogicalAggregation; +import org.opensearch.sql.planner.logical.LogicalEval; import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalHighlight; import org.opensearch.sql.planner.logical.LogicalLimit; @@ -63,6 +64,10 @@ public static Pattern project(Pattern return Pattern.typeOf(LogicalProject.class).with(source(pattern)); } + public static Pattern evalCapture() { + return Pattern.typeOf(LogicalEval.class).capturedAs(Capture.newCapture()); + } + /** Pattern for {@link TableScanBuilder} and capture it meanwhile. */ public static Pattern scanBuilder() { return Pattern.typeOf(TableScanBuilder.class).capturedAs(Capture.newCapture()); diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/EvalPushDown.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/EvalPushDown.java new file mode 100644 index 0000000000..17eaed0e8c --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/EvalPushDown.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.optimizer.rule; + +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.evalCapture; +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.limit; +import static org.opensearch.sql.planner.optimizer.rule.EvalPushDown.EvalPushDownBuilder.match; + +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import com.facebook.presto.matching.pattern.CapturePattern; +import com.facebook.presto.matching.pattern.WithPattern; +import java.util.List; +import java.util.function.BiFunction; +import lombok.Getter; +import lombok.experimental.Accessors; +import org.opensearch.sql.planner.logical.LogicalEval; +import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.optimizer.Rule; + +/** + * Rule template for all rules related to push down logical plans under eval, so these plans can + * avoid blocking by eval and may have chances to be pushed down into table scan by rules in {@link + * org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown}. + */ +public class EvalPushDown implements Rule { + + // TODO: Add more rules to push down sort and project + /** Push down optimize rule for limit operator. Transform `limit -> eval` to `eval -> limit` */ + public static final Rule PUSH_DOWN_LIMIT = + match(limit(evalCapture())) + .apply( + (limit, logicalEval) -> { + List child = logicalEval.getChild(); + limit.replaceChildPlans(child); + logicalEval.replaceChildPlans(List.of(limit)); + return logicalEval; + }); + + private final Capture capture; + + @Accessors(fluent = true) + @Getter + private final Pattern pattern; + + private final BiFunction pushDownFunction; + + @SuppressWarnings("unchecked") + public EvalPushDown( + WithPattern pattern, BiFunction pushDownFunction) { + this.pattern = pattern; + this.capture = ((CapturePattern) pattern.getPattern()).capture(); + this.pushDownFunction = pushDownFunction; + } + + @Override + public LogicalPlan apply(T plan, Captures captures) { + LogicalEval logicalEval = captures.get(capture); + return pushDownFunction.apply(plan, logicalEval); + } + + static class EvalPushDownBuilder { + + private WithPattern pattern; + + public static EvalPushDown.EvalPushDownBuilder match( + Pattern pattern) { + EvalPushDown.EvalPushDownBuilder builder = new EvalPushDown.EvalPushDownBuilder<>(); + builder.pattern = (WithPattern) pattern; + return builder; + } + + public EvalPushDown apply(BiFunction pushDownFunction) { + return new EvalPushDown<>(pattern, pushDownFunction); + } + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java b/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java index c25e415cfa..20996503b4 100644 --- a/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizerTest.java @@ -15,6 +15,7 @@ import static org.opensearch.sql.data.model.ExprValueUtils.longValue; import static org.opensearch.sql.data.type.ExprCoreType.*; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.aggregation; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.eval; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.highlight; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit; @@ -43,6 +44,7 @@ import org.opensearch.sql.ast.tree.Sort; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.planner.logical.LogicalPaginate; @@ -345,6 +347,27 @@ void table_scan_builder_support_offset_push_down_can_apply_its_rule() { assertEquals(project(tableScanBuilder), optimized); } + /** Limit - Eval --> Eval - Limit. */ + @Test + void push_limit_under_eval() { + Pair evalExpr = + Pair.of(DSL.ref("name1", STRING), DSL.ref("name", STRING)); + assertEquals( + eval(limit(tableScanBuilder, 10, 5), evalExpr), + optimize(limit(eval(relation("schema", table), evalExpr), 10, 5))); + } + + /** Limit - Eval - Scan --> Eval - Scan. */ + @Test + void push_limit_through_eval_into_scan() { + when(tableScanBuilder.pushDownLimit(any())).thenReturn(true); + Pair evalExpr = + Pair.of(DSL.ref("name1", STRING), DSL.ref("name", STRING)); + assertEquals( + eval(tableScanBuilder, evalExpr), + optimize(limit(eval(relation("schema", table), evalExpr), 10, 5))); + } + private LogicalPlan optimize(LogicalPlan plan) { final LogicalPlanOptimizer optimizer = LogicalPlanOptimizer.create(); return optimizer.optimize(plan); diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java index fce975ef92..c6b21e1605 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java @@ -76,6 +76,19 @@ public void testSortPushDownExplain() throws Exception { + "| fields age")); } + @Test + public void testLimitPushDownExplain() throws Exception { + String expected = loadFromFile("expectedOutput/ppl/explain_limit_push.json"); + + assertJsonEquals( + expected, + explainQueryToString( + "source=opensearch-sql_test_index_account" + + "| eval ageMinus = age - 30 " + + "| head 5 " + + "| fields ageMinus")); + } + String loadFromFile(String filename) throws Exception { URI uri = Resources.getResource(filename).toURI(); return new String(Files.readAllBytes(Paths.get(uri))); diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json new file mode 100644 index 0000000000..51a627ea4d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_limit_push.json @@ -0,0 +1,27 @@ +{ + "root": { + "name": "ProjectOperator", + "description": { + "fields": "[ageMinus]" + }, + "children": [ + { + "name": "EvalOperator", + "description": { + "expressions": { + "ageMinus": "-(age, 30)" + } + }, + "children": [ + { + "name": "OpenSearchIndexScan", + "description": { + "request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, searchDone=false)" + }, + "children": [] + } + ] + } + ] + } +}