Skip to content

Commit

Permalink
Support pagination in V2 engine, phase 1 (opensearch-project#1497)
Browse files Browse the repository at this point in the history
* Support pagination in V2 engine, phase 1 (#226)

* Fixing integration tests broken during POC

Signed-off-by: MaxKsyunz <[email protected]>

* Comment to clarify an exception.

Signed-off-by: MaxKsyunz <[email protected]>

* Add support for paginated scroll request, first page.

Implement PaginatedPlanCache.convertToPlan for second page to work.

Signed-off-by: MaxKsyunz <[email protected]>

* Progress on paginated scroll request, subsequent page.

Signed-off-by: MaxKsyunz <[email protected]>

* Move `ExpressionSerializer` from `opensearch` to `core`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rename `Cursor` `asString` to `toString`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Disable scroll cleaning.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add full cursor serialization and deserialization.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Misc fixes.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Further work on pagination.

* Added push down page size from `LogicalPaginate` to `LogicalRelation`.
* Improved cursor encoding and decoding.
* Added cursor compression.
* Fixed issuing `SearchScrollRequest`.
* Fixed returning last empty page.
* Minor code grooming/commenting.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Pagination fix for empty indices.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix error reporting on wrong cursor.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor comments and error reporting improvement.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add an end-to-end integration test.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add `explain` request handlers.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add IT for explain.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address issues flagged by checkstyle build step (#229)

Signed-off-by: MaxKsyunz <[email protected]>

* Pagination, phase 1: Add unit tests for `:core` module with coverage. (#230)

* Add unit tests for `:core` module with coverage. Uncovered: `toCursor`, because it is will be changed soon.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Pagination, phase 1: Add unit tests for SQL module with coverage. (#239)

* Add unit tests for SQL module with coverage.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Update sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java

Signed-off-by: Yury-Fridlyand <[email protected]>

Co-authored-by: GabeFernandez310 <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>
Co-authored-by: GabeFernandez310 <[email protected]>

* Pagination, phase 1: Add unit tests for `:opensearch` module with coverage. (#233)

* Add UT for `:opensearch` module with full coverage, except `toCursor`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix checkstyle.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix the merges.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix explain.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix scroll cleaning.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Store `TotalHits` and use it to report `total` in response.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add missing UT for `:protocol` module.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix PPL UTs damaged in f4ea4ad.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor checkstyle fixes.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fallback to v1 engine for pagination (#245)

* Pagination fallback integration tests.

Signed-off-by: MaxKsyunz <[email protected]>

* Add UT with coverage for `toCursor` serialization.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix broken tests in `legacy`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix getting `total` from non-paged requests and from queries without `FROM` clause.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix scroll cleaning.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix cursor request processing.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Update ITs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix (again) TotalHits feature.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix typo in prometheus config.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Recover commented logging.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Move `test_pagination_blackbox` to a separate class and add logging.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address some PR feedbacks: rename some classes and revert unnecessary whitespace changed.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor commenting.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Address PR comments.

* Add javadocs
* Renames
* Cleaning up some comments
* Remove unused code
* Speed up IT

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor missing changes.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Integration tests for fetch_size, max_result_window, and query.size_limit (#248)

Signed-off-by: MaxKsyunz <[email protected]>

* Remove `PaginatedQueryService`, extend `QueryService` to hold two planners and use them.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Move push down functions from request builders to a new interface.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Some file moves.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor clean-up according to PR review.

Signed-off-by: Yury-Fridlyand <[email protected]>

---------

Signed-off-by: MaxKsyunz <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Co-authored-by: MaxKsyunz <[email protected]>
Co-authored-by: GabeFernandez310 <[email protected]>
Co-authored-by: Max Ksyunz <[email protected]>

* Make scroll timeout configurable.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Fix IT to set cursor keep alive parameter.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Remove `QueryId.None`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Rename according to PR feedback.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Remove default implementations of `PushDownRequestBuilder`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Merge paginated plan optimizer into the regular optimizer. (opensearch-project#1516)

Merge paginated plan optimizer into the regular optimizer.
---------

Signed-off-by: MaxKsyunz <[email protected]>
Co-authored-by: Yury-Fridlyand <[email protected]>

* Complete rework on serialization and deserialization. (opensearch-project#1498)

Signed-off-by: Yury-Fridlyand <[email protected]>

* Resolve merge conflicts and fix tests.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor cleanup.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Minor cleanup - missing changes for the previous commit.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Remove paginate operator  (opensearch-project#1528)

* Remove PaginateOperator class since it is no longer used.


---------

Signed-off-by: MaxKsyunz <[email protected]>

* Remove `PaginatedPlan` - move logic to `QueryPlan`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Remove default implementations from `SerializablePlan`.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Add a doc.

Signed-off-by: Yury-Fridlyand <[email protected]>

* Update design graphs.

Signed-off-by: Yury-Fridlyand <[email protected]>

* More fixes for merge from upstream/main.

Signed-off-by: MaxKsyunz <[email protected]>

---------

Signed-off-by: MaxKsyunz <[email protected]>
Signed-off-by: Yury-Fridlyand <[email protected]>
Co-authored-by: MaxKsyunz <[email protected]>
Co-authored-by: GabeFernandez310 <[email protected]>
Co-authored-by: Max Ksyunz <[email protected]>
  • Loading branch information
4 people authored Apr 27, 2023
1 parent 4522422 commit f40bb6d
Show file tree
Hide file tree
Showing 131 changed files with 5,072 additions and 603 deletions.
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ dependencies {
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRareTopN;
Expand Down Expand Up @@ -563,6 +565,12 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
return new LogicalML(child, node.getArguments());
}

@Override
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
LogicalPlan child = paginate.getChild().get(0).accept(this, context);
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -294,4 +295,8 @@ public T visitQuery(Query node, C context) {
public T visitExplain(Explain node, C context) {
return visitStatement(node, context);
}

public T visitPaginate(Paginate paginate, C context) {
return visitChildren(paginate, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
48 changes: 48 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Paginate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent pagination operation.
* Actually a wrapper to the AST.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
public class Paginate extends UnresolvedPlan {
@Getter
private final int pageSize;
private UnresolvedPlan child;

public Paginate(int pageSize, UnresolvedPlan child) {
this.pageSize = pageSize;
this.child = child;
}

@Override
public List<? extends Node> getChild() {
return List.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitPaginate(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown on serialization of a PhysicalPlan tree if paging is finished.
* Processing of such exception should outcome of responding no cursor to the user.
*/
public class NoCursorException extends RuntimeException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown by V2 engine to support fallback scenario.
*/
public class UnsupportedCursorRequestException extends RuntimeException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.pagination.Cursor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand Down Expand Up @@ -53,6 +54,8 @@ void execute(PhysicalPlan plan, ExecutionContext context,
class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;
private final long total;
private final Cursor cursor;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ public void execute(UnresolvedPlan plan,
}
}

/**
* Execute a physical plan without analyzing or planning anything.
*/
public void executePlan(PhysicalPlan plan,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
executionEngine.execute(plan, ExecutionContext.emptyExecutionContext(), listener);
}

/**
* Execute the {@link UnresolvedPlan}, with {@link PlanContext} and using {@link ResponseListener}
* to get response.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* ContinuePaginatedPlan represents cursor a request.
* It returns subsequent pages to the user (2nd page and all next).
*/
public class ContinuePaginatedPlan extends AbstractPlan {

private final String cursor;
private final QueryService queryService;
private final PlanSerializer planSerializer;

private final ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener;


/**
* Create an abstract plan that can continue paginating a given cursor.
*/
public ContinuePaginatedPlan(QueryId queryId, String cursor, QueryService queryService,
PlanSerializer planCache,
ResponseListener<ExecutionEngine.QueryResponse>
queryResponseListener) {
super(queryId);
this.cursor = cursor;
this.planSerializer = planCache;
this.queryService = queryService;
this.queryResponseListener = queryResponseListener;
}

@Override
public void execute() {
try {
PhysicalPlan plan = planSerializer.convertToPlan(cursor);
queryService.executePlan(plan, queryResponseListener);
} catch (Exception e) {
queryResponseListener.onFailure(e);
}
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
listener.onFailure(new UnsupportedOperationException(
"Explain of a paged query continuation is not supported. "
+ "Use `explain` for the initial query request."));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.sql.executor.execution;

import java.util.Optional;
import org.apache.commons.lang3.NotImplementedException;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
Expand All @@ -33,25 +36,51 @@ public class QueryPlan extends AbstractPlan {

protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** constructor. */
protected final Optional<Integer> pageSize;

/** Constructor. */
public QueryPlan(
QueryId queryId,
UnresolvedPlan plan,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
this.pageSize = Optional.empty();
}

/** Constructor with page size. */
public QueryPlan(
QueryId queryId,
UnresolvedPlan plan,
int pageSize,
QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
this.pageSize = Optional.of(pageSize);
}

@Override
public void execute() {
queryService.execute(plan, listener);
if (pageSize.isPresent()) {
queryService.execute(new Paginate(pageSize.get(), plan), listener);
} else {
queryService.execute(plan, listener);
}
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
queryService.explain(plan, listener);
if (pageSize.isPresent()) {
listener.onFailure(new NotImplementedException(
"`explain` feature for paginated requests is not implemented yet."));
} else {
queryService.explain(plan, listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.exception.UnsupportedCursorRequestException;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.pagination.PlanSerializer;

/**
* QueryExecution Factory.
Expand All @@ -37,9 +39,10 @@ public class QueryPlanFactory
* Query Service.
*/
private final QueryService queryService;
private final PlanSerializer planSerializer;

/**
* NO_CONSUMER_RESPONSE_LISTENER should never been called. It is only used as constructor
* NO_CONSUMER_RESPONSE_LISTENER should never be called. It is only used as constructor
* parameter of {@link QueryPlan}.
*/
@VisibleForTesting
Expand All @@ -62,39 +65,62 @@ public void onFailure(Exception e) {
/**
* Create QueryExecution from Statement.
*/
public AbstractPlan create(
public AbstractPlan createContinuePaginatedPlan(
Statement statement,
Optional<ResponseListener<ExecutionEngine.QueryResponse>> queryListener,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>> explainListener) {
return statement.accept(this, Pair.of(queryListener, explainListener));
}

/**
* Creates a ContinuePaginatedPlan from a cursor.
*/
public AbstractPlan createContinuePaginatedPlan(String cursor, boolean isExplain,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
QueryId queryId = QueryId.queryId();
var plan = new ContinuePaginatedPlan(queryId, cursor, queryService,
planSerializer, queryResponseListener);
return isExplain ? new ExplainPlan(queryId, plan, explainListener) : plan;
}

@Override
public AbstractPlan visitQuery(
Query node,
Pair<
Optional<ResponseListener<ExecutionEngine.QueryResponse>>,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>>>
Pair<Optional<ResponseListener<ExecutionEngine.QueryResponse>>,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>>>
context) {
Preconditions.checkArgument(
context.getLeft().isPresent(), "[BUG] query listener must be not null");

return new QueryPlan(QueryId.queryId(), node.getPlan(), queryService, context.getLeft().get());
if (node.getFetchSize() > 0) {
if (planSerializer.canConvertToCursor(node.getPlan())) {
return new QueryPlan(QueryId.queryId(), node.getPlan(), node.getFetchSize(),
queryService,
context.getLeft().get());
} else {
// This should be picked up by the legacy engine.
throw new UnsupportedCursorRequestException();
}
} else {
return new QueryPlan(QueryId.queryId(), node.getPlan(), queryService,
context.getLeft().get());
}
}

@Override
public AbstractPlan visitExplain(
Explain node,
Pair<
Optional<ResponseListener<ExecutionEngine.QueryResponse>>,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>>>
Pair<Optional<ResponseListener<ExecutionEngine.QueryResponse>>,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>>>
context) {
Preconditions.checkArgument(
context.getRight().isPresent(), "[BUG] explain listener must be not null");

return new ExplainPlan(
QueryId.queryId(),
create(node.getStatement(), Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
createContinuePaginatedPlan(node.getStatement(),
Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
context.getRight().get());
}
}
Loading

0 comments on commit f40bb6d

Please sign in to comment.