diff --git a/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java b/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java
index e304c132bd..b76a745a03 100644
--- a/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/executor/pagination/CanPaginateVisitor.java
@@ -48,7 +48,7 @@
* Currently, V2 engine does not support queries with:
* - aggregation (GROUP BY clause or aggregation functions like min/max)
* - in memory aggregation (window function)
- * - LIMIT/OFFSET clause(s)
+ * - OFFSET clause
* - without FROM clause
* - JOIN
* - a subquery
@@ -103,10 +103,13 @@ public Boolean visitValues(Values node, Object context) {
return Boolean.TRUE;
}
- // Queries with LIMIT/OFFSET clauses are unsupported
+  // Limit can't be pushed down in pagination, so it has to be implemented by `LimitOperator`.
+  // OpenSearch rejects a scroll query with the `from` parameter, so offset can't be pushed down.
+  // A non-zero offset would produce incomplete or even empty pages, so it is not supported.
@Override
public Boolean visitLimit(Limit node, Object context) {
- return Boolean.FALSE;
+    // TODO: open a GitHub issue to track support for non-zero OFFSET in pagination
+ return node.getOffset() == 0 && canPaginate(node, context);
}
@Override
diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java
index be1227c1da..5aa616de3b 100644
--- a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java
+++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java
@@ -9,9 +9,13 @@
import static com.facebook.presto.matching.DefaultMatcher.DEFAULT_MATCHER;
import com.facebook.presto.matching.Match;
+
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
@@ -28,12 +32,12 @@
*/
public class LogicalPlanOptimizer {
-  private final List<Rule<LogicalPlan>> rules;
+  private final List<Rule<? extends LogicalPlan>> rules;
/**
* Create {@link LogicalPlanOptimizer} with customized rules.
*/
-  public LogicalPlanOptimizer(List<Rule<LogicalPlan>> rules) {
+  public LogicalPlanOptimizer(List<Rule<? extends LogicalPlan>> rules) {
this.rules = rules;
}
@@ -41,7 +45,7 @@ public LogicalPlanOptimizer(List<Rule<LogicalPlan>> rules) {
* Create {@link LogicalPlanOptimizer} with pre-defined rules.
*/
public static LogicalPlanOptimizer create() {
- return new LogicalPlanOptimizer(Arrays.asList(
+ return new LogicalPlanOptimizer(Stream.of(
/*
* Phase 1: Transformations that rely on relational algebra equivalence
*/
@@ -51,47 +55,48 @@ public static LogicalPlanOptimizer create() {
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
+ new PushDownPageSize(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
TableScanPushDown.PUSH_DOWN_LIMIT,
- new PushDownPageSize(),
TableScanPushDown.PUSH_DOWN_HIGHLIGHT,
TableScanPushDown.PUSH_DOWN_NESTED,
TableScanPushDown.PUSH_DOWN_PROJECT,
- new CreateTableWriteBuilder()));
+ new CreateTableWriteBuilder())
+        .map(r -> (Rule<? extends LogicalPlan>) r).collect(Collectors.toList()));
}
/**
* Optimize {@link LogicalPlan}.
*/
public LogicalPlan optimize(LogicalPlan plan) {
- LogicalPlan optimized = internalOptimize(plan);
+ var node = plan;
+ for (Rule rule : rules) {
+ node = traverseAndOptimize(node, rule);
+ }
+ return node;
+ }
+
+ private LogicalPlan traverseAndOptimize(LogicalPlan plan, Rule rule) {
+ LogicalPlan optimized = internalOptimize(plan, rule);
optimized.replaceChildPlans(
- optimized.getChild().stream().map(this::optimize).collect(
- Collectors.toList()));
- return internalOptimize(optimized);
+ optimized.getChild().stream().map(p -> traverseAndOptimize(p, rule))
+ .collect(Collectors.toList()));
+ return internalOptimize(optimized, rule);
}
- private LogicalPlan internalOptimize(LogicalPlan plan) {
+ private LogicalPlan internalOptimize(LogicalPlan plan, Rule rule) {
LogicalPlan node = plan;
- boolean done = false;
- while (!done) {
- done = true;
- for (Rule rule : rules) {
- Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
- if (match.isPresent()) {
- node = rule.apply(match.value(), match.captures());
- // For new TableScanPushDown impl, pattern match doesn't necessarily cause
- // push down to happen. So reiterate all rules against the node only if the node
- // is actually replaced by any rule.
- // TODO: may need to introduce fixed point or maximum iteration limit in future
- if (node != match.value()) {
- done = false;
- }
- }
- }
+ Match match = DEFAULT_MATCHER.match(rule.pattern(), node);
+ if (match.isPresent()) {
+ node = rule.apply(match.value(), match.captures());
+
+      // For the new TableScanPushDown impl, a pattern match doesn't necessarily
+      // cause a push down to happen, so the rule may return the node unchanged.
+      // TODO: may need to introduce a fixed point or a maximum iteration limit
+      // in future, in case rules must be re-applied after a node is replaced.
}
return node;
}
diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java
index de2b47d403..672e71fd31 100644
--- a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java
+++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/read/TableScanPushDown.java
@@ -93,7 +93,7 @@ public class TableScanPushDown<T extends LogicalPlan> implements Rule<T> {
@SuppressWarnings("unchecked")
private TableScanPushDown(WithPattern pattern,
- BiFunction pushDownFunction) {
+ BiFunction pushDownFunction) {
this.pattern = pattern;
this.capture = ((CapturePattern) pattern.getPattern()).capture();
this.pushDownFunction = pushDownFunction;
diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java
index cd84234c4b..f670cb41df 100644
--- a/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java
+++ b/core/src/main/java/org/opensearch/sql/planner/physical/LimitOperator.java
@@ -7,12 +7,17 @@
package org.opensearch.sql.planner.physical;
import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.List;
+import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.data.model.ExprValue;
+import org.opensearch.sql.exception.NoCursorException;
+import org.opensearch.sql.planner.SerializablePlan;
/**
* The limit operator sets a window, to and block the rows out of the window
@@ -25,16 +30,22 @@
* it occurs when the original result set has a size smaller than {index + limit},
* or even not greater than the offset. The latter results in an empty output.
*/
-@RequiredArgsConstructor
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
-public class LimitOperator extends PhysicalPlan {
- private final PhysicalPlan input;
- private final Integer limit;
- private final Integer offset;
+@AllArgsConstructor
+public class LimitOperator extends PhysicalPlan implements SerializablePlan {
+ private PhysicalPlan input;
+ private Integer limit;
+ private Integer offset;
private Integer count = 0;
+ public LimitOperator(PhysicalPlan input, Integer limit, Integer offset) {
+ this.input = input;
+ this.limit = limit;
+ this.offset = offset;
+ }
+
@Override
public void open() {
super.open();
@@ -67,4 +78,29 @@ public List<PhysicalPlan> getChild() {
return ImmutableList.of(input);
}
+  /** Don't use this constructor; it exists only for deserialization. */
+ @Deprecated
+ public LimitOperator() {
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ limit = in.readInt();
+ count = in.readInt();
+    // Note: offset is not serialized or deserialized, because it is not supported in pagination.
+    // TODO: open a GitHub issue for offset support and link it here.
+ offset = 0;
+ input = (PhysicalPlan) in.readObject();
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ if (count == limit) {
+ // paging is finished
+ throw new NoCursorException();
+ }
+ out.writeInt(limit);
+ out.writeInt(count);
+ out.writeObject(((SerializablePlan) input).getPlanForSerialization());
+ }
}
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java
index dff5545785..616ea68b91 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java
@@ -146,7 +146,8 @@ public PhysicalPlan visitLimit(LimitOperator node, Object context) {
return new LimitOperator(
visitInput(node.getInput(), context),
node.getLimit(),
- node.getOffset());
+ node.getOffset(),
+ node.getCount());
}
@Override
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java
index bec133f834..203472f262 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java
@@ -97,9 +97,8 @@ public OpenSearchRequestBuilder(int requestedTotalSize,
*/
public OpenSearchRequest build(OpenSearchRequest.IndexName indexName,
int maxResultWindow, TimeValue scrollTimeout) {
- int size = requestedTotalSize;
if (pageSize == null) {
- if (startFrom + size > maxResultWindow) {
+ if (startFrom + requestedTotalSize > maxResultWindow) {
sourceBuilder.size(maxResultWindow - startFrom);
return new OpenSearchScrollRequest(
indexName, scrollTimeout, sourceBuilder, exprValueFactory);
@@ -182,7 +181,6 @@ public void pushDownSort(List<SortBuilder<?>> sortBuilders) {
public void pushDownLimit(Integer limit, Integer offset) {
requestedTotalSize = limit;
startFrom = offset;
- sourceBuilder.from(offset).size(limit);
}
public void pushDownTrackedScore(boolean trackScores) {
diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java
index 3a0d06d079..c418a594a1 100644
--- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java
+++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanBuilder.java
@@ -37,6 +37,9 @@ public class OpenSearchIndexScanBuilder extends TableScanBuilder {
/** Is limit operator pushed down. */
private boolean isLimitPushedDown = false;
+  /** Whether the page size has been pushed down. */
+ private boolean isPageSizePushedDown = false;
+
/**
* Constructor used during query execution.
*/
@@ -80,6 +83,7 @@ public boolean pushDownAggregation(LogicalAggregation aggregation) {
@Override
public boolean pushDownPageSize(LogicalPaginate paginate) {
+ isPageSizePushedDown = true;
return delegate.pushDownPageSize(paginate);
}
@@ -93,8 +97,12 @@ public boolean pushDownSort(LogicalSort sort) {
@Override
public boolean pushDownLimit(LogicalLimit limit) {
+ if (isPageSizePushedDown) {
+ return false;
+ }
// Assume limit push down happening on OpenSearchIndexScanQueryBuilder
isLimitPushedDown = true;
+ // TODO move this logic to OpenSearchRequestBuilder?
return delegate.pushDownLimit(limit);
}
diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java
index 8a365b2786..a6fe37949e 100644
--- a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java
+++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java
@@ -7,8 +7,13 @@
import java.util.Arrays;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import lombok.experimental.UtilityClass;
+import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
+import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndIndexScan;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndRelation;
import org.opensearch.sql.prometheus.planner.logical.rules.MergeFilterAndRelation;
@@ -23,10 +28,10 @@ public class PrometheusLogicalPlanOptimizerFactory {
* Create Prometheus storage specified logical plan optimizer.
*/
public static LogicalPlanOptimizer create() {
- return new LogicalPlanOptimizer(Arrays.asList(
+ return new LogicalPlanOptimizer(Stream.of(
new MergeFilterAndRelation(),
new MergeAggAndIndexScan(),
new MergeAggAndRelation()
- ));
+    ).map(r -> (Rule<? extends LogicalPlan>) r).collect(Collectors.toList()));
}
}