Skip to content

Commit

Permalink
Push down limit through eval (#2876)
Browse files Browse the repository at this point in the history
  • Loading branch information
qianheng-aws authored Aug 9, 2024
1 parent 7022a09 commit 4a735ea
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.EvalPushDown;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
Expand Down Expand Up @@ -46,6 +47,7 @@ public static LogicalPlanOptimizer create() {
*/
new MergeFilterAndFilter(),
new PushFilterUnderSort(),
EvalPushDown.PUSH_DOWN_LIMIT,
/*
* Phase 2: Transformations that rely on data source push down capability
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Optional;
import lombok.experimental.UtilityClass;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
Expand Down Expand Up @@ -63,6 +64,10 @@ public static <T extends LogicalPlan> Pattern<LogicalProject> project(Pattern<T>
return Pattern.typeOf(LogicalProject.class).with(source(pattern));
}

public static Pattern<LogicalEval> evalCapture() {
return Pattern.typeOf(LogicalEval.class).capturedAs(Capture.newCapture());
}

/** Pattern for {@link TableScanBuilder} and capture it meanwhile. */
public static Pattern<TableScanBuilder> scanBuilder() {
return Pattern.typeOf(TableScanBuilder.class).capturedAs(Capture.newCapture());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.evalCapture;
import static org.opensearch.sql.planner.optimizer.pattern.Patterns.limit;
import static org.opensearch.sql.planner.optimizer.rule.EvalPushDown.EvalPushDownBuilder.match;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.matching.pattern.CapturePattern;
import com.facebook.presto.matching.pattern.WithPattern;
import java.util.List;
import java.util.function.BiFunction;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.Rule;

/**
* Rule template for all rules related to push down logical plans under eval, so these plans can
* avoid blocking by eval and may have chances to be pushed down into table scan by rules in {@link
* org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown}.
*/
public class EvalPushDown<T extends LogicalPlan> implements Rule<T> {

// TODO: Add more rules to push down sort and project
/** Push down optimize rule for limit operator. Transform `limit -> eval` to `eval -> limit` */
public static final Rule<LogicalLimit> PUSH_DOWN_LIMIT =
match(limit(evalCapture()))
.apply(
(limit, logicalEval) -> {
List<LogicalPlan> child = logicalEval.getChild();
limit.replaceChildPlans(child);
logicalEval.replaceChildPlans(List.of(limit));
return logicalEval;
});

private final Capture<LogicalEval> capture;

@Accessors(fluent = true)
@Getter
private final Pattern<T> pattern;

private final BiFunction<T, LogicalEval, LogicalPlan> pushDownFunction;

@SuppressWarnings("unchecked")
public EvalPushDown(
WithPattern<T> pattern, BiFunction<T, LogicalEval, LogicalPlan> pushDownFunction) {
this.pattern = pattern;
this.capture = ((CapturePattern<LogicalEval>) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
}

@Override
public LogicalPlan apply(T plan, Captures captures) {
LogicalEval logicalEval = captures.get(capture);
return pushDownFunction.apply(plan, logicalEval);
}

static class EvalPushDownBuilder<T extends LogicalPlan> {

private WithPattern<T> pattern;

public static <T extends LogicalPlan> EvalPushDown.EvalPushDownBuilder<T> match(
Pattern<T> pattern) {
EvalPushDown.EvalPushDownBuilder<T> builder = new EvalPushDown.EvalPushDownBuilder<>();
builder.pattern = (WithPattern<T>) pattern;
return builder;
}

public EvalPushDown<T> apply(BiFunction<T, LogicalEval, LogicalPlan> pushDownFunction) {
return new EvalPushDown<>(pattern, pushDownFunction);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.opensearch.sql.data.model.ExprValueUtils.longValue;
import static org.opensearch.sql.data.type.ExprCoreType.*;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.aggregation;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.eval;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.highlight;
import static org.opensearch.sql.planner.logical.LogicalPlanDSL.limit;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.planner.logical.LogicalPaginate;
Expand Down Expand Up @@ -345,6 +347,27 @@ void table_scan_builder_support_offset_push_down_can_apply_its_rule() {
assertEquals(project(tableScanBuilder), optimized);
}

/** Limit - Eval --> Eval - Limit. */
@Test
void push_limit_under_eval() {
Pair<ReferenceExpression, Expression> evalExpr =
Pair.of(DSL.ref("name1", STRING), DSL.ref("name", STRING));
assertEquals(
eval(limit(tableScanBuilder, 10, 5), evalExpr),
optimize(limit(eval(relation("schema", table), evalExpr), 10, 5)));
}

/** Limit - Eval - Scan --> Eval - Scan. */
@Test
void push_limit_through_eval_into_scan() {
when(tableScanBuilder.pushDownLimit(any())).thenReturn(true);
Pair<ReferenceExpression, Expression> evalExpr =
Pair.of(DSL.ref("name1", STRING), DSL.ref("name", STRING));
assertEquals(
eval(tableScanBuilder, evalExpr),
optimize(limit(eval(relation("schema", table), evalExpr), 10, 5)));
}

private LogicalPlan optimize(LogicalPlan plan) {
final LogicalPlanOptimizer optimizer = LogicalPlanOptimizer.create();
return optimizer.optimize(plan);
Expand Down
13 changes: 13 additions & 0 deletions integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,19 @@ public void testSortPushDownExplain() throws Exception {
+ "| fields age"));
}

@Test
public void testLimitPushDownExplain() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_limit_push.json");

assertJsonEquals(
expected,
explainQueryToString(
"source=opensearch-sql_test_index_account"
+ "| eval ageMinus = age - 30 "
+ "| head 5 "
+ "| fields ageMinus"));
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"root": {
"name": "ProjectOperator",
"description": {
"fields": "[ageMinus]"
},
"children": [
{
"name": "EvalOperator",
"description": {
"expressions": {
"ageMinus": "-(age, 30)"
}
},
"children": [
{
"name": "OpenSearchIndexScan",
"description": {
"request": "OpenSearchQueryRequest(indexName=opensearch-sql_test_index_account, sourceBuilder={\"from\":0,\"size\":5,\"timeout\":\"1m\"}, searchDone=false)"
},
"children": []
}
]
}
]
}
}

0 comments on commit 4a735ea

Please sign in to comment.