Merge branch 'main' into dqs/batch-indexname
Signed-off-by: Tomoyuki MORITA <[email protected]>
ykmr1224 authored Sep 6, 2024
2 parents a79cdbd + 83e89fb commit 49c4feb
Showing 74 changed files with 2,235 additions and 1,533 deletions.
5 changes: 4 additions & 1 deletion async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -156,7 +156,10 @@ indexManagementStatement
 ;
 
 showFlintIndexStatement
-    : SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
+    : SHOW FLINT (INDEX | INDEXES)
+        IN catalogDb=multipartIdentifier #showFlintIndex
+    | SHOW FLINT (INDEX | INDEXES) EXTENDED
+        IN catalogDb=multipartIdentifier #showFlintIndexExtended
     ;
 
 indexJobManagementStatement
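For reference, the split rule now distinguishes plain and extended listings. A minimal sketch of inputs each labeled alternative should match, written as Java string constants ("mys3.default" is a hypothetical catalog.database, not taken from this commit):

    // Illustrative only: inputs for the revised showFlintIndexStatement rule.
    public class ShowFlintIndexExamples {
      public static final String SHOW_INDEX =
          "SHOW FLINT INDEX IN mys3.default"; // matched by #showFlintIndex
      public static final String SHOW_INDEX_EXTENDED =
          "SHOW FLINT INDEX EXTENDED IN mys3.default"; // matched by #showFlintIndexExtended
    }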
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SparkSqlBase.g4
@@ -163,6 +163,7 @@ DESC: 'DESC';
 DESCRIBE: 'DESCRIBE';
 DROP: 'DROP';
 EXISTS: 'EXISTS';
+EXTENDED: 'EXTENDED';
 FALSE: 'FALSE';
 FLINT: 'FLINT';
 IF: 'IF';
2 changes: 2 additions & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
@@ -212,6 +212,7 @@ DIRECTORY: 'DIRECTORY';
 DISTINCT: 'DISTINCT';
 DISTRIBUTE: 'DISTRIBUTE';
 DIV: 'DIV';
+DO: 'DO';
 DOUBLE: 'DOUBLE';
 DROP: 'DROP';
 ELSE: 'ELSE';
@@ -467,6 +468,7 @@ WEEK: 'WEEK';
 WEEKS: 'WEEKS';
 WHEN: 'WHEN';
 WHERE: 'WHERE';
+WHILE: 'WHILE';
 WINDOW: 'WINDOW';
 WITH: 'WITH';
 WITHIN: 'WITHIN';
22 changes: 19 additions & 3 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
@@ -63,6 +63,8 @@ compoundStatement
     : statement
     | setStatementWithOptionalVarKeyword
     | beginEndCompoundBlock
+    | ifElseStatement
+    | whileStatement
     ;
 
 setStatementWithOptionalVarKeyword
@@ -71,6 +73,16 @@ setStatementWithOptionalVarKeyword
         LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword
     ;
 
+whileStatement
+    : beginLabel? WHILE booleanExpression DO compoundBody END WHILE endLabel?
+    ;
+
+ifElseStatement
+    : IF booleanExpression THEN conditionalBodies+=compoundBody
+        (ELSE IF booleanExpression THEN conditionalBodies+=compoundBody)*
+        (ELSE elseBody=compoundBody)? END IF
+    ;
+
 singleStatement
     : (statement|setResetStatement) SEMICOLON* EOF
     ;
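For orientation, the two new rules admit SQL-scripting control flow along the following lines. This is a sketch only — the scripting grammar comes from upstream Spark, and the variable declaration and literals here are assumptions, not taken from this commit:

    // Illustrative only: scripts the new whileStatement and ifElseStatement
    // rules are meant to parse (hypothetical examples).
    public class SqlScriptingExamples {
      public static final String WHILE_LOOP =
          "BEGIN\n"
              + "  DECLARE i INT DEFAULT 0;\n" // DECLARE/SET syntax assumed
              + "  WHILE i < 3 DO\n"
              + "    SET i = i + 1;\n"
              + "  END WHILE;\n"
              + "END";

      public static final String IF_ELSE =
          "BEGIN\n"
              + "  IF 1 = 1 THEN\n"
              + "    SELECT 'then branch';\n"
              + "  ELSE IF 2 = 2 THEN\n"
              + "    SELECT 'else if branch';\n"
              + "  ELSE\n"
              + "    SELECT 'else branch';\n"
              + "  END IF;\n"
              + "END";
    }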
@@ -406,9 +418,9 @@ query
 ;
 
 insertInto
-    : INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
-    | INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere
+    : INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
+    | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
+    | INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause #insertIntoReplaceWhere
     | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
     ;
@@ -1522,6 +1534,7 @@ ansiNonReserved
     | DIRECTORY
     | DISTRIBUTE
     | DIV
+    | DO
     | DOUBLE
     | DROP
     | ESCAPED
@@ -1723,6 +1736,7 @@ ansiNonReserved
     | VOID
     | WEEK
     | WEEKS
+    | WHILE
     | WINDOW
     | YEAR
     | YEARS
@@ -1853,6 +1867,7 @@ nonReserved
     | DISTINCT
     | DISTRIBUTE
     | DIV
+    | DO
     | DOUBLE
     | DROP
     | ELSE
@@ -2092,6 +2107,7 @@ nonReserved
     | VOID
     | WEEK
     | WEEKS
+    | WHILE
     | WHEN
     | WHERE
     | WINDOW
@@ -31,7 +31,8 @@ CreateAsyncQueryResponse createAsyncQuery(
    * @param queryId queryId.
    * @return {@link AsyncQueryExecutionResponse}
    */
-  AsyncQueryExecutionResponse getAsyncQueryResults(String queryId);
+  AsyncQueryExecutionResponse getAsyncQueryResults(
+      String queryId, AsyncQueryRequestContext asyncQueryRequestContext);
 
   /**
    * Cancels running async query and returns the cancelled queryId.
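Callers of the widened interface now pass the request context through. A minimal sketch, assuming a no-op NullAsyncQueryRequestContext implementation and a hypothetical query id (neither is part of this diff):

    // Sketch of a caller after this change.
    AsyncQueryExecutionResponse fetchResults(AsyncQueryExecutorService service) {
      AsyncQueryRequestContext requestContext = new NullAsyncQueryRequestContext(); // assumed no-op impl
      return service.getAsyncQueryResults("my-query-id", requestContext);
    }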
@@ -74,12 +74,14 @@ public CreateAsyncQueryResponse createAsyncQuery(
   }
 
   @Override
-  public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
+  public AsyncQueryExecutionResponse getAsyncQueryResults(
+      String queryId, AsyncQueryRequestContext asyncQueryRequestContext) {
     Optional<AsyncQueryJobMetadata> jobMetadata =
         asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
     if (jobMetadata.isPresent()) {
       String sessionId = jobMetadata.get().getSessionId();
-      JSONObject jsonObject = sparkQueryDispatcher.getQueryResponse(jobMetadata.get());
+      JSONObject jsonObject =
+          sparkQueryDispatcher.getQueryResponse(jobMetadata.get(), asyncQueryRequestContext);
       if (JobRunState.SUCCESS.toString().equals(jsonObject.getString(STATUS_FIELD))) {
         DefaultSparkSqlFunctionResponseHandle sparkSqlFunctionResponseHandle =
             new DefaultSparkSqlFunctionResponseHandle(jsonObject);
@@ -86,6 +86,11 @@ public class SparkConstants {
"com.amazonaws.emr.AssumeRoleAWSCredentialsProvider";
public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/";
public static final String FLINT_JOB_QUERY = "spark.flint.job.query";
public static final String FLINT_JOB_QUERY_ID = "spark.flint.job.queryId";
public static final String FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED =
"spark.flint.job.externalScheduler.enabled";
public static final String FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL =
"spark.flint.job.externalScheduler.interval";
public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex";
public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId";

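The new constants are Spark configuration keys. When a job is dispatched they would surface as conf entries roughly like the following (values are hypothetical; java.util.Map/HashMap imports assumed):

    // Illustrative only: the new keys as Spark conf entries.
    Map<String, String> sparkConf = new HashMap<>();
    sparkConf.put(SparkConstants.FLINT_JOB_QUERY_ID, "<query-id>");
    sparkConf.put(SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, "true");
    sparkConf.put(SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL, "5 minutes");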
@@ -21,8 +21,10 @@
 /** Process async query request. */
 public abstract class AsyncQueryHandler {
 
-  public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
-    JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata);
+  public JSONObject getQueryResponse(
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
+    JSONObject result = getResponseFromResultIndex(asyncQueryJobMetadata, asyncQueryRequestContext);
     if (result.has(DATA_FIELD)) {
       JSONObject items = result.getJSONObject(DATA_FIELD);
 
@@ -35,7 +37,8 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata)
       result.put(ERROR_FIELD, error);
       return result;
     } else {
-      JSONObject statement = getResponseFromExecutor(asyncQueryJobMetadata);
+      JSONObject statement =
+          getResponseFromExecutor(asyncQueryJobMetadata, asyncQueryRequestContext);
 
       // Consider statement still running if state is success but query result unavailable
       if (isSuccessState(statement)) {
@@ -50,10 +53,12 @@ private boolean isSuccessState(JSONObject statement) {
   }
 
   protected abstract JSONObject getResponseFromResultIndex(
-      AsyncQueryJobMetadata asyncQueryJobMetadata);
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext);
 
   protected abstract JSONObject getResponseFromExecutor(
-      AsyncQueryJobMetadata asyncQueryJobMetadata);
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext);
 
   public abstract String cancelJob(
       AsyncQueryJobMetadata asyncQueryJobMetadata,
@@ -41,15 +41,19 @@ public class BatchQueryHandler extends AsyncQueryHandler {
   protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;
 
   @Override
-  protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
+  protected JSONObject getResponseFromResultIndex(
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     // either empty json when the result is not available or data with status
     // Fetch from Result Index
     return jobExecutionResponseReader.getResultWithJobId(
         asyncQueryJobMetadata.getJobId(), asyncQueryJobMetadata.getResultIndex());
   }
 
   @Override
-  protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
+  protected JSONObject getResponseFromExecutor(
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     JSONObject result = new JSONObject();
     // make call to EMR Serverless when related result index documents are not available
     GetJobRunResult getJobRunResult =
@@ -87,6 +91,7 @@ public DispatchQueryResponse submit(
             sparkSubmitParametersBuilderProvider
                 .getSparkSubmitParametersBuilder()
                 .clusterName(clusterName)
+                .queryId(context.getQueryId())
                 .query(dispatchQueryRequest.getQuery())
                 .dataSource(
                     context.getDataSourceMetadata(),
@@ -105,7 +110,7 @@ public DispatchQueryResponse submit(
         .jobId(jobId)
         .resultIndex(dataSourceMetadata.getResultIndex())
         .datasourceName(dataSourceMetadata.getName())
-        .jobType(JobType.INTERACTIVE)
+        .jobType(JobType.BATCH)
         .indexName(getIndexName(context))
         .build();
   }
@@ -82,7 +82,7 @@ public DispatchQueryResponse submit(
           .jobId(DML_QUERY_JOB_ID)
           .resultIndex(dataSourceMetadata.getResultIndex())
           .datasourceName(dataSourceMetadata.getName())
-          .jobType(JobType.INTERACTIVE)
+          .jobType(JobType.BATCH)
           .build();
     } catch (Exception e) {
       LOG.error(e.getMessage());
@@ -100,7 +100,7 @@ public DispatchQueryResponse submit(
           .jobId(DML_QUERY_JOB_ID)
           .resultIndex(dataSourceMetadata.getResultIndex())
           .datasourceName(dataSourceMetadata.getName())
-          .jobType(JobType.INTERACTIVE)
+          .jobType(JobType.BATCH)
           .build();
     }
   }
@@ -138,8 +138,6 @@ private FlintIndexOp getIndexOp(
       case ALTER:
         return flintIndexOpFactory.getAlter(
             indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource());
-      case VACUUM:
-        return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource());
       default:
         throw new IllegalStateException(
             String.format(
@@ -162,14 +160,18 @@ private FlintIndexMetadata getFlintIndexMetadata(
   }
 
   @Override
-  protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
+  protected JSONObject getResponseFromResultIndex(
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     String queryId = asyncQueryJobMetadata.getQueryId();
     return jobExecutionResponseReader.getResultWithQueryId(
         queryId, asyncQueryJobMetadata.getResultIndex());
   }
 
   @Override
-  protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
+  protected JSONObject getResponseFromExecutor(
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     // Consider statement still running if result doc created in submit() is not available yet
     JSONObject result = new JSONObject();
     result.put(STATUS_FIELD, StatementState.RUNNING.getState());
@@ -50,21 +50,26 @@ public class InteractiveQueryHandler extends AsyncQueryHandler {
   protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;
 
   @Override
-  protected JSONObject getResponseFromResultIndex(AsyncQueryJobMetadata asyncQueryJobMetadata) {
+  protected JSONObject getResponseFromResultIndex(
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     String queryId = asyncQueryJobMetadata.getQueryId();
     return jobExecutionResponseReader.getResultWithQueryId(
         queryId, asyncQueryJobMetadata.getResultIndex());
   }
 
   @Override
-  protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJobMetadata) {
+  protected JSONObject getResponseFromExecutor(
+      AsyncQueryJobMetadata asyncQueryJobMetadata,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     JSONObject result = new JSONObject();
     String queryId = asyncQueryJobMetadata.getQueryId();
     Statement statement =
         getStatementByQueryId(
             asyncQueryJobMetadata.getSessionId(),
             queryId,
-            asyncQueryJobMetadata.getDatasourceName());
+            asyncQueryJobMetadata.getDatasourceName(),
+            asyncQueryRequestContext);
     StatementState statementState = statement.getStatementState();
     result.put(STATUS_FIELD, statementState.getState());
     result.put(ERROR_FIELD, Optional.of(statement.getStatementModel().getError()).orElse(""));
@@ -79,7 +84,8 @@ public String cancelJob(
     getStatementByQueryId(
             asyncQueryJobMetadata.getSessionId(),
             queryId,
-            asyncQueryJobMetadata.getDatasourceName())
+            asyncQueryJobMetadata.getDatasourceName(),
+            asyncQueryRequestContext)
         .cancel();
     return queryId;
   }
@@ -148,12 +154,16 @@ public DispatchQueryResponse submit(
         .build();
   }
 
-  private Statement getStatementByQueryId(String sessionId, String queryId, String datasourceName) {
+  private Statement getStatementByQueryId(
+      String sessionId,
+      String queryId,
+      String datasourceName,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     Optional<Session> session = sessionManager.getSession(sessionId, datasourceName);
     if (session.isPresent()) {
       // todo, statementId == jobId if statement running in session.
       StatementId statementId = new StatementId(queryId);
-      Optional<Statement> statement = session.get().get(statementId);
+      Optional<Statement> statement = session.get().get(statementId, asyncQueryRequestContext);
       if (statement.isPresent()) {
         return statement.get();
       } else {
@@ -73,7 +73,7 @@ public String cancelJob(
   @Override
   public DispatchQueryResponse submit(
       DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
-    leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
+    leaseManager.borrow(new LeaseRequest(JobType.REFRESH, dispatchQueryRequest.getDatasource()));
 
     DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
     DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
@@ -83,7 +83,7 @@ public DispatchQueryResponse submit(
         .resultIndex(resp.getResultIndex())
         .sessionId(resp.getSessionId())
         .datasourceName(dataSourceMetadata.getName())
-        .jobType(JobType.BATCH)
+        .jobType(JobType.REFRESH)
         .indexName(context.getIndexQueryDetails().openSearchIndexName())
         .build();
   }
@@ -54,7 +54,9 @@ public DispatchQueryResponse dispatch(
           dispatchQueryRequest, asyncQueryRequestContext, dataSourceMetadata);
     }
 
-    List<String> validationErrors = SQLQueryUtils.validateSparkSqlQuery(query);
+    List<String> validationErrors =
+        SQLQueryUtils.validateSparkSqlQuery(
+            dataSourceService.getDataSource(dispatchQueryRequest.getDatasource()), query);
     if (!validationErrors.isEmpty()) {
       throw new IllegalArgumentException(
           "Query is not allowed: " + String.join(", ", validationErrors));
@@ -148,7 +150,6 @@ private boolean isEligibleForStreamingQuery(IndexQueryDetails indexQueryDetails)
 
   private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetails) {
     return IndexQueryActionType.DROP.equals(indexQueryDetails.getIndexQueryActionType())
-        || IndexQueryActionType.VACUUM.equals(indexQueryDetails.getIndexQueryActionType())
         || (IndexQueryActionType.ALTER.equals(indexQueryDetails.getIndexQueryActionType())
             && (indexQueryDetails
                 .getFlintIndexOptions()
@@ -157,9 +158,11 @@ private boolean isEligibleForIndexDMLHandling(IndexQueryDetails indexQueryDetail
&& !indexQueryDetails.getFlintIndexOptions().autoRefresh()));
}

public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public JSONObject getQueryResponse(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
return getAsyncQueryHandlerForExistingQuery(asyncQueryJobMetadata)
.getQueryResponse(asyncQueryJobMetadata);
.getQueryResponse(asyncQueryJobMetadata, asyncQueryRequestContext);
}

public String cancelJob(
@@ -175,7 +178,7 @@ private AsyncQueryHandler getAsyncQueryHandlerForExistingQuery(
       return queryHandlerFactory.getInteractiveQueryHandler();
     } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) {
       return queryHandlerFactory.getIndexDMLHandler();
-    } else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) {
+    } else if (asyncQueryJobMetadata.getJobType() == JobType.REFRESH) {
      return queryHandlerFactory.getRefreshQueryHandler(asyncQueryJobMetadata.getAccountId());
     } else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
       return queryHandlerFactory.getStreamingQueryHandler(asyncQueryJobMetadata.getAccountId());
@@ -82,6 +82,7 @@ public DispatchQueryResponse submit(
             sparkSubmitParametersBuilderProvider
                 .getSparkSubmitParametersBuilder()
                 .clusterName(clusterName)
+                .queryId(context.getQueryId())
                 .query(dispatchQueryRequest.getQuery())
                 .structuredStreaming(true)
                 .dataSource(
[The remaining changed files in this commit were not loaded in this view.]
