diff --git a/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java b/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java index dc81d03aa4..e1172a71d6 100644 --- a/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java +++ b/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java @@ -5,27 +5,25 @@ package org.opensearch.sql.executor; -import java.io.IOException; -import java.io.ObjectInputStream; +import com.google.common.hash.HashCode; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import lombok.Data; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.expression.NamedExpression; -import org.opensearch.sql.expression.ReferenceExpression; import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer; import org.opensearch.sql.opensearch.executor.Cursor; import org.opensearch.sql.planner.PaginateOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; -import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor; import org.opensearch.sql.planner.physical.ProjectOperator; import org.opensearch.sql.storage.StorageEngine; -import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.TableScanOperator; -import org.opensearch.sql.storage.read.TableScanBuilder; @RequiredArgsConstructor public class PaginatedPlanCache { @@ -39,7 +37,7 @@ public boolean canConvertToCursor(UnresolvedPlan plan) { @RequiredArgsConstructor @Data - static class SeriazationContext { + static class SerializationContext { private final PaginatedPlanCache cache; } @@ -48,74 +46,104 @@ static class SeriazationContext { */ public Cursor convertToCursor(PhysicalPlan plan) { if (plan instanceof PaginateOperator) { - var raw = CURSOR_PREFIX + plan.toCursor(); + var cursor = plan.toCursor(); + if (cursor == null || cursor.isEmpty()) { + return Cursor.None; + } + var raw = CURSOR_PREFIX + compress(cursor); return new Cursor(raw.getBytes()); } else { return Cursor.None; } } + @SneakyThrows + public static String compress(String str) { + if (str == null || str.length() == 0) { + return null; + } + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + GZIPOutputStream gzip = new GZIPOutputStream(out); + gzip.write(str.getBytes()); + gzip.close(); + return HashCode.fromBytes(out.toByteArray()).toString(); + } + + @SneakyThrows + public static String decompress(String input) { + if (input == null || input.length() == 0) { + return null; + } + GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream( + HashCode.fromString(input).asBytes())); + return new String(gzip.readAllBytes()); + } + + /** + * Parse `NamedExpression`s from cursor. + * @param listToFill List to fill with data. + * @param cursor Cursor to parse. + * @return Remaining part of the cursor. + */ + private String parseNamedExpressions(List listToFill, String cursor) { + var serializer = new DefaultExpressionSerializer(); + while (!cursor.startsWith(")") && !cursor.startsWith("(")) { + listToFill.add((NamedExpression) + serializer.deserialize(cursor.substring(0, + Math.min(cursor.indexOf(','), cursor.indexOf(')'))))); + cursor = cursor.substring(cursor.indexOf(',') + 1); + } + return cursor; + } + /** * Converts a cursor to a physical plan tree. */ public PhysicalPlan convertToPlan(String cursor) { if (cursor.startsWith(CURSOR_PREFIX)) { try { - String expression = cursor.substring(CURSOR_PREFIX.length()); - - // TODO Parse expression and initialize variables below. - // storageEngine needs to create the TableScanOperator. + cursor = cursor.substring(CURSOR_PREFIX.length()); + cursor = decompress(cursor); // TODO Parse with ANTLR or serialize as JSON/XML - if (!expression.startsWith("(Paginate,")) { + if (!cursor.startsWith("(Paginate,")) { throw new UnsupportedOperationException("Unsupported cursor"); } - expression = expression.substring(expression.indexOf(',') + 1); - int currentPageIndex = Integer.parseInt(expression, 0, expression.indexOf(','), 10); + cursor = cursor.substring(cursor.indexOf(',') + 1); + int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10); - expression = expression.substring(expression.indexOf(',') + 1); - int pageSize = Integer.parseInt(expression, 0, expression.indexOf(','), 10); + cursor = cursor.substring(cursor.indexOf(',') + 1); + int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10); - expression = expression.substring(expression.indexOf(',') + 1); - if (!expression.startsWith("(Project,")) { + cursor = cursor.substring(cursor.indexOf(',') + 1); + if (!cursor.startsWith("(Project,")) { throw new UnsupportedOperationException("Unsupported cursor"); } - expression = expression.substring(expression.indexOf(',') + 1); - if (!expression.startsWith("(namedParseExpressions,")) { + cursor = cursor.substring(cursor.indexOf(',') + 1); + if (!cursor.startsWith("(namedParseExpressions,")) { throw new UnsupportedOperationException("Unsupported cursor"); } - expression = expression.substring(expression.indexOf(',') + 1); - var serializer = new DefaultExpressionSerializer(); - // TODO parse npe - List namedParseExpressions = List.of(); - expression = expression.substring(expression.indexOf(',') + 1); + cursor = cursor.substring(cursor.indexOf(',') + 1); + List namedParseExpressions = new ArrayList<>(); + cursor = parseNamedExpressions(namedParseExpressions, cursor); + + cursor = cursor.substring(cursor.indexOf(',') + 1); List projectList = new ArrayList<>(); - if (!expression.startsWith("(projectList,")) { + if (!cursor.startsWith("(projectList,")) { throw new UnsupportedOperationException("Unsupported cursor"); } - expression = expression.substring(expression.indexOf(',') + 1); - while (expression.startsWith("(named,")) { - expression = expression.substring(expression.indexOf(',') + 1); - var name = expression.substring(0, expression.indexOf(',')); - expression = expression.substring(expression.indexOf(',') + 1); - var alias = expression.substring(0, expression.indexOf(',')); - if (alias.isEmpty()) { - alias = null; - } - expression = expression.substring(expression.indexOf(',') + 1); - projectList.add(new NamedExpression(name, - serializer.deserialize(expression.substring(0, expression.indexOf(')'))), alias)); - expression = expression.substring(expression.indexOf(',') + 1); - } + cursor = cursor.substring(cursor.indexOf(',') + 1); + cursor = parseNamedExpressions(projectList, cursor); - if (!expression.startsWith("(OpenSearchPagedIndexScan,")) { + if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) { throw new UnsupportedOperationException("Unsupported cursor"); } - expression = expression.substring(expression.indexOf(',') + 1); - var indexName = expression.substring(0, expression.indexOf(',')); - expression = expression.substring(expression.indexOf(',') + 1); - var scrollId = expression.substring(0, expression.indexOf(')')); + cursor = cursor.substring(cursor.indexOf(',') + 1); + var indexName = cursor.substring(0, cursor.indexOf(',')); + cursor = cursor.substring(cursor.indexOf(',') + 1); + var scrollId = cursor.substring(0, cursor.indexOf(')')); TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId); return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions), diff --git a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java index 8bb1770d71..a02d908a0f 100644 --- a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java +++ b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java @@ -126,7 +126,6 @@ public PhysicalPlan visitLimit(LogicalLimit node, C context) { return new LimitOperator(visitChild(node, context), node.getLimit(), node.getOffset()); } - @Override public PhysicalPlan visitPaginate(LogicalPaginate plan, C context) { return new PaginateOperator(visitChild(plan, context), plan.getPageSize()); @@ -148,10 +147,8 @@ public PhysicalPlan visitRelation(LogicalRelation node, C context) { + "implementing and optimizing logical plan with relation involved"); } - protected PhysicalPlan visitChild(LogicalPlan node, C context) { // Logical operators visited here must have a single child return node.getChild().get(0).accept(this, context); } - } diff --git a/core/src/main/java/org/opensearch/sql/planner/PaginateOperator.java b/core/src/main/java/org/opensearch/sql/planner/PaginateOperator.java index 227d4b4602..a99beea836 100644 --- a/core/src/main/java/org/opensearch/sql/planner/PaginateOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/PaginateOperator.java @@ -82,7 +82,8 @@ public String toCursor() { String child = getChild().get(0).toCursor(); var nextPage = getPageIndex() + 1; - return createSection("Paginate", Integer.toString(nextPage), - Integer.toString(getPageSize()), child); + return child == null || child.isEmpty() + ? null : createSection("Paginate", Integer.toString(nextPage), + Integer.toString(getPageSize()), child); } } diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalRelation.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalRelation.java index a49c3d5cbe..0ece74690e 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalRelation.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalRelation.java @@ -9,6 +9,7 @@ import com.google.common.collect.ImmutableList; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.Setter; import lombok.ToString; import org.opensearch.sql.storage.Table; @@ -25,6 +26,10 @@ public class LogicalRelation extends LogicalPlan { @Getter private final Table table; + @Getter + @Setter + private Integer pageSize; + /** * Constructor of LogicalRelation. */ @@ -32,6 +37,7 @@ public LogicalRelation(String relationName, Table table) { super(ImmutableList.of()); this.relationName = relationName; this.table = table; + this.pageSize = null; } @Override diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java index 58a96c3efc..13bcfabe74 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/LogicalPlanOptimizer.java @@ -13,8 +13,10 @@ import java.util.List; import java.util.stream.Collectors; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder; import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter; import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort; +import org.opensearch.sql.planner.optimizer.rule.PushPageSize; import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder; import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown; import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder; @@ -73,6 +75,7 @@ public static LogicalPlanOptimizer paginationCreate() { /* * Phase 2: Transformations that rely on data source push down capability */ + new PushPageSize(), new CreatePagingTableScanBuilder(), TableScanPushDown.PUSH_DOWN_FILTER, TableScanPushDown.PUSH_DOWN_AGGREGATION, diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java index 856d8df7ea..6e54897506 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/pattern/Patterns.java @@ -16,6 +16,7 @@ import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalHighlight; import org.opensearch.sql.planner.logical.LogicalLimit; +import org.opensearch.sql.planner.logical.LogicalPaginate; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; @@ -112,6 +113,16 @@ public static Property table() { : Optional.empty()); } + /** + * Logical pagination with page size. + */ + public static Property pagination() { + return Property.optionalProperty("pagination", + plan -> plan instanceof LogicalPaginate + ? Optional.of(((LogicalPaginate) plan).getPageSize()) + : Optional.empty()); + } + /** * Logical write with table field. */ diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/CreatePagingTableScanBuilder.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/CreatePagingTableScanBuilder.java similarity index 86% rename from core/src/main/java/org/opensearch/sql/planner/optimizer/CreatePagingTableScanBuilder.java rename to core/src/main/java/org/opensearch/sql/planner/optimizer/rule/CreatePagingTableScanBuilder.java index bc97e373c2..6ef874b376 100644 --- a/core/src/main/java/org/opensearch/sql/planner/optimizer/CreatePagingTableScanBuilder.java +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/CreatePagingTableScanBuilder.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.planner.optimizer; +package org.opensearch.sql.planner.optimizer.rule; import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table; @@ -14,6 +14,7 @@ import lombok.experimental.Accessors; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.optimizer.Rule; import org.opensearch.sql.storage.Table; import org.opensearch.sql.storage.read.TableScanBuilder; @@ -38,7 +39,8 @@ public CreatePagingTableScanBuilder() { @Override public LogicalPlan apply(LogicalRelation plan, Captures captures) { - TableScanBuilder scanBuilder = captures.get(capture).createPagedScanBuilder(); + TableScanBuilder scanBuilder = captures.get(capture) + .createPagedScanBuilder(plan.getPageSize()); // TODO: Remove this after Prometheus refactored to new table scan builder too return (scanBuilder == null) ? plan : scanBuilder; } diff --git a/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/PushPageSize.java b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/PushPageSize.java new file mode 100644 index 0000000000..78f856535f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/optimizer/rule/PushPageSize.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.optimizer.rule; + +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.pagination; + +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import lombok.Getter; +import lombok.experimental.Accessors; +import org.opensearch.sql.planner.logical.LogicalPaginate; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.optimizer.Rule; + +import java.util.Objects; + +public class PushPageSize + implements Rule { + /** Capture the table inside matched logical paginate operator. */ + private final Capture capture; + + /** Pattern that matches logical paginate operator. */ + @Accessors(fluent = true) + @Getter + private final Pattern pattern; + + /** + * Constructor. + */ + public PushPageSize() { + this.capture = Capture.newCapture(); + this.pattern = Pattern.typeOf(LogicalPaginate.class) + .with(pagination().capturedAs(capture)); + } + + private LogicalRelation findLogicalRelation(LogicalPlan plan) { //TODO TBD multiple relations? + for (var subplan : plan.getChild()) { + if (subplan instanceof LogicalRelation) { + return (LogicalRelation) subplan; + } + var found = findLogicalRelation(subplan); + if (found != null) { + return found; + } + } + return null; + } + + @Override + public LogicalPlan apply(LogicalPaginate plan, Captures captures) { + var relation = findLogicalRelation(plan); + if (relation != null) { + relation.setPageSize(captures.get(capture)); + } + return plan; + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java index 8b7f1cad57..c52a477a51 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlan.java @@ -52,7 +52,7 @@ public ExecutionEngine.Schema schema() { } public String toCursor() { - throw new IllegalStateException(String.format("%s needs to implement ToCursor", + throw new IllegalStateException(String.format("%s needs to implement toCursor", this.getClass())); } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java index f07b43768f..c61b35e0cb 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java @@ -99,17 +99,14 @@ public ExecutionEngine.Schema schema() { @Override public String toCursor() { String child = getChild().get(0).toCursor(); + if (child == null || child.isEmpty()) { + return null; + } var serializer = new DefaultExpressionSerializer(); String projects = createSection("projectList", - projectList.stream().map(ne -> createSection("named", - ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated()) - )) - .toArray(String[]::new)); + projectList.stream().map(serializer::serialize).toArray(String[]::new)); String namedExpressions = createSection("namedParseExpressions", - namedParseExpressions.stream().map(ne -> createSection("named", - ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated()) - )) - .toArray(String[]::new)); + namedParseExpressions.stream().map(serializer::serialize).toArray(String[]::new)); return createSection("Project", namedExpressions, projects, child); } } diff --git a/core/src/main/java/org/opensearch/sql/storage/Table.java b/core/src/main/java/org/opensearch/sql/storage/Table.java index 8117e2cc30..a7f2b606ca 100644 --- a/core/src/main/java/org/opensearch/sql/storage/Table.java +++ b/core/src/main/java/org/opensearch/sql/storage/Table.java @@ -93,7 +93,7 @@ default StreamingSource asStreamingSource() { throw new UnsupportedOperationException(); } - default TableScanBuilder createPagedScanBuilder() { + default TableScanBuilder createPagedScanBuilder(int pageSize) { var error = String.format("'%s' does not support pagination", getClass().toString()); throw new UnsupportedOperationException(error); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequest.java index 4027845be6..c5b6d60af3 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequest.java @@ -56,7 +56,7 @@ default String toCursor() { /** * OpenSearch Index Name. - * Indices are seperated by ",". + * Indices are separated by ",". */ @EqualsAndHashCode class IndexName { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/ContinueScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/ContinueScrollRequest.java index 3c431a15ff..ddc76f6ba2 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/ContinueScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/ContinueScrollRequest.java @@ -18,6 +18,8 @@ import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; +import static org.opensearch.sql.opensearch.request.OpenSearchScrollRequest.DEFAULT_SCROLL_TIMEOUT; + public class ContinueScrollRequest implements OpenSearchRequest { final String initialScrollId; @@ -39,10 +41,15 @@ public OpenSearchResponse search(Function searchA Function scrollAction) { SearchResponse openSearchResponse; - openSearchResponse = scrollAction.apply(new SearchScrollRequest(initialScrollId)); - responseScrollId = openSearchResponse.getScrollId(); + openSearchResponse = scrollAction.apply(new SearchScrollRequest(initialScrollId) + .scroll(DEFAULT_SCROLL_TIMEOUT)); - return new OpenSearchResponse(openSearchResponse, exprValueFactory); + // TODO if terminated_early - something went wrong, e.g. no scroll returned. + var response = new OpenSearchResponse(openSearchResponse, exprValueFactory); + if (!response.isEmpty()) { + responseScrollId = openSearchResponse.getScrollId(); + } // else - last empty page, we should ignore the scroll even if it is returned + return response; } @Override @@ -56,4 +63,8 @@ public SearchSourceBuilder getSourceBuilder() { "SearchSourceBuilder is unavailable for ContinueScrollRequest"); } + @Override + public String toCursor() { + return responseScrollId; + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/InitialPageRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/InitialPageRequestBuilder.java index e36e0df529..f2e4fd08b4 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/InitialPageRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/InitialPageRequestBuilder.java @@ -40,15 +40,17 @@ public class InitialPageRequestBuilder implements PagedRequestBuilder { * @param settings other settings * @param exprValueFactory value factory */ - public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName, Settings settings, + public InitialPageRequestBuilder(OpenSearchRequest.IndexName indexName, + int pageSize, + Settings settings, OpenSearchExprValueFactory exprValueFactory) { this.indexName = indexName; this.sourceBuilder = new SearchSourceBuilder(); this.exprValueFactory = exprValueFactory; - this.querySize = settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT);//TODO fetch_size - sourceBuilder.from(0); - sourceBuilder.size(querySize); - sourceBuilder.timeout(DEFAULT_QUERY_TIMEOUT); + this.querySize = pageSize; + sourceBuilder.from(0) + .size(querySize) + .timeout(DEFAULT_QUERY_TIMEOUT); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index b109fbd836..1366dc374b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -128,8 +128,8 @@ public TableScanBuilder createScanBuilder() { } @Override - public TableScanBuilder createPagedScanBuilder() { - var requestBuilder = new InitialPageRequestBuilder(indexName, + public TableScanBuilder createPagedScanBuilder(int pageSize) { + var requestBuilder = new InitialPageRequestBuilder(indexName, pageSize, settings, new OpenSearchExprValueFactory(getFieldTypes())); var indexScan = new OpenSearchPagedIndexScan(client, requestBuilder); return new OpenSearchPagedScanBuilder(indexScan); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchPagedIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchPagedIndexScan.java index ea09502b0e..646b414183 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchPagedIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchPagedIndexScan.java @@ -5,7 +5,10 @@ package org.opensearch.sql.opensearch.storage; +import java.util.Collections; import java.util.Iterator; +import java.util.List; + import lombok.EqualsAndHashCode; import lombok.ToString; import org.opensearch.sql.data.model.ExprValue; @@ -53,7 +56,9 @@ public void open() { OpenSearchResponse response = client.search(request); if (!response.isEmpty()) { iterator = response.iterator(); - } // TODO else - last page is empty - + } else { + iterator = Collections.emptyIterator(); + } } @Override @@ -67,6 +72,8 @@ public void close() { public String toCursor() { // TODO this assumes exactly one index is scanned. var indexName = requestBuilder.getIndexName().getIndexNames()[0]; - return createSection("OpenSearchPagedIndexScan", indexName, request.toCursor()); + var cursor = request.toCursor(); + return cursor == null || cursor.isEmpty() + ? "" : createSection("OpenSearchPagedIndexScan", indexName, cursor); } }