From d9ed806fd7b089180317ed37d9395041d57310a8 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Wed, 26 Apr 2023 17:23:55 -0700 Subject: [PATCH] Support pagination in V2 engine, phase 1 (#1497) * Support pagination in V2 engine, phase 1 (#226) * Fixing integration tests broken during POC Signed-off-by: MaxKsyunz * Comment to clarify an exception. Signed-off-by: MaxKsyunz * Add support for paginated scroll request, first page. Implement PaginatedPlanCache.convertToPlan for second page to work. Signed-off-by: MaxKsyunz * Progress on paginated scroll request, subsequent page. Signed-off-by: MaxKsyunz * Move `ExpressionSerializer` from `opensearch` to `core`. Signed-off-by: Yury-Fridlyand * Rename `Cursor` `asString` to `toString`. Signed-off-by: Yury-Fridlyand * Disable scroll cleaning. Signed-off-by: Yury-Fridlyand * Add full cursor serialization and deserialization. Signed-off-by: Yury-Fridlyand * Misc fixes. Signed-off-by: Yury-Fridlyand * Further work on pagination. * Added push down page size from `LogicalPaginate` to `LogicalRelation`. * Improved cursor encoding and decoding. * Added cursor compression. * Fixed issuing `SearchScrollRequest`. * Fixed returning last empty page. * Minor code grooming/commenting. Signed-off-by: Yury-Fridlyand * Pagination fix for empty indices. Signed-off-by: Yury-Fridlyand * Fix error reporting on wrong cursor. Signed-off-by: Yury-Fridlyand * Minor comments and error reporting improvement. Signed-off-by: Yury-Fridlyand * Add an end-to-end integration test. Signed-off-by: Yury-Fridlyand * Add `explain` request handlers. Signed-off-by: Yury-Fridlyand * Add IT for explain. Signed-off-by: Yury-Fridlyand * Address issues flagged by checkstyle build step (#229) Signed-off-by: MaxKsyunz * Pagination, phase 1: Add unit tests for `:core` module with coverage. (#230) * Add unit tests for `:core` module with coverage. Uncovered: `toCursor`, because it is will be changed soon. Signed-off-by: Yury-Fridlyand * Pagination, phase 1: Add unit tests for SQL module with coverage. (#239) * Add unit tests for SQL module with coverage. Signed-off-by: Yury-Fridlyand * Update sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java Signed-off-by: Yury-Fridlyand Co-authored-by: GabeFernandez310 --------- Signed-off-by: Yury-Fridlyand Co-authored-by: GabeFernandez310 * Pagination, phase 1: Add unit tests for `:opensearch` module with coverage. (#233) * Add UT for `:opensearch` module with full coverage, except `toCursor`. Signed-off-by: Yury-Fridlyand * Fix checkstyle. Signed-off-by: Yury-Fridlyand --------- Signed-off-by: Yury-Fridlyand * Fix the merges. Signed-off-by: Yury-Fridlyand * Fix explain. Signed-off-by: Yury-Fridlyand * Fix scroll cleaning. Signed-off-by: Yury-Fridlyand * Store `TotalHits` and use it to report `total` in response. Signed-off-by: Yury-Fridlyand * Add missing UT for `:protocol` module. Signed-off-by: Yury-Fridlyand * Fix PPL UTs damaged in f4ea4ad8c. Signed-off-by: Yury-Fridlyand * Minor checkstyle fixes. Signed-off-by: Yury-Fridlyand * Fallback to v1 engine for pagination (#245) * Pagination fallback integration tests. Signed-off-by: MaxKsyunz * Add UT with coverage for `toCursor` serialization. Signed-off-by: Yury-Fridlyand * Fix broken tests in `legacy`. Signed-off-by: Yury-Fridlyand * Fix getting `total` from non-paged requests and from queries without `FROM` clause. Signed-off-by: Yury-Fridlyand * Fix scroll cleaning. Signed-off-by: Yury-Fridlyand * Fix cursor request processing. Signed-off-by: Yury-Fridlyand * Update ITs. Signed-off-by: Yury-Fridlyand * Fix (again) TotalHits feature. Signed-off-by: Yury-Fridlyand * Fix typo in prometheus config. Signed-off-by: Yury-Fridlyand * Recover commented logging. Signed-off-by: Yury-Fridlyand * Move `test_pagination_blackbox` to a separate class and add logging. Signed-off-by: Yury-Fridlyand * Address some PR feedbacks: rename some classes and revert unnecessary whitespace changed. Signed-off-by: Yury-Fridlyand * Minor commenting. Signed-off-by: Yury-Fridlyand * Address PR comments. * Add javadocs * Renames * Cleaning up some comments * Remove unused code * Speed up IT Signed-off-by: Yury-Fridlyand * Minor missing changes. Signed-off-by: Yury-Fridlyand * Integration tests for fetch_size, max_result_window, and query.size_limit (#248) Signed-off-by: MaxKsyunz * Remove `PaginatedQueryService`, extend `QueryService` to hold two planners and use them. Signed-off-by: Yury-Fridlyand * Move push down functions from request builders to a new interface. Signed-off-by: Yury-Fridlyand * Some file moves. Signed-off-by: Yury-Fridlyand * Minor clean-up according to PR review. Signed-off-by: Yury-Fridlyand --------- Signed-off-by: MaxKsyunz Signed-off-by: Yury-Fridlyand Co-authored-by: MaxKsyunz Co-authored-by: GabeFernandez310 Co-authored-by: Max Ksyunz * Make scroll timeout configurable. Signed-off-by: Yury-Fridlyand * Fix IT to set cursor keep alive parameter. Signed-off-by: Yury-Fridlyand * Remove `QueryId.None`. Signed-off-by: Yury-Fridlyand * Rename according to PR feedback. Signed-off-by: Yury-Fridlyand * Remove default implementations of `PushDownRequestBuilder`. Signed-off-by: Yury-Fridlyand * Merge paginated plan optimizer into the regular optimizer. (#1516) Merge paginated plan optimizer into the regular optimizer. --------- Signed-off-by: MaxKsyunz Co-authored-by: Yury-Fridlyand * Complete rework on serialization and deserialization. (#1498) Signed-off-by: Yury-Fridlyand * Resolve merge conflicts and fix tests. Signed-off-by: Yury-Fridlyand * Minor cleanup. Signed-off-by: Yury-Fridlyand * Minor cleanup - missing changes for the previous commit. Signed-off-by: Yury-Fridlyand * Remove paginate operator (#1528) * Remove PaginateOperator class since it is no longer used. --------- Signed-off-by: MaxKsyunz * Remove `PaginatedPlan` - move logic to `QueryPlan`. Signed-off-by: Yury-Fridlyand * Remove default implementations from `SerializablePlan`. Signed-off-by: Yury-Fridlyand * Add a doc. Signed-off-by: Yury-Fridlyand * Update design graphs. Signed-off-by: Yury-Fridlyand * More fixes for merge from upstream/main. Signed-off-by: MaxKsyunz --------- Signed-off-by: MaxKsyunz Signed-off-by: Yury-Fridlyand Co-authored-by: MaxKsyunz Co-authored-by: GabeFernandez310 Co-authored-by: Max Ksyunz --- .../sql/executor/execution/QueryPlan.java | 35 ++- .../sql/executor/execution/QueryPlanTest.java | 65 +++- docs/dev/Pagination-v2.md | 287 ++++++++++++++++++ 3 files changed, 382 insertions(+), 5 deletions(-) create mode 100644 docs/dev/Pagination-v2.md diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java index af5c032d49..df9bc0c734 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java @@ -8,6 +8,9 @@ package org.opensearch.sql.executor.execution; +import java.util.Optional; +import org.apache.commons.lang3.NotImplementedException; +import org.opensearch.sql.ast.tree.Paginate; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.executor.ExecutionEngine; @@ -33,25 +36,51 @@ public class QueryPlan extends AbstractPlan { protected final ResponseListener listener; - /** constructor. */ + protected final Optional pageSize; + + /** Constructor. */ + public QueryPlan( + QueryId queryId, + UnresolvedPlan plan, + QueryService queryService, + ResponseListener listener) { + super(queryId); + this.plan = plan; + this.queryService = queryService; + this.listener = listener; + this.pageSize = Optional.empty(); + } + + /** Constructor with page size. */ public QueryPlan( QueryId queryId, UnresolvedPlan plan, + int pageSize, QueryService queryService, ResponseListener listener) { super(queryId); this.plan = plan; this.queryService = queryService; this.listener = listener; + this.pageSize = Optional.of(pageSize); } @Override public void execute() { - queryService.execute(plan, listener); + if (pageSize.isPresent()) { + queryService.execute(new Paginate(pageSize.get(), plan), listener); + } else { + queryService.execute(plan, listener); + } } @Override public void explain(ResponseListener listener) { - queryService.explain(plan, listener); + if (pageSize.isPresent()) { + listener.onFailure(new NotImplementedException( + "`explain` feature for paginated requests is not implemented yet.")); + } else { + queryService.explain(plan, listener); + } } } diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java index 834db76996..a0a98e2be7 100644 --- a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanTest.java @@ -8,21 +8,30 @@ package org.opensearch.sql.executor.execution; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import org.apache.commons.lang3.NotImplementedException; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.DefaultExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryId; import org.opensearch.sql.executor.QueryService; @ExtendWith(MockitoExtension.class) +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class QueryPlanTest { @Mock @@ -41,7 +50,7 @@ class QueryPlanTest { private ResponseListener queryListener; @Test - public void execute() { + public void execute_no_page_size() { QueryPlan query = new QueryPlan(queryId, plan, queryService, queryListener); query.execute(); @@ -49,10 +58,62 @@ public void execute() { } @Test - public void explain() { + public void explain_no_page_size() { QueryPlan query = new QueryPlan(queryId, plan, queryService, queryListener); query.explain(explainListener); verify(queryService, times(1)).explain(plan, explainListener); } + + @Test + public void can_execute_paginated_plan() { + var listener = new ResponseListener() { + @Override + public void onResponse(ExecutionEngine.QueryResponse response) { + assertNotNull(response); + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }; + var plan = new QueryPlan(QueryId.queryId(), mock(UnresolvedPlan.class), 10, + queryService, listener); + plan.execute(); + } + + @Test + // Same as previous test, but with incomplete QueryService + public void can_handle_error_while_executing_plan() { + var listener = new ResponseListener() { + @Override + public void onResponse(ExecutionEngine.QueryResponse response) { + fail(); + } + + @Override + public void onFailure(Exception e) { + assertNotNull(e); + } + }; + var plan = new QueryPlan(QueryId.queryId(), mock(UnresolvedPlan.class), 10, + new QueryService(null, new DefaultExecutionEngine(), null), listener); + plan.execute(); + } + + @Test + public void explain_is_not_supported_for_pagination() { + new QueryPlan(null, null, 0, null, null).explain(new ResponseListener<>() { + @Override + public void onResponse(ExecutionEngine.ExplainResponse response) { + fail(); + } + + @Override + public void onFailure(Exception e) { + assertTrue(e instanceof NotImplementedException); + } + }); + } } diff --git a/docs/dev/Pagination-v2.md b/docs/dev/Pagination-v2.md new file mode 100644 index 0000000000..6e2f3f36d8 --- /dev/null +++ b/docs/dev/Pagination-v2.md @@ -0,0 +1,287 @@ +# Pagination in v2 Engine + +Pagination allows a SQL plugin client to retrieve arbitrarily large results sets one subset at a time. + +A cursor is a SQL abstraction for pagination. A client can open a cursor, retrieve a subset of data given a cursor and close a cursor. + +Currently, SQL plugin does not provide SQL cursor syntax. However, the SQL REST endpoint can return result a page at a time. This feature is used by JDBC and ODBC drivers. + + +# Scope +Currenty, V2 engine supports pagination only for simple `SELECT * FROM ` queries without any other clauses like `WHERE` or `ORDER BY`. + +# Demo +https://user-images.githubusercontent.com/88679692/224208630-8d38d833-abf8-4035-8d15-d5fb4382deca.mp4 + +# REST API +## Initial Query Request +```json +POST /_plugins/_sql +{ + "query" : "...", + "fetch_size": N +} +``` + +Response: +```json +{ + "cursor": /* cursor_id */, + "datarows": [ + // ... + ], + "schema" : [ + // ... + ] +} +``` +`query` is a DQL statement. `fetch_size` is a positive integer, indicating number of rows to return in each page. + +If `query` is a DML statement then pagination does not apply, the `fetch_size` parameter is ignored and a cursor is not created. This is existing behaviour in v1 engine. + +The client receives an (error response](#error-response) if: +- `fetch_size` is not a positive integer, or +- evaluating `query` results in a server-side error. + +## Next Page Request +```json +POST /_plugins/_sql +{ + "cursor": "" +} +``` +Similarly to v1 engine, the response object is the same as initial response if this is not the last page. + +`cursor_id` will be different with each request. + +If this is the last page, the `cursor` property is ommitted. The cursor is closed automatically. + +The client will receive an [error response](#error-response) if executing this request results in an OpenSearch or SQL plug-in error. + +## Cursor Keep Alive Timeout +Each cursor has a keep alive timer associated with it. When the timer runs out, the cursor is closed by OpenSearch. + +This timer is reset every time a page is retrieved. + +The client will receive an [error response](#error-response) if it sends a cursor request for an expired cursor. + +## Error Response +The client will receive an error response if any of the above REST calls result in an server-side error. + +The response object has the following format: +```json +{ + "error": { + "details": , + "reason": , + "type": + }, + "status": +} +``` + +`details`, `reason`, and `type` properties are string values. The exact values will depend on the error state encountered. +`status` is an HTTP status code + +## OpenSearch Data Retrieval Strategy + +OpenSearch provides several data retrival APIs that are optimized for different use cases. + +At this time, SQL plugin uses simple search API and scroll API. + +Simple retrieval API returns at most `max_result_window` number of documents. `max_result_window` is an index setting. + +Scroll API requests returns all documents but can incur high memory costs on OpenSearch coordination node. + +Efficient implementation of pagination needs to be aware of retrival API used. Each retrieval strategy will be considered separately. + +The discussion below uses *under max_result_window* to refer to scenarios that can be implemented with simple retrieval API and *over max_result_window* for scenarios that require scroll API to implement. + +## SQL Node Load Balancing +V2 SQL engine supports *sql node load balancing* -- a cursor request can be routed to any SQL node in a cluster. This is achieved by encoding all data necessary to retrieve the next page in the `cursor_id`. + +## Design Diagrams +New code workflows are highlighted. + +### First page +```mermaid +sequenceDiagram + participant SQLService + participant QueryPlanFactory + participant CanPaginateVisitor + participant QueryService + participant Planner + participant CreatePagingTableScanBuilder + participant OpenSearchExecutionEngine + participant PlanSerializer + participant Physical Plan Tree + +SQLService->>+QueryPlanFactory: execute + critical + QueryPlanFactory->>+CanPaginateVisitor: canConvertToCursor + CanPaginateVisitor-->>-QueryPlanFactory: true + end + QueryPlanFactory->>+QueryService: execute + QueryService->>+Planner: optimize + critical + Planner->>+CreatePagingTableScanBuilder: apply + CreatePagingTableScanBuilder-->>-Planner: paged index scan + end + Planner-->>-QueryService: Logical Plan Tree + QueryService->>+OpenSearchExecutionEngine: execute + Note over OpenSearchExecutionEngine: iterate result set + critical Serialization + OpenSearchExecutionEngine->>+PlanSerializer: convertToCursor + PlanSerializer-->>-OpenSearchExecutionEngine: cursor + end + critical + OpenSearchExecutionEngine->>+Physical Plan Tree: getTotalHits + Physical Plan Tree-->>-OpenSearchExecutionEngine: total hits + end + OpenSearchExecutionEngine-->>-QueryService: execution completed + QueryService-->>-QueryPlanFactory: execution completed + QueryPlanFactory-->>-SQLService: execution completed +``` + +### Second page +```mermaid +sequenceDiagram + participant SQLService + participant QueryPlanFactory + participant QueryService + participant OpenSearchExecutionEngine + participant PlanSerializer + participant Physical Plan Tree + +SQLService->>+QueryPlanFactory: execute + QueryPlanFactory->>+QueryService: execute + critical Deserialization + QueryService->>+PlanSerializer: convertToPlan + PlanSerializer-->>-QueryService: Physical plan tree + end + Note over QueryService: Planner, Optimizer and Implementor
are skipped + QueryService->>+OpenSearchExecutionEngine: execute + Note over OpenSearchExecutionEngine: iterate result set + critical Serialization + OpenSearchExecutionEngine->>+PlanSerializer: convertToCursor + PlanSerializer-->>-OpenSearchExecutionEngine: cursor + end + critical + OpenSearchExecutionEngine->>+Physical Plan Tree: getTotalHits + Physical Plan Tree-->>-OpenSearchExecutionEngine: total hits + end + OpenSearchExecutionEngine-->>-QueryService: execution completed + QueryService-->>-QueryPlanFactory: execution completed + QueryPlanFactory-->>-SQLService: execution completed +``` +### Legacy Engine Fallback +```mermaid +sequenceDiagram + participant RestSQLQueryAction + participant Legacy Engine + participant SQLService + participant QueryPlanFactory + participant CanPaginateVisitor + +RestSQLQueryAction->>+SQLService: prepareRequest + SQLService->>+QueryPlanFactory: execute + critical V2 support check + QueryPlanFactory->>+CanPaginateVisitor: canConvertToCursor + CanPaginateVisitor-->>-QueryPlanFactory: false + QueryPlanFactory-->>-RestSQLQueryAction: UnsupportedCursorRequestException + deactivate SQLService + end + RestSQLQueryAction->>Legacy Engine: accept + Note over Legacy Engine: Processing in Legacy engine + Legacy Engine-->>RestSQLQueryAction:complete +``` + +### Serialization +```mermaid +sequenceDiagram + participant PlanSerializer + participant ProjectOperator + participant ResourceMonitorPlan + participant OpenSearchPagedIndexScan + participant OpenSearchScrollRequest + participant ContinuePageRequest + +PlanSerializer->>+ProjectOperator: getPlanForSerialization + ProjectOperator-->>-PlanSerializer: this +PlanSerializer->>+ProjectOperator: serialize + Note over ProjectOperator: dump private fields + ProjectOperator->>+ResourceMonitorPlan: getPlanForSerialization + ResourceMonitorPlan-->>-ProjectOperator: delegate + Note over ResourceMonitorPlan: ResourceMonitorPlan
is not serialized + ProjectOperator->>+OpenSearchPagedIndexScan: serialize + alt First page + OpenSearchPagedIndexScan->>+OpenSearchScrollRequest: toCursor + OpenSearchScrollRequest-->>-OpenSearchPagedIndexScan: scroll ID + else Subsequent page + OpenSearchPagedIndexScan->>+ContinuePageRequest: toCursor + ContinuePageRequest-->>-OpenSearchPagedIndexScan: scroll ID + end + Note over OpenSearchPagedIndexScan: dump private fields + OpenSearchPagedIndexScan-->>-ProjectOperator: serialized + ProjectOperator-->>-PlanSerializer: serialized +Note over PlanSerializer: Zip to reduce size +``` + +### Deserialization +```mermaid +sequenceDiagram + participant PlanSerializer + participant Deserialization Stream + participant ProjectOperator + participant OpenSearchPagedIndexScan + participant ContinuePageRequest + +Note over PlanSerializer: Unzip +PlanSerializer->>+Deserialization Stream: deserialize + Deserialization Stream->>+ProjectOperator: create new + Note over ProjectOperator: load private fields + ProjectOperator-->>Deserialization Stream: deserialize input + activate Deserialization Stream + Deserialization Stream->>+OpenSearchPagedIndexScan: create new + deactivate Deserialization Stream + OpenSearchPagedIndexScan-->>+Deserialization Stream: resolve engine + Deserialization Stream->>-OpenSearchPagedIndexScan: OpenSearchStorageEngine + Note over OpenSearchPagedIndexScan: load private fields + OpenSearchPagedIndexScan->>+ContinuePageRequest: create new + ContinuePageRequest-->>-OpenSearchPagedIndexScan: created + OpenSearchPagedIndexScan-->>-ProjectOperator: deserialized + ProjectOperator-->>-PlanSerializer: deserialized + deactivate Deserialization Stream +``` + +### Total Hits + +Total Hits is the number of rows matching the search criteria; with `select *` queries it is equal to row (doc) number in the table (index). +Example: +Paging thru `SELECT * FROM calcs` (17 rows) with `fetch_size = 5` returns: + +* Page 1: total hits = 17, result size = 5, cursor +* Page 2: total hits = 17, result size = 5, cursor +* Page 3: total hits = 17, result size = 5, cursor +* Page 4: total hits = 17, result size = 2, cursor +* Page 5: total hits = 0, result size = 0 + +Default implementation of `getTotalHits` in a Physical Plan iterate child plans down the tree and gets the maximum value or 0. + +```mermaid +sequenceDiagram + participant OpenSearchExecutionEngine + participant ProjectOperator + participant ResourceMonitorPlan + participant OpenSearchPagedIndexScan + +OpenSearchExecutionEngine->>+ProjectOperator: getTotalHits + Note over ProjectOperator: default implementation + ProjectOperator->>+ResourceMonitorPlan: getTotalHits + Note over ResourceMonitorPlan: call to delegate + ResourceMonitorPlan->>+OpenSearchPagedIndexScan: getTotalHits + Note over OpenSearchPagedIndexScan: use stored value from the search response + OpenSearchPagedIndexScan-->>-ResourceMonitorPlan: value + ResourceMonitorPlan-->>-ProjectOperator: value + ProjectOperator-->>-OpenSearchExecutionEngine: value +```