Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle ALTER Index Queries. #2554

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ allprojects {
configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib:1.9.10"
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-common:1.9.10"
resolutionStrategy.force "net.bytebuddy:byte-buddy:1.14.9"
}
}

Expand Down
27 changes: 26 additions & 1 deletion spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ skippingIndexStatement
: createSkippingIndexStatement
| refreshSkippingIndexStatement
| describeSkippingIndexStatement
| alterSkippingIndexStatement
| dropSkippingIndexStatement
| vacuumSkippingIndexStatement
| analyzeSkippingIndexStatement
;

createSkippingIndexStatement
Expand All @@ -46,6 +48,12 @@ describeSkippingIndexStatement
: (DESC | DESCRIBE) SKIPPING INDEX ON tableName
;

alterSkippingIndexStatement
: ALTER SKIPPING INDEX
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName
;
Expand All @@ -59,6 +67,7 @@ coveringIndexStatement
| refreshCoveringIndexStatement
| showCoveringIndexStatement
| describeCoveringIndexStatement
| alterCoveringIndexStatement
| dropCoveringIndexStatement
| vacuumCoveringIndexStatement
;
Expand All @@ -83,6 +92,12 @@ describeCoveringIndexStatement
: (DESC | DESCRIBE) INDEX indexName ON tableName
;

alterCoveringIndexStatement
: ALTER INDEX indexName
ON tableName
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropCoveringIndexStatement
: DROP INDEX indexName ON tableName
;
Expand All @@ -91,11 +106,16 @@ vacuumCoveringIndexStatement
: VACUUM INDEX indexName ON tableName
;

analyzeSkippingIndexStatement
: ANALYZE SKIPPING INDEX ON tableName
;

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| alterMaterializedViewStatement
| dropMaterializedViewStatement
| vacuumMaterializedViewStatement
;
Expand All @@ -118,6 +138,11 @@ describeMaterializedViewStatement
: (DESC | DESCRIBE) MATERIALIZED VIEW mvName=multipartIdentifier
;

alterMaterializedViewStatement
: ALTER MATERIALIZED VIEW mvName=multipartIdentifier
WITH LEFT_PAREN propertyList RIGHT_PAREN
;

dropMaterializedViewStatement
: DROP MATERIALIZED VIEW mvName=multipartIdentifier
;
Expand Down Expand Up @@ -163,7 +188,7 @@ indexColTypeList
;

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX | BLOOM_FILTER)
(LEFT_PAREN skipParams RIGHT_PAREN)?
;

Expand Down
3 changes: 3 additions & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ nonReserved

// Flint lexical tokens

BLOOM_FILTER: 'BLOOM_FILTER';
MIN_MAX: 'MIN_MAX';
SKIPPING: 'SKIPPING';
VALUE_SET: 'VALUE_SET';
Expand All @@ -155,6 +156,8 @@ DOT: '.';


AS: 'AS';
ALTER: 'ALTER';
ANALYZE: 'ANALYZE';
CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
Expand Down
14 changes: 12 additions & 2 deletions spark/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ COMMA: ',';
DOT: '.';
LEFT_BRACKET: '[';
RIGHT_BRACKET: ']';
BANG: '!';

// NOTE: If you add a new token in the list below, you should update the list of keywords
// and reserved tag in `docs/sql-ref-ansi-compliance.md#sql-keywords`, and
Expand Down Expand Up @@ -273,7 +274,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NOT: 'NOT' | '!';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
NUMERIC: 'NUMERIC';
Expand Down Expand Up @@ -510,8 +511,13 @@ BIGDECIMAL_LITERAL
| DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
;

// Generalize the identifier to give a sensible INVALID_IDENTIFIER error message:
// * Unicode letters rather than a-z and A-Z only
// * URI paths for table references using paths
// We then narrow down to ANSI rules in exitUnquotedIdentifier() in the parser.
IDENTIFIER
: (LETTER | DIGIT | '_')+
: (UNICODE_LETTER | DIGIT | '_')+
| UNICODE_LETTER+ '://' (UNICODE_LETTER | DIGIT | '_' | '/' | '-' | '.' | '?' | '=' | '&' | '#' | '%')+
;

BACKQUOTED_IDENTIFIER
Expand All @@ -535,6 +541,10 @@ fragment LETTER
: [A-Z]
;

fragment UNICODE_LETTER
: [\p{L}]
;

SIMPLE_COMMENT
: '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
;
Expand Down
3 changes: 2 additions & 1 deletion spark/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ describeFuncName
| comparisonOperator
| arithmeticOperator
| predicateOperator
| BANG
;

describeColName
Expand Down Expand Up @@ -946,7 +947,7 @@ expressionSeq
;

booleanExpression
: NOT booleanExpression #logicalNot
: (NOT | BANG) booleanExpression #logicalNot
| EXISTS LEFT_PAREN query RIGHT_PAREN #exists
| valueExpression predicate? #predicated
| left=booleanExpression operator=AND right=booleanExpression #logicalBinary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.opensearch.client.Client;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
Expand All @@ -27,58 +28,71 @@
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDelete;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;

/** Handle Index DML query. includes * DROP * ALT? */
@RequiredArgsConstructor
public class IndexDMLHandler extends AsyncQueryHandler {
private static final Logger LOG = LogManager.getLogger();

// To be deprecated in 3.0. Still using for backward compatibility.
public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";
public static final String DML_QUERY_JOB_ID = "DMLQueryJobId";

private final EMRServerlessClient emrServerlessClient;

private final JobExecutionResponseReader jobExecutionResponseReader;

private final FlintIndexMetadataReader flintIndexMetadataReader;

private final Client client;
private final FlintIndexMetadataService flintIndexMetadataService;

private final StateStore stateStore;

public static boolean isIndexDMLQuery(String jobId) {
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId);
return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
}

@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = flintIndexMetadataReader.getFlintIndexMetadata(indexDetails);
// if index is created without auto refresh. there is no job to cancel.
String status = JobRunState.FAILED.toString();
String error = "";
long startTime = 0L;
long startTime = System.currentTimeMillis();
try {
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
jobCancelOp.apply(indexMetadata);

FlintIndexOp indexDeleteOp =
new FlintIndexOpDelete(stateStore, dispatchQueryRequest.getDatasource());
indexDeleteOp.apply(indexMetadata);
status = JobRunState.SUCCESS.toString();
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails);
executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata);
AsyncQueryId asyncQueryId =
storeIndexDMLResult(
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.SUCCESS.toString(),
StringUtils.EMPTY,
startTime);
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
} catch (Exception e) {
error = e.getMessage();
LOG.error(e);
LOG.error(e.getMessage());
AsyncQueryId asyncQueryId =
storeIndexDMLResult(
dispatchQueryRequest,
dataSourceMetadata,
JobRunState.FAILED.toString(),
e.getMessage(),
startTime);
return new DispatchQueryResponse(
asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
}
}

private AsyncQueryId storeIndexDMLResult(
DispatchQueryRequest dispatchQueryRequest,
DataSourceMetadata dataSourceMetadata,
String status,
String error,
long startTime) {
AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
IndexDMLResult indexDMLResult =
new IndexDMLResult(
Expand All @@ -88,10 +102,48 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getDatasource(),
System.currentTimeMillis() - startTime,
System.currentTimeMillis());
String resultIndex = dataSourceMetadata.getResultIndex();
createIndexDMLResult(stateStore, resultIndex).apply(indexDMLResult);
createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult);
return asyncQueryId;
}

return new DispatchQueryResponse(asyncQueryId, DROP_INDEX_JOB_ID, resultIndex, null);
private void executeIndexOp(
vmmusings marked this conversation as resolved.
Show resolved Hide resolved
DispatchQueryRequest dispatchQueryRequest,
IndexQueryDetails indexQueryDetails,
FlintIndexMetadata indexMetadata) {
switch (indexQueryDetails.getIndexQueryActionType()) {
case DROP:
FlintIndexOp dropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
dropOp.apply(indexMetadata);
break;
case ALTER:
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
indexQueryDetails.getFlintIndexOptions(),
stateStore,
dispatchQueryRequest.getDatasource(),
emrServerlessClient,
flintIndexMetadataService);
flintIndexOpAlter.apply(indexMetadata);
break;
default:
throw new IllegalStateException(
String.format(
"IndexQueryActionType: %s is not supported in IndexDMLHandler.",
indexQueryDetails.getIndexQueryActionType()));
}
}

private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails) {
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(indexDetails.openSearchIndexName());
if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", indexDetails.openSearchIndexName()));
}
return indexMetadataMap.get(indexDetails.openSearchIndexName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.spark.dispatcher;

import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClient;
Expand All @@ -14,7 +15,7 @@
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReader;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
import org.opensearch.sql.spark.leasemanager.LeaseManager;
Expand All @@ -23,27 +24,33 @@
/** Handle Refresh Query. */
public class RefreshQueryHandler extends BatchQueryHandler {

private final FlintIndexMetadataReader flintIndexMetadataReader;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClient emrServerlessClient;

public RefreshQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
FlintIndexMetadataReader flintIndexMetadataReader,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.flintIndexMetadataReader = flintIndexMetadataReader;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClient = emrServerlessClient;
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
FlintIndexMetadata indexMetadata =
flintIndexMetadataReader.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
if (!indexMetadataMap.containsKey(asyncQueryJobMetadata.getIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp =
new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
jobCancelOp.apply(indexMetadata);
Expand Down
Loading
Loading