Skip to content

Commit

Permalink
Further work on pagination.
Browse files Browse the repository at this point in the history
* Added push down page size from `LogicalPaginate` to `LogicalRelation`.
* Improved cursor encoding and decoding.
* Added cursor compression.
* Fixed issuing `SearchScrollRequest`.
* Fixed returning last empty page.
* Minor code grooming/commenting.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand committed Feb 10, 2023
1 parent 7b508db commit 2c1b2f9
Show file tree
Hide file tree
Showing 16 changed files with 205 additions and 78 deletions.
124 changes: 76 additions & 48 deletions core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,25 @@

package org.opensearch.sql.executor;

import java.io.IOException;
import java.io.ObjectInputStream;
import com.google.common.hash.HashCode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.expression.NamedExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer;
import org.opensearch.sql.opensearch.executor.Cursor;
import org.opensearch.sql.planner.PaginateOperator;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.planner.physical.PhysicalPlanNodeVisitor;
import org.opensearch.sql.planner.physical.ProjectOperator;
import org.opensearch.sql.storage.StorageEngine;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.TableScanOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;

@RequiredArgsConstructor
public class PaginatedPlanCache {
Expand All @@ -39,7 +37,7 @@ public boolean canConvertToCursor(UnresolvedPlan plan) {

@RequiredArgsConstructor
@Data
static class SeriazationContext {
static class SerializationContext {
private final PaginatedPlanCache cache;
}

Expand All @@ -48,74 +46,104 @@ static class SeriazationContext {
*/
public Cursor convertToCursor(PhysicalPlan plan) {
if (plan instanceof PaginateOperator) {
var raw = CURSOR_PREFIX + plan.toCursor();
var cursor = plan.toCursor();
if (cursor == null || cursor.isEmpty()) {
return Cursor.None;
}
var raw = CURSOR_PREFIX + compress(cursor);
return new Cursor(raw.getBytes());
} else {
return Cursor.None;
}
}

@SneakyThrows
public static String compress(String str) {
if (str == null || str.length() == 0) {
return null;
}

ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(str.getBytes());
gzip.close();
return HashCode.fromBytes(out.toByteArray()).toString();
}

@SneakyThrows
public static String decompress(String input) {
if (input == null || input.length() == 0) {
return null;
}
GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(
HashCode.fromString(input).asBytes()));
return new String(gzip.readAllBytes());
}

/**
* Parse `NamedExpression`s from cursor.
* @param listToFill List to fill with data.
* @param cursor Cursor to parse.
* @return Remaining part of the cursor.
*/
private String parseNamedExpressions(List<NamedExpression> listToFill, String cursor) {
var serializer = new DefaultExpressionSerializer();
while (!cursor.startsWith(")") && !cursor.startsWith("(")) {
listToFill.add((NamedExpression)
serializer.deserialize(cursor.substring(0,
Math.min(cursor.indexOf(','), cursor.indexOf(')')))));
cursor = cursor.substring(cursor.indexOf(',') + 1);
}
return cursor;
}

/**
* Converts a cursor to a physical plan tree.
*/
public PhysicalPlan convertToPlan(String cursor) {
if (cursor.startsWith(CURSOR_PREFIX)) {
try {
String expression = cursor.substring(CURSOR_PREFIX.length());

// TODO Parse expression and initialize variables below.
// storageEngine needs to create the TableScanOperator.
cursor = cursor.substring(CURSOR_PREFIX.length());
cursor = decompress(cursor);

// TODO Parse with ANTLR or serialize as JSON/XML
if (!expression.startsWith("(Paginate,")) {
if (!cursor.startsWith("(Paginate,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
int currentPageIndex = Integer.parseInt(expression, 0, expression.indexOf(','), 10);
cursor = cursor.substring(cursor.indexOf(',') + 1);
int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

expression = expression.substring(expression.indexOf(',') + 1);
int pageSize = Integer.parseInt(expression, 0, expression.indexOf(','), 10);
cursor = cursor.substring(cursor.indexOf(',') + 1);
int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10);

expression = expression.substring(expression.indexOf(',') + 1);
if (!expression.startsWith("(Project,")) {
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(Project,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
if (!expression.startsWith("(namedParseExpressions,")) {
cursor = cursor.substring(cursor.indexOf(',') + 1);
if (!cursor.startsWith("(namedParseExpressions,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
var serializer = new DefaultExpressionSerializer();
// TODO parse npe
List<NamedExpression> namedParseExpressions = List.of();

expression = expression.substring(expression.indexOf(',') + 1);
cursor = cursor.substring(cursor.indexOf(',') + 1);
List<NamedExpression> namedParseExpressions = new ArrayList<>();
cursor = parseNamedExpressions(namedParseExpressions, cursor);

cursor = cursor.substring(cursor.indexOf(',') + 1);
List<NamedExpression> projectList = new ArrayList<>();
if (!expression.startsWith("(projectList,")) {
if (!cursor.startsWith("(projectList,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
while (expression.startsWith("(named,")) {
expression = expression.substring(expression.indexOf(',') + 1);
var name = expression.substring(0, expression.indexOf(','));
expression = expression.substring(expression.indexOf(',') + 1);
var alias = expression.substring(0, expression.indexOf(','));
if (alias.isEmpty()) {
alias = null;
}
expression = expression.substring(expression.indexOf(',') + 1);
projectList.add(new NamedExpression(name,
serializer.deserialize(expression.substring(0, expression.indexOf(')'))), alias));
expression = expression.substring(expression.indexOf(',') + 1);
}
cursor = cursor.substring(cursor.indexOf(',') + 1);
cursor = parseNamedExpressions(projectList, cursor);

if (!expression.startsWith("(OpenSearchPagedIndexScan,")) {
if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) {
throw new UnsupportedOperationException("Unsupported cursor");
}
expression = expression.substring(expression.indexOf(',') + 1);
var indexName = expression.substring(0, expression.indexOf(','));
expression = expression.substring(expression.indexOf(',') + 1);
var scrollId = expression.substring(0, expression.indexOf(')'));
cursor = cursor.substring(cursor.indexOf(',') + 1);
var indexName = cursor.substring(0, cursor.indexOf(','));
cursor = cursor.substring(cursor.indexOf(',') + 1);
var scrollId = cursor.substring(0, cursor.indexOf(')'));
TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId);

return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ public PhysicalPlan visitLimit(LogicalLimit node, C context) {
return new LimitOperator(visitChild(node, context), node.getLimit(), node.getOffset());
}


@Override
public PhysicalPlan visitPaginate(LogicalPaginate plan, C context) {
return new PaginateOperator(visitChild(plan, context), plan.getPageSize());
Expand All @@ -148,10 +147,8 @@ public PhysicalPlan visitRelation(LogicalRelation node, C context) {
+ "implementing and optimizing logical plan with relation involved");
}


protected PhysicalPlan visitChild(LogicalPlan node, C context) {
// Logical operators visited here must have a single child
return node.getChild().get(0).accept(this, context);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public String toCursor() {
String child = getChild().get(0).toCursor();

var nextPage = getPageIndex() + 1;
return createSection("Paginate", Integer.toString(nextPage),
Integer.toString(getPageSize()), child);
return child == null || child.isEmpty()
? null : createSection("Paginate", Integer.toString(nextPage),
Integer.toString(getPageSize()), child);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.storage.Table;

Expand All @@ -25,13 +26,18 @@ public class LogicalRelation extends LogicalPlan {
@Getter
private final Table table;

@Getter
@Setter
private Integer pageSize;

/**
* Constructor of LogicalRelation.
*/
public LogicalRelation(String relationName, Table table) {
super(ImmutableList.of());
this.relationName = relationName;
this.table = table;
this.pageSize = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.PushPageSize;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.read.TableScanPushDown;
import org.opensearch.sql.planner.optimizer.rule.write.CreateTableWriteBuilder;
Expand Down Expand Up @@ -73,6 +75,7 @@ public static LogicalPlanOptimizer paginationCreate() {
/*
* Phase 2: Transformations that rely on data source push down capability
*/
new PushPageSize(),
new CreatePagingTableScanBuilder(),
TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRelation;
Expand Down Expand Up @@ -112,6 +113,16 @@ public static Property<LogicalPlan, Table> table() {
: Optional.empty());
}

/**
* Logical pagination with page size.
*/
public static Property<LogicalPlan, Integer> pagination() {
return Property.optionalProperty("pagination",
plan -> plan instanceof LogicalPaginate
? Optional.of(((LogicalPaginate) plan).getPageSize())
: Optional.empty());
}

/**
* Logical write with table field.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer;
package org.opensearch.sql.planner.optimizer.rule;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.table;

Expand All @@ -14,6 +14,7 @@
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;
import org.opensearch.sql.storage.Table;
import org.opensearch.sql.storage.read.TableScanBuilder;

Expand All @@ -38,7 +39,8 @@ public CreatePagingTableScanBuilder() {

@Override
public LogicalPlan apply(LogicalRelation plan, Captures captures) {
TableScanBuilder scanBuilder = captures.get(capture).createPagedScanBuilder();
TableScanBuilder scanBuilder = captures.get(capture)
.createPagedScanBuilder(plan.getPageSize());
// TODO: Remove this after Prometheus refactored to new table scan builder too
return (scanBuilder == null) ? plan : scanBuilder;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer.rule;

import static org.opensearch.sql.planner.optimizer.pattern.Patterns.pagination;

import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import lombok.Getter;
import lombok.experimental.Accessors;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalRelation;
import org.opensearch.sql.planner.optimizer.Rule;

import java.util.Objects;

public class PushPageSize
implements Rule<LogicalPaginate> {
/** Capture the table inside matched logical paginate operator. */
private final Capture<Integer> capture;

/** Pattern that matches logical paginate operator. */
@Accessors(fluent = true)
@Getter
private final Pattern<LogicalPaginate> pattern;

/**
* Constructor.
*/
public PushPageSize() {
this.capture = Capture.newCapture();
this.pattern = Pattern.typeOf(LogicalPaginate.class)
.with(pagination().capturedAs(capture));
}

private LogicalRelation findLogicalRelation(LogicalPlan plan) { //TODO TBD multiple relations?
for (var subplan : plan.getChild()) {
if (subplan instanceof LogicalRelation) {
return (LogicalRelation) subplan;
}
var found = findLogicalRelation(subplan);
if (found != null) {
return found;
}
}
return null;
}

@Override
public LogicalPlan apply(LogicalPaginate plan, Captures captures) {
var relation = findLogicalRelation(plan);
if (relation != null) {
relation.setPageSize(captures.get(capture));
}
return plan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public ExecutionEngine.Schema schema() {
}

public String toCursor() {
throw new IllegalStateException(String.format("%s needs to implement ToCursor",
throw new IllegalStateException(String.format("%s needs to implement toCursor",
this.getClass()));
}

Expand Down
Loading

0 comments on commit 2c1b2f9

Please sign in to comment.