From 195de3335f124af4b107717ca9918f6d8f309f94 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 16 Jun 2023 17:52:07 -0700 Subject: [PATCH] Code PoC MVP. Signed-off-by: Yury-Fridlyand --- .../pagination/CanPaginateVisitor.java | 9 ++- .../optimizer/LogicalPlanOptimizer.java | 57 ++++++++++--------- .../rule/read/TableScanPushDown.java | 2 +- .../sql/planner/physical/LimitOperator.java | 48 ++++++++++++++-- .../OpenSearchExecutionProtector.java | 3 +- .../request/OpenSearchRequestBuilder.java | 4 +- .../scan/OpenSearchIndexScanBuilder.java | 8 +++ ...PrometheusLogicalPlanOptimizerFactory.java | 9 ++- 8 files changed, 98 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java b/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java index e304c132bd..b76a745a03 100644 --- a/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java +++ b/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java @@ -48,7 +48,7 @@ * Currently, V2 engine does not support queries with: * - aggregation (GROUP BY clause or aggregation functions like min/max) * - in memory aggregation (window function) - * - LIMIT/OFFSET clause(s) + * - OFFSET clause * - without FROM clause * - JOIN * - a subquery @@ -103,10 +103,13 @@ public Boolean visitValues(Values node, Object context) { return Boolean.TRUE; } - // Queries with LIMIT/OFFSET clauses are unsupported + // Limit can't be pushed down in pagination, so has to be implemented by `LimitOperator`. + // OpenSearch rejects scroll query with `from` parameter, so offset can't be pushed down. + // Non-zero offset produces incomplete or even empty pages, so making it not supported. @Override public Boolean visitLimit(Limit node, Object context) { - return Boolean.FALSE; + // TODO open a GH ticket for offset + return node.getOffset() == 0 && canPaginate(node, context); } @Override diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java index be1227c1da..5aa616de3b 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java @@ -9,9 +9,13 @@ import static com.facebook.presto.matching.DefaultMatcher.DEFAULT_MATCHER; import com.facebook.presto.matching.Match; + +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter; import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort; @@ -28,12 +32,12 @@ */ public class LogicalPlanOptimizer { - private final List> rules; + private final List> rules; /** * Create {@link LogicalPlanOptimizer} with customized rules. */ - public LogicalPlanOptimizer(List> rules) { + public LogicalPlanOptimizer(List> rules) { this.rules = rules; } @@ -41,7 +45,7 @@ public LogicalPlanOptimizer(List> rules) { * Create {@link LogicalPlanOptimizer} with pre-defined rules. */ public static LogicalPlanOptimizer create() { - return new LogicalPlanOptimizer(Arrays.asList( + return new LogicalPlanOptimizer(Stream.of( /* * Phase 1: Transformations that rely on relational algebra equivalence */ @@ -51,47 +55,48 @@ public static LogicalPlanOptimizer create() { * Phase 2: Transformations that rely on data source push down capability */ new CreateTableScanBuilder(), + new PushDownPageSize(), TableScanPushDown.PUSH_DOWN_FILTER, TableScanPushDown.PUSH_DOWN_AGGREGATION, TableScanPushDown.PUSH_DOWN_SORT, TableScanPushDown.PUSH_DOWN_LIMIT, - new PushDownPageSize(), TableScanPushDown.PUSH_DOWN_HIGHLIGHT, TableScanPushDown.PUSH_DOWN_NESTED, TableScanPushDown.PUSH_DOWN_PROJECT, - new CreateTableWriteBuilder())); + new CreateTableWriteBuilder()) + .map(r -> (Rule)r).collect(Collectors.toList())); } /** * Optimize {@link LogicalPlan}. */ public LogicalPlan optimize(LogicalPlan plan) { - LogicalPlan optimized = internalOptimize(plan); + var node = plan; + for (Rule rule : rules) { + node = traverseAndOptimize(node, rule); + } + return node; + } + + private LogicalPlan traverseAndOptimize(LogicalPlan plan, Rule rule) { + LogicalPlan optimized = internalOptimize(plan, rule); optimized.replaceChildPlans( - optimized.getChild().stream().map(this::optimize).collect( - Collectors.toList())); - return internalOptimize(optimized); + optimized.getChild().stream().map(p -> traverseAndOptimize(p, rule)) + .collect(Collectors.toList())); + return internalOptimize(optimized, rule); } - private LogicalPlan internalOptimize(LogicalPlan plan) { + private LogicalPlan internalOptimize(LogicalPlan plan, Rule rule) { LogicalPlan node = plan; - boolean done = false; - while (!done) { - done = true; - for (Rule rule : rules) { - Match match = DEFAULT_MATCHER.match(rule.pattern(), node); - if (match.isPresent()) { - node = rule.apply(match.value(), match.captures()); - // For new TableScanPushDown impl, pattern match doesn't necessarily cause - // push down to happen. So reiterate all rules against the node only if the node - // is actually replaced by any rule. - // TODO: may need to introduce fixed point or maximum iteration limit in future - if (node != match.value()) { - done = false; - } - } - } + Match match = DEFAULT_MATCHER.match(rule.pattern(), node); + if (match.isPresent()) { + node = rule.apply(match.value(), match.captures()); + + // For new TableScanPushDown impl, pattern match doesn't necessarily cause + // push down to happen. So reiterate all rules against the node only if the node + // is actually replaced by any rule. + // TODO: may need to introduce fixed point or maximum iteration limit in future } return node; } diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java index de2b47d403..672e71fd31 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java @@ -93,7 +93,7 @@ public class TableScanPushDown implements Rule { @SuppressWarnings("unchecked") private TableScanPushDown(WithPattern pattern, - BiFunction pushDownFunction) { + BiFunction pushDownFunction) { this.pattern = pattern; this.capture = ((CapturePattern) pattern.getPattern()).capture(); this.pushDownFunction = pushDownFunction; diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java index cd84234c4b..f670cb41df 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java @@ -7,12 +7,17 @@ package org.opensearch.sql.planner.physical; import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.util.List; +import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.RequiredArgsConstructor; import lombok.ToString; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.exception.NoCursorException; +import org.opensearch.sql.planner.SerializablePlan; /** * The limit operator sets a window, to and block the rows out of the window @@ -25,16 +30,22 @@ * it occurs when the original result set has a size smaller than {index + limit}, * or even not greater than the offset. The latter results in an empty output.

*/ -@RequiredArgsConstructor @Getter @ToString @EqualsAndHashCode(callSuper = false) -public class LimitOperator extends PhysicalPlan { - private final PhysicalPlan input; - private final Integer limit; - private final Integer offset; +@AllArgsConstructor +public class LimitOperator extends PhysicalPlan implements SerializablePlan { + private PhysicalPlan input; + private Integer limit; + private Integer offset; private Integer count = 0; + public LimitOperator(PhysicalPlan input, Integer limit, Integer offset) { + this.input = input; + this.limit = limit; + this.offset = offset; + } + @Override public void open() { super.open(); @@ -67,4 +78,29 @@ public List getChild() { return ImmutableList.of(input); } + /** Don't use, it is for deserialization needs only. */ + @Deprecated + public LimitOperator() { + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + limit = in.readInt(); + count = in.readInt(); + // note: offset aren't serialized and deserialized, because not supported in pagination + // TODO open a GH ticket and post here link + offset = 0; + input = (PhysicalPlan) in.readObject(); + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + if (count == limit) { + // paging is finished + throw new NoCursorException(); + } + out.writeInt(limit); + out.writeInt(count); + out.writeObject(((SerializablePlan) input).getPlanForSerialization()); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java index dff5545785..616ea68b91 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java @@ -146,7 +146,8 @@ public PhysicalPlan visitLimit(LimitOperator node, Object context) { return new LimitOperator( visitInput(node.getInput(), context), node.getLimit(), - node.getOffset()); + node.getOffset(), + node.getCount()); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index bec133f834..203472f262 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -97,9 +97,8 @@ public OpenSearchRequestBuilder(int requestedTotalSize, */ public OpenSearchRequest build(OpenSearchRequest.IndexName indexName, int maxResultWindow, TimeValue scrollTimeout) { - int size = requestedTotalSize; if (pageSize == null) { - if (startFrom + size > maxResultWindow) { + if (startFrom + requestedTotalSize > maxResultWindow) { sourceBuilder.size(maxResultWindow - startFrom); return new OpenSearchScrollRequest( indexName, scrollTimeout, sourceBuilder, exprValueFactory); @@ -182,7 +181,6 @@ public void pushDownSort(List> sortBuilders) { public void pushDownLimit(Integer limit, Integer offset) { requestedTotalSize = limit; startFrom = offset; - sourceBuilder.from(offset).size(limit); } public void pushDownTrackedScore(boolean trackScores) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java index 3a0d06d079..c418a594a1 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java @@ -37,6 +37,9 @@ public class OpenSearchIndexScanBuilder extends TableScanBuilder { /** Is limit operator pushed down. */ private boolean isLimitPushedDown = false; + /** Is page size set. */ + private boolean isPageSizePushedDown = false; + /** * Constructor used during query execution. */ @@ -80,6 +83,7 @@ public boolean pushDownAggregation(LogicalAggregation aggregation) { @Override public boolean pushDownPageSize(LogicalPaginate paginate) { + isPageSizePushedDown = true; return delegate.pushDownPageSize(paginate); } @@ -93,8 +97,12 @@ public boolean pushDownSort(LogicalSort sort) { @Override public boolean pushDownLimit(LogicalLimit limit) { + if (isPageSizePushedDown) { + return false; + } // Assume limit push down happening on OpenSearchIndexScanQueryBuilder isLimitPushedDown = true; + // TODO move this logic to OpenSearchRequestBuilder? return delegate.pushDownLimit(limit); } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java index 8a365b2786..a6fe37949e 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java @@ -7,8 +7,13 @@ import java.util.Arrays; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import lombok.experimental.UtilityClass; +import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; +import org.opensearch.sql.planner.optimizer.Rule; import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndIndexScan; import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndRelation; import org.opensearch.sql.prometheus.planner.logical.rules.MergeFilterAndRelation; @@ -23,10 +28,10 @@ public class PrometheusLogicalPlanOptimizerFactory { * Create Prometheus storage specified logical plan optimizer. */ public static LogicalPlanOptimizer create() { - return new LogicalPlanOptimizer(Arrays.asList( + return new LogicalPlanOptimizer(Stream.of( new MergeFilterAndRelation(), new MergeAggAndIndexScan(), new MergeAggAndRelation() - )); + ).map(r -> (Rule)r).collect(Collectors.toList())); } }