Skip to content

Commit

Permalink
Add full cursor serialization and deserialization.
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Feb 8, 2023
1 parent 9db17c3 commit 1a83941
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -15,6 +16,7 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.opensearch.executor.Cursor;
import org.opensearch.sql.planner.PaginateOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
Expand Down Expand Up @@ -58,19 +60,69 @@ public Cursor convertToCursor(PhysicalPlan plan) {
*/
public PhysicalPlan convertToPlan(String cursor) {
if (cursor.startsWith(CURSOR_PREFIX)) {
String expression = cursor.substring(CURSOR_PREFIX.length());
try {
String expression = cursor.substring(CURSOR_PREFIX.length());

// TODO Parse expression and initialize variables below.
// storageEngine needs to create the TableScanOperator.
int pageSize = -1;
int currentPageIndex = -1;
List<NamedExpression> projectList = List.of();
String scanAsString = "";
TableScanOperator scan = storageEngine.getTableScan(scanAsString);
// TODO Parse expression and initialize variables below.
// storageEngine needs to create the TableScanOperator.

return new PaginateOperator(new ProjectOperator(scan, projectList, List.of()),
pageSize, currentPageIndex);
// TODO Parse with ANTLR or serialize as JSON/XML
if (!expression.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
int currentPageIndex = Integer.parseInt(expression, 0, expression.indexOf(','), 10);

expression = expression.substring(expression.indexOf(',') + 1);
int pageSize = Integer.parseInt(expression, 0, expression.indexOf(','), 10);

expression = expression.substring(expression.indexOf(',') + 1);
if (!expression.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
if (!expression.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
var serializer = new DefaultExpressionSerializer();
// TODO parse npe
List<NamedExpression> namedParseExpressions = List.of();

expression = expression.substring(expression.indexOf(',') + 1);
List<NamedExpression> projectList = new ArrayList<>();
if (!expression.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
while (expression.startsWith("(named,")) {
expression = expression.substring(expression.indexOf(',') + 1);
var name = expression.substring(0, expression.indexOf(','));
expression = expression.substring(expression.indexOf(',') + 1);
var alias = expression.substring(0, expression.indexOf(','));
if (alias.isEmpty()) {
alias = null;
}
expression = expression.substring(expression.indexOf(',') + 1);
projectList.add(new NamedExpression(name,
serializer.deserialize(expression.substring(0, expression.indexOf(')'))), alias));
expression = expression.substring(expression.indexOf(',') + 1);
}

if (!expression.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
var indexName = expression.substring(0, expression.indexOf(','));
expression = expression.substring(expression.indexOf(',') + 1);
var scrollId = expression.substring(0, expression.indexOf(')'));
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);

return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
pageSize, currentPageIndex);
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported cursor", e);
}
} else {
throw new UnsupportedOperationException("Unsupported cursor");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;

/**
* Project the fields specified in {@link ProjectOperator#projectList} from input.
Expand Down Expand Up @@ -98,9 +99,17 @@ public ExecutionEngine.Schema schema() {
@Override
public String toCursor() {
String child = getChild().get(0).toCursor();
String namedExpressions = "TODO";
// TODO serialize named expressions.
// Skipping parsedExpressions for now.
return createSection("Project", namedExpressions, child);
var serializer = new DefaultExpressionSerializer();
String projects = createSection("projectList",
projectList.stream().map(ne -> createSection("named",
ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated())
))
.toArray(String[]::new));
String namedExpressions = createSection("namedParseExpressions",
namedParseExpressions.stream().map(ne -> createSection("named",
ne.getName(), ne.getAlias() == null ? "" : ne.getAlias(), serializer.serialize(ne.getDelegated())
))
.toArray(String[]::new));
return createSection("Project", namedExpressions, projects, child);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ default Collection<FunctionResolver> getFunctions() {
return Collections.emptyList();
}

default TableScanOperator getTableScan(String scanAsString) {
default TableScanOperator getTableScan(String indexName, String scrollId) {
String error = String.format("%s.getTableScan needs to be implemented", getClass());
throw new UnsupportedOperationException(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ public Table getTable(DataSourceSchemaName dataSourceSchemaName, String name) {
}

@Override
public TableScanOperator getTableScan(String scanAsString) {
// TODO extract indexName and scrollId from scanAsString
String indexName ="";
String scrollId = "";
public TableScanOperator getTableScan(String indexName, String scrollId) {
var index = new OpenSearchIndex(client, settings, indexName);
var requestBuilder = new SubsequentPageRequestBuilder(
new OpenSearchRequest.IndexName(indexName),
Expand Down

0 comments on commit 1a83941

Please sign in to comment.