Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.8] Support for pagination in v2 engine of SELECT * FROM <table> queries #1685

Merged
merged 2 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies {
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
}

Expand Down
26 changes: 24 additions & 2 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.opensearch.sql.analysis;

import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
Expand Down Expand Up @@ -42,13 +43,16 @@
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -80,12 +84,15 @@
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalML;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalProject;
import org.opensearch.sql.planner.logical.LogicalRareTopN;
Expand Down Expand Up @@ -208,7 +215,6 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
tableFunctionImplementation.applyArguments());
}


@Override
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
LogicalPlan child = node.getChild().get(0).accept(this, context);
Expand Down Expand Up @@ -561,6 +567,23 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
return new LogicalML(child, node.getArguments());
}

@Override
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
LogicalPlan child = paginate.getChild().get(0).accept(this, context);
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
}

@Override
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
return new LogicalFetchCursor(cursor.getCursor(),
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
}

@Override
public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext context) {
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand All @@ -576,5 +599,4 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
}
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Project;
import org.opensearch.sql.ast.tree.RareTopN;
Expand Down Expand Up @@ -294,4 +297,16 @@ public T visitQuery(Query node, C context) {
public T visitExplain(Explain node, C context) {
return visitStatement(node, context);
}

public T visitPaginate(Paginate paginate, C context) {
return visitChildren(paginate, context);
}

public T visitFetchCursor(FetchCursor cursor, C context) {
return visitChildren(cursor, context);
}

public T visitCloseCursor(CloseCursor closeCursor, C context) {
return visitChildren(closeCursor, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Expand Down
38 changes: 38 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent close cursor operation.
* Actually a wrapper to the AST.
*/
public class CloseCursor extends UnresolvedPlan {

/**
* An instance of {@link FetchCursor}.
*/
private UnresolvedPlan cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitCloseCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.cursor = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return List.of(cursor);
}
}
32 changes: 32 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/FetchCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.AbstractNodeVisitor;

/**
* An unresolved plan that represents fetching the next
* batch in paginationed plan.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class FetchCursor extends UnresolvedPlan {
@Getter
final String cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitFetchCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
throw new UnsupportedOperationException("Cursor unresolved plan does not support children");
}
}
48 changes: 48 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Paginate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent pagination operation.
* Actually a wrapper to the AST.
*/
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = false)
@ToString
public class Paginate extends UnresolvedPlan {
@Getter
private final int pageSize;
private UnresolvedPlan child;

public Paginate(int pageSize, UnresolvedPlan child) {
this.pageSize = pageSize;
this.child = child;
}

@Override
public List<? extends Node> getChild() {
return List.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitPaginate(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown on serialization of a PhysicalPlan tree if paging is finished.
* Processing of such exception should outcome of responding no cursor to the user.
*/
public class NoCursorException extends RuntimeException {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.exception;

/**
* This should be thrown by V2 engine to support fallback scenario.
*/
public class UnsupportedCursorRequestException extends RuntimeException {
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.pagination.Cursor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
Expand Down Expand Up @@ -53,6 +54,7 @@ void execute(PhysicalPlan plan, ExecutionContext context,
class QueryResponse {
private final Schema schema;
private final List<ExprValue> results;
private final Cursor cursor;
}

@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

/**
* Query plan which does not reflect a search query being executed.
* It contains a command or an action, for example, a DDL query.
*/
public class CommandPlan extends AbstractPlan {

/**
* The query plan ast.
*/
protected final UnresolvedPlan plan;

/**
* Query service.
*/
protected final QueryService queryService;

protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** Constructor. */
public CommandPlan(QueryId queryId, UnresolvedPlan plan, QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
}

@Override
public void execute() {
queryService.execute(plan, listener);
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
throw new UnsupportedOperationException("CommandPlan does not support explain");
}
}
Loading