Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle create index with batch FlintJob #2734

Merged
merged 8 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion release-notes/opensearch-sql.release-notes-2.15.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Compatible with OpenSearch and OpenSearch Dashboards Version 2.15.0
* Add option to use LakeFormation in S3Glue data source ([#2624](https://github.com/opensearch-project/sql/pull/2624))
* Remove direct ClusterState access in LocalClusterState ([#2717](https://github.com/opensearch-project/sql/pull/2717))

### Bug Fixes
* Handle create index with batch FlintJob ([#2734](https://github.com/opensearch-project/sql/pull/2734))

### Maintenance
* Use EMR serverless bundled iceberg JAR ([#2632](https://github.com/opensearch-project/sql/pull/2632))
* Update maintainers list ([#2663](https://github.com/opensearch-project/sql/pull/2663))
Expand All @@ -28,4 +31,4 @@ Compatible with OpenSearch and OpenSearch Dashboards Version 2.15.0
* Abstract queryId generation ([#2695](https://github.com/opensearch-project/sql/pull/2695))
* Introduce SessionConfigSupplier to abstract settings ([#2707](https://github.com/opensearch-project/sql/pull/2707))
* Add accountId to data models ([#2709](https://github.com/opensearch-project/sql/pull/2709))
* Pass down request context to data accessors ([#2715](https://github.com/opensearch-project/sql/pull/2715))
* Pass down request context to data accessors ([#2715](https://github.com/opensearch-project/sql/pull/2715))
44 changes: 38 additions & 6 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,35 @@ lexer grammar SqlBaseLexer;
public void markUnclosedComment() {
has_unclosed_bracketed_comment = true;
}

/**
* When greater than zero, it's in the middle of parsing ARRAY/MAP/STRUCT type.
*/
public int complex_type_level_counter = 0;

/**
* Increase the counter by one when hits KEYWORD 'ARRAY', 'MAP', 'STRUCT'.
*/
public void incComplexTypeLevelCounter() {
complex_type_level_counter++;
}

/**
* Decrease the counter by one when hits close tag '>' && the counter greater than zero
* which means we are in the middle of complex type parsing. Otherwise, it's a dangling
* GT token and we do nothing.
*/
public void decComplexTypeLevelCounter() {
if (complex_type_level_counter > 0) complex_type_level_counter--;
}

/**
* If the counter is zero, it's a shift right operator. It can be closing tags of an complex
* type definition, such as MAP<INT, ARRAY<INT>>.
*/
public boolean isShiftRightOperator() {
return complex_type_level_counter == 0 ? true : false;
}
}

SEMICOLON: ';';
Expand Down Expand Up @@ -100,14 +129,15 @@ ANTI: 'ANTI';
ANY: 'ANY';
ANY_VALUE: 'ANY_VALUE';
ARCHIVE: 'ARCHIVE';
ARRAY: 'ARRAY';
ARRAY: 'ARRAY' {incComplexTypeLevelCounter();};
AS: 'AS';
ASC: 'ASC';
AT: 'AT';
AUTHORIZATION: 'AUTHORIZATION';
BETWEEN: 'BETWEEN';
BIGINT: 'BIGINT';
BINARY: 'BINARY';
BINDING: 'BINDING';
BOOLEAN: 'BOOLEAN';
BOTH: 'BOTH';
BUCKET: 'BUCKET';
Expand Down Expand Up @@ -137,6 +167,7 @@ COMMENT: 'COMMENT';
COMMIT: 'COMMIT';
COMPACT: 'COMPACT';
COMPACTIONS: 'COMPACTIONS';
COMPENSATION: 'COMPENSATION';
COMPUTE: 'COMPUTE';
CONCATENATE: 'CONCATENATE';
CONSTRAINT: 'CONSTRAINT';
Expand Down Expand Up @@ -257,7 +288,7 @@ LOCKS: 'LOCKS';
LOGICAL: 'LOGICAL';
LONG: 'LONG';
MACRO: 'MACRO';
MAP: 'MAP';
MAP: 'MAP' {incComplexTypeLevelCounter();};
MATCHED: 'MATCHED';
MERGE: 'MERGE';
MICROSECOND: 'MICROSECOND';
Expand Down Expand Up @@ -298,8 +329,6 @@ OVERWRITE: 'OVERWRITE';
PARTITION: 'PARTITION';
PARTITIONED: 'PARTITIONED';
PARTITIONS: 'PARTITIONS';
PERCENTILE_CONT: 'PERCENTILE_CONT';
PERCENTILE_DISC: 'PERCENTILE_DISC';
PERCENTLIT: 'PERCENT';
PIVOT: 'PIVOT';
PLACING: 'PLACING';
Expand Down Expand Up @@ -362,7 +391,7 @@ STATISTICS: 'STATISTICS';
STORED: 'STORED';
STRATIFY: 'STRATIFY';
STRING: 'STRING';
STRUCT: 'STRUCT';
STRUCT: 'STRUCT' {incComplexTypeLevelCounter();};
SUBSTR: 'SUBSTR';
SUBSTRING: 'SUBSTRING';
SYNC: 'SYNC';
Expand Down Expand Up @@ -439,8 +468,11 @@ NEQ : '<>';
NEQJ: '!=';
LT : '<';
LTE : '<=' | '!>';
GT : '>';
GT : '>' {decComplexTypeLevelCounter();};
GTE : '>=' | '!<';
SHIFT_LEFT: '<<';
SHIFT_RIGHT: '>>' {isShiftRightOperator()}?;
SHIFT_RIGHT_UNSIGNED: '>>>' {isShiftRightOperator()}?;

PLUS: '+';
MINUS: '-';
Expand Down
77 changes: 50 additions & 27 deletions spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ statement
| USE identifierReference #use
| USE namespace identifierReference #useNamespace
| SET CATALOG (errorCapturingIdentifier | stringLit) #setCatalog
| CREATE namespace (IF NOT EXISTS)? identifierReference
| CREATE namespace (IF errorCapturingNot EXISTS)? identifierReference
(commentSpec |
locationSpec |
(WITH (DBPROPERTIES | PROPERTIES) propertyList))* #createNamespace
Expand All @@ -92,7 +92,7 @@ statement
| createTableHeader (LEFT_PAREN createOrReplaceTableColTypeList RIGHT_PAREN)? tableProvider?
createTableClauses
(AS? query)? #createTable
| CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier
LIKE source=tableIdentifier
(tableProvider |
rowFormat |
Expand Down Expand Up @@ -141,7 +141,7 @@ statement
SET SERDE stringLit (WITH SERDEPROPERTIES propertyList)? #setTableSerDe
| ALTER TABLE identifierReference (partitionSpec)?
SET SERDEPROPERTIES propertyList #setTableSerDe
| ALTER (TABLE | VIEW) identifierReference ADD (IF NOT EXISTS)?
| ALTER (TABLE | VIEW) identifierReference ADD (IF errorCapturingNot EXISTS)?
partitionSpecLocation+ #addTablePartition
| ALTER TABLE identifierReference
from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
Expand All @@ -153,17 +153,19 @@ statement
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
VIEW (IF NOT EXISTS)? identifierReference
VIEW (IF errorCapturingNot EXISTS)? identifierReference
identifierCommentList?
(commentSpec |
schemaBinding |
(PARTITIONED ON identifierList) |
(TBLPROPERTIES propertyList))*
AS query #createView
| CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
tableIdentifier (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider
(OPTIONS propertyList)? #createTempViewUsing
| ALTER VIEW identifierReference AS? query #alterViewQuery
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF NOT EXISTS)?
| ALTER VIEW identifierReference schemaBinding #alterViewSchemaBinding
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF errorCapturingNot EXISTS)?
identifierReference AS className=stringLit
(USING resource (COMMA resource)*)? #createFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference #dropFunction
Expand Down Expand Up @@ -224,7 +226,7 @@ statement
| SET .*? #setConfiguration
| RESET configKey #resetQuotedConfiguration
| RESET .*? #resetConfiguration
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE?
identifierReference (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS options=propertyList)? #createIndex
Expand Down Expand Up @@ -315,7 +317,7 @@ unsupportedHiveNativeCommands
;

createTableHeader
: CREATE TEMPORARY? EXTERNAL? TABLE (IF NOT EXISTS)? identifierReference
: CREATE TEMPORARY? EXTERNAL? TABLE (IF errorCapturingNot EXISTS)? identifierReference
;

replaceTableHeader
Expand All @@ -342,6 +344,10 @@ locationSpec
: LOCATION stringLit
;

schemaBinding
: WITH SCHEMA (BINDING | COMPENSATION | EVOLUTION | TYPE EVOLUTION)
;

commentSpec
: COMMENT stringLit
;
Expand All @@ -351,8 +357,8 @@ query
;

insertInto
: INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF NOT EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
: INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
Expand Down Expand Up @@ -389,6 +395,7 @@ describeFuncName
| comparisonOperator
| arithmeticOperator
| predicateOperator
| shiftOperator
| BANG
;

Expand Down Expand Up @@ -588,11 +595,11 @@ matchedClause
: WHEN MATCHED (AND matchedCond=booleanExpression)? THEN matchedAction
;
notMatchedClause
: WHEN NOT MATCHED (BY TARGET)? (AND notMatchedCond=booleanExpression)? THEN notMatchedAction
: WHEN errorCapturingNot MATCHED (BY TARGET)? (AND notMatchedCond=booleanExpression)? THEN notMatchedAction
;

notMatchedBySourceClause
: WHEN NOT MATCHED BY SOURCE (AND notMatchedBySourceCond=booleanExpression)? THEN notMatchedBySourceAction
: WHEN errorCapturingNot MATCHED BY SOURCE (AND notMatchedBySourceCond=booleanExpression)? THEN notMatchedBySourceAction
;

matchedAction
Expand Down Expand Up @@ -838,9 +845,11 @@ tableArgumentPartitioning
: ((WITH SINGLE PARTITION)
| ((PARTITION | DISTRIBUTE) BY
(((LEFT_PAREN partition+=expression (COMMA partition+=expression)* RIGHT_PAREN))
| (expression (COMMA invalidMultiPartitionExpression=expression)+)
| partition+=expression)))
((ORDER | SORT) BY
(((LEFT_PAREN sortItem (COMMA sortItem)* RIGHT_PAREN)
| (sortItem (COMMA invalidMultiSortItem=sortItem)+)
| sortItem)))?
;

Expand Down Expand Up @@ -956,28 +965,40 @@ booleanExpression
;

predicate
: NOT? kind=BETWEEN lower=valueExpression AND upper=valueExpression
| NOT? kind=IN LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN
| NOT? kind=IN LEFT_PAREN query RIGHT_PAREN
| NOT? kind=RLIKE pattern=valueExpression
| NOT? kind=(LIKE | ILIKE) quantifier=(ANY | SOME | ALL) (LEFT_PAREN RIGHT_PAREN | LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN)
| NOT? kind=(LIKE | ILIKE) pattern=valueExpression (ESCAPE escapeChar=stringLit)?
| IS NOT? kind=NULL
| IS NOT? kind=(TRUE | FALSE | UNKNOWN)
| IS NOT? kind=DISTINCT FROM right=valueExpression
: errorCapturingNot? kind=BETWEEN lower=valueExpression AND upper=valueExpression
| errorCapturingNot? kind=IN LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN
| errorCapturingNot? kind=IN LEFT_PAREN query RIGHT_PAREN
| errorCapturingNot? kind=RLIKE pattern=valueExpression
| errorCapturingNot? kind=(LIKE | ILIKE) quantifier=(ANY | SOME | ALL) (LEFT_PAREN RIGHT_PAREN | LEFT_PAREN expression (COMMA expression)* RIGHT_PAREN)
| errorCapturingNot? kind=(LIKE | ILIKE) pattern=valueExpression (ESCAPE escapeChar=stringLit)?
| IS errorCapturingNot? kind=NULL
| IS errorCapturingNot? kind=(TRUE | FALSE | UNKNOWN)
| IS errorCapturingNot? kind=DISTINCT FROM right=valueExpression
;

errorCapturingNot
: NOT
| BANG
;

valueExpression
: primaryExpression #valueExpressionDefault
| operator=(MINUS | PLUS | TILDE) valueExpression #arithmeticUnary
| left=valueExpression operator=(ASTERISK | SLASH | PERCENT | DIV) right=valueExpression #arithmeticBinary
| left=valueExpression operator=(PLUS | MINUS | CONCAT_PIPE) right=valueExpression #arithmeticBinary
| left=valueExpression shiftOperator right=valueExpression #shiftExpression
| left=valueExpression operator=AMPERSAND right=valueExpression #arithmeticBinary
| left=valueExpression operator=HAT right=valueExpression #arithmeticBinary
| left=valueExpression operator=PIPE right=valueExpression #arithmeticBinary
| left=valueExpression comparisonOperator right=valueExpression #comparison
;

shiftOperator
: SHIFT_LEFT
| SHIFT_RIGHT
| SHIFT_RIGHT_UNSIGNED
;

datetimeUnit
: YEAR | QUARTER | MONTH
| WEEK | DAY | DAYOFYEAR
Expand Down Expand Up @@ -1143,7 +1164,7 @@ qualifiedColTypeWithPosition
;

colDefinitionDescriptorWithPosition
: NOT NULL
: errorCapturingNot NULL
| defaultExpression
| commentSpec
| colPosition
Expand All @@ -1162,7 +1183,7 @@ colTypeList
;

colType
: colName=errorCapturingIdentifier dataType (NOT NULL)? commentSpec?
: colName=errorCapturingIdentifier dataType (errorCapturingNot NULL)? commentSpec?
;

createOrReplaceTableColTypeList
Expand All @@ -1174,7 +1195,7 @@ createOrReplaceTableColType
;

colDefinitionOption
: NOT NULL
: errorCapturingNot NULL
| defaultExpression
| generationExpression
| commentSpec
Expand All @@ -1189,7 +1210,7 @@ complexColTypeList
;

complexColType
: errorCapturingIdentifier COLON? dataType (NOT NULL)? commentSpec?
: errorCapturingIdentifier COLON? dataType (errorCapturingNot NULL)? commentSpec?
;

whenClause
Expand Down Expand Up @@ -1296,7 +1317,7 @@ alterColumnAction
: TYPE dataType
| commentSpec
| colPosition
| setOrDrop=(SET | DROP) NOT NULL
| setOrDrop=(SET | DROP) errorCapturingNot NULL
| SET defaultExpression
| dropDefault=DROP DEFAULT
;
Expand Down Expand Up @@ -1343,6 +1364,7 @@ ansiNonReserved
| BIGINT
| BINARY
| BINARY_HEX
| BINDING
| BOOLEAN
| BUCKET
| BUCKETS
Expand All @@ -1365,6 +1387,7 @@ ansiNonReserved
| COMMIT
| COMPACT
| COMPACTIONS
| COMPENSATION
| COMPUTE
| CONCATENATE
| COST
Expand Down Expand Up @@ -1643,6 +1666,7 @@ nonReserved
| BIGINT
| BINARY
| BINARY_HEX
| BINDING
| BOOLEAN
| BOTH
| BUCKET
Expand Down Expand Up @@ -1672,6 +1696,7 @@ nonReserved
| COMMIT
| COMPACT
| COMPACTIONS
| COMPENSATION
| COMPUTE
| CONCATENATE
| CONSTRAINT
Expand Down Expand Up @@ -1824,8 +1849,6 @@ nonReserved
| PARTITION
| PARTITIONED
| PARTITIONS
| PERCENTILE_CONT
| PERCENTILE_DISC
| PERCENTLIT
| PIVOT
| PLACING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/**
Expand Down Expand Up @@ -69,8 +68,6 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
vmmusings marked this conversation as resolved.
Show resolved Hide resolved

String clusterName = dispatchQueryRequest.getClusterName();
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
Expand Down
Loading
Loading