diff --git a/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java b/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java index 25c9408060..dc81d03aa4 100644 --- a/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java +++ b/core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.io.ObjectInputStream; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -15,6 +16,7 @@ import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer; import org.opensearch.sql.opensearch.executor.Cursor; import org.opensearch.sql.planner.PaginateOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; @@ -58,19 +60,69 @@ public Cursor convertToCursor(PhysicalPlan plan) { */ public PhysicalPlan convertToPlan(String cursor) { if (cursor.startsWith(CURSOR_PREFIX)) { - String expression = cursor.substring(CURSOR_PREFIX.length()); + try { + String expression = cursor.substring(CURSOR_PREFIX.length()); - // TODO Parse expression and initialize variables below. - // storageEngine needs to create the TableScanOperator. - int pageSize = -1; - int currentPageIndex = -1; - List projectList = List.of(); - String scanAsString = ""; - TableScanOperator scan = storageEngine.getTableScan(scanAsString); + // TODO Parse expression and initialize variables below. + // storageEngine needs to create the TableScanOperator. - return new PaginateOperator(new ProjectOperator(scan, projectList, List.of()), - pageSize, currentPageIndex); + // TODO Parse with ANTLR or serialize as JSON/XML + if (!expression.startsWith("(Paginate,")) { + throw new UnsupportedOperationException("Unsupported cursor"); + } + expression = expression.substring(expression.indexOf(',') + 1); + int currentPageIndex = Integer.parseInt(expression, 0, expression.indexOf(','), 10); + expression = expression.substring(expression.indexOf(',') + 1); + int pageSize = Integer.parseInt(expression, 0, expression.indexOf(','), 10); + + expression = expression.substring(expression.indexOf(',') + 1); + if (!expression.startsWith("(Project,")) { + throw new UnsupportedOperationException("Unsupported cursor"); + } + expression = expression.substring(expression.indexOf(',') + 1); + if (!expression.startsWith("(namedParseExpressions,")) { + throw new UnsupportedOperationException("Unsupported cursor"); + } + expression = expression.substring(expression.indexOf(',') + 1); + var serializer = new DefaultExpressionSerializer(); + // TODO parse npe + List namedParseExpressions = List.of(); + + expression = expression.substring(expression.indexOf(',') + 1); + List projectList = new ArrayList<>(); + if (!expression.startsWith("(projectList,")) { + throw new UnsupportedOperationException("Unsupported cursor"); + } + expression = expression.substring(expression.indexOf(',') + 1); + while (expression.startsWith("(named,")) { + expression = expression.substring(expression.indexOf(',') + 1); + var name = expression.substring(0, expression.indexOf(',')); + expression = expression.substring(expression.indexOf(',') + 1); + var alias = expression.substring(0, expression.indexOf(',')); + if (alias.isEmpty()) { + alias = null; + } + expression = expression.substring(expression.indexOf(',') + 1); + projectList.add(new NamedExpression(name, + serializer.deserialize(expression.substring(0, expression.indexOf(')'))), alias)); + expression = expression.substring(expression.indexOf(',') + 1); + } + + if (!expression.startsWith("(OpenSearchPagedIndexScan,")) { + throw new UnsupportedOperationException("Unsupported cursor"); + } + expression = expression.substring(expression.indexOf(',') + 1); + var indexName = expression.substring(0, expression.indexOf(',')); + expression = expression.substring(expression.indexOf(',') + 1); + var scrollId = expression.substring(0, expression.indexOf(')')); + TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId); + + return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions), + pageSize, currentPageIndex); + } catch (Exception e) { + throw new UnsupportedOperationException("Unsupported cursor", e); + } } else { throw new UnsupportedOperationException("Unsupported cursor"); } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java index 9a3542ad1f..f07b43768f 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/ProjectOperator.java @@ -22,6 +22,7 @@ import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.parse.ParseExpression; +import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer; /** * Project the fields specified in {@link ProjectOperator#projectList} from input. @@ -98,9 +99,17 @@ public ExecutionEngine.Schema schema() { @Override public String toCursor() { String child = getChild().get(0).toCursor(); - String namedExpressions = "TODO"; - // TODO serialize named expressions. - // Skipping parsedExpressions for now. - return createSection("Project", namedExpressions, child); + var serializer = new DefaultExpressionSerializer(); + String projects = createSection("projectList", + projectList.stream().map(ne -> createSection("named", + ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated()) + )) + .toArray(String[]::new)); + String namedExpressions = createSection("namedParseExpressions", + namedParseExpressions.stream().map(ne -> createSection("named", + ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated()) + )) + .toArray(String[]::new)); + return createSection("Project", namedExpressions, projects, child); } } diff --git a/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java b/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java index e854477b72..18e9e92886 100644 --- a/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java +++ b/core/src/main/java/org/opensearch/sql/storage/StorageEngine.java @@ -32,7 +32,7 @@ default Collection getFunctions() { return Collections.emptyList(); } - default TableScanOperator getTableScan(String scanAsString) { + default TableScanOperator getTableScan(String indexName, String scrollId) { String error = String.format("%s.getTableScan needs to be implemented", getClass()); throw new UnsupportedOperationException(error); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java index f34ed79c54..0b0e231760 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchStorageEngine.java @@ -38,10 +38,7 @@ public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) { } @Override - public TableScanOperator getTableScan(String scanAsString) { - // TODO extract indexName and scrollId from scanAsString - String indexName =""; - String scrollId = ""; + public TableScanOperator getTableScan(String indexName, String scrollId) { var index = new OpenSearchIndex(client, settings, indexName); var requestBuilder = new SubsequentPageRequestBuilder( new OpenSearchRequest.IndexName(indexName),