Commit 965c449
Merge branch 'main' into dqs/add-query-status
Signed-off-by: Tomoyuki MORITA <[email protected]>
Authored by ykmr1224 on Sep 5, 2024 · 2 parents 962f411 + da622eb
Showing 46 changed files with 1,794 additions and 566 deletions.
5 changes: 4 additions & 1 deletion async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4
@@ -156,7 +156,10 @@ indexManagementStatement
     ;
 
 showFlintIndexStatement
-    : SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
+    : SHOW FLINT (INDEX | INDEXES)
+        IN catalogDb=multipartIdentifier            #showFlintIndex
+    | SHOW FLINT (INDEX | INDEXES) EXTENDED
+        IN catalogDb=multipartIdentifier            #showFlintIndexExtended
     ;
 
 indexJobManagementStatement
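The rule now has two labeled alternatives, #showFlintIndex and #showFlintIndexExtended, so each form gets its own context class in the generated parser. As a sketch of what should now parse (catalog and database names are made up):

SHOW FLINT INDEXES IN mycatalog.mydb
SHOW FLINT INDEXES EXTENDED IN mycatalog.mydb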
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SparkSqlBase.g4
@@ -163,6 +163,7 @@ DESC: 'DESC';
 DESCRIBE: 'DESCRIBE';
 DROP: 'DROP';
 EXISTS: 'EXISTS';
+EXTENDED: 'EXTENDED';
 FALSE: 'FALSE';
 FLINT: 'FLINT';
 IF: 'IF';
2 changes: 2 additions & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
@@ -212,6 +212,7 @@ DIRECTORY: 'DIRECTORY';
 DISTINCT: 'DISTINCT';
 DISTRIBUTE: 'DISTRIBUTE';
 DIV: 'DIV';
+DO: 'DO';
 DOUBLE: 'DOUBLE';
 DROP: 'DROP';
 ELSE: 'ELSE';
@@ -467,6 +468,7 @@ WEEK: 'WEEK';
 WEEKS: 'WEEKS';
 WHEN: 'WHEN';
 WHERE: 'WHERE';
+WHILE: 'WHILE';
 WINDOW: 'WINDOW';
 WITH: 'WITH';
 WITHIN: 'WITHIN';
22 changes: 19 additions & 3 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
@@ -63,6 +63,8 @@ compoundStatement
     : statement
     | setStatementWithOptionalVarKeyword
     | beginEndCompoundBlock
+    | ifElseStatement
+    | whileStatement
     ;
 
 setStatementWithOptionalVarKeyword
@@ -71,6 +73,16 @@ setStatementWithOptionalVarKeyword
         LEFT_PAREN query RIGHT_PAREN                #setVariableWithOptionalKeyword
     ;
 
+whileStatement
+    : beginLabel? WHILE booleanExpression DO compoundBody END WHILE endLabel?
+    ;
+
+ifElseStatement
+    : IF booleanExpression THEN conditionalBodies+=compoundBody
+        (ELSE IF booleanExpression THEN conditionalBodies+=compoundBody)*
+        (ELSE elseBody=compoundBody)? END IF
+    ;
+
 singleStatement
     : (statement|setResetStatement) SEMICOLON* EOF
     ;
@@ -406,9 +418,9 @@ query
     ;
 
 insertInto
-    : INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)?                #insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)?                       #insertIntoTable
-    | INSERT INTO TABLE? identifierReference REPLACE whereClause                                                                               #insertIntoReplaceWhere
+    : INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
+    | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)?        #insertIntoTable
+    | INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause                                                                #insertIntoReplaceWhere
     | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat?                                                            #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)?                                        #insertOverwriteDir
     ;
@@ -1522,6 +1534,7 @@ ansiNonReserved
     | DIRECTORY
     | DISTRIBUTE
     | DIV
+    | DO
     | DOUBLE
     | DROP
     | ESCAPED
@@ -1723,6 +1736,7 @@ ansiNonReserved
     | VOID
     | WEEK
     | WEEKS
+    | WHILE
     | WINDOW
     | YEAR
     | YEARS
@@ -1853,6 +1867,7 @@ nonReserved
     | DISTINCT
     | DISTRIBUTE
     | DIV
+    | DO
     | DOUBLE
     | DROP
     | ELSE
@@ -2092,6 +2107,7 @@ nonReserved
     | VOID
     | WEEK
     | WEEKS
+    | WHILE
     | WHEN
     | WHERE
     | WINDOW
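The compoundStatement additions track upstream Spark's SQL scripting grammar. Assuming Spark's scripting semantics (not part of this diff), a block of the following shape should now be accepted by the parser; the variable and values are purely illustrative:

BEGIN
  WHILE cnt < 3 DO
    SET cnt = cnt + 1;
  END WHILE;
  IF cnt = 3 THEN
    SELECT 'done';
  ELSE
    SELECT 'not done';
  END IF;
END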
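The three table-targeting insertInto alternatives likewise gain an optional optionsClause, allowing write options to be supplied inline on an INSERT. A hypothetical example (the option key is illustrative, not taken from this diff):

INSERT INTO target_table OPTIONS ('mergeSchema' = 'true') SELECT * FROM source_table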
@@ -87,6 +87,10 @@ public class SparkConstants {
   public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/";
   public static final String FLINT_JOB_QUERY = "spark.flint.job.query";
   public static final String FLINT_JOB_QUERY_ID = "spark.flint.job.queryId";
+  public static final String FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED =
+      "spark.flint.job.externalScheduler.enabled";
+  public static final String FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL =
+      "spark.flint.job.externalScheduler.interval";
   public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex";
   public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId";
 
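These constants name Spark configuration keys, presumably passed to the EMR Serverless job so the Flint job knows whether an external scheduler drives index refresh and at what interval. An illustrative fragment of how they might appear among a job's Spark parameters (values are made up):

--conf spark.flint.job.externalScheduler.enabled=true
--conf "spark.flint.job.externalScheduler.interval=10 minutes"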
@@ -74,7 +74,7 @@ public String cancelJob(
   @Override
   public DispatchQueryResponse submit(
       DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
-    leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
+    leaseManager.borrow(new LeaseRequest(JobType.REFRESH, dispatchQueryRequest.getDatasource()));
 
     DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
     DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
@@ -84,7 +84,7 @@ public DispatchQueryResponse submit(
             .resultIndex(resp.getResultIndex())
             .sessionId(resp.getSessionId())
             .datasourceName(dataSourceMetadata.getName())
-            .jobType(JobType.BATCH)
+            .jobType(JobType.REFRESH)
             .indexName(context.getIndexQueryDetails().openSearchIndexName())
             .status(QueryState.WAITING)
             .build();
@@ -54,7 +54,9 @@ public DispatchQueryResponse dispatch(
           dispatchQueryRequest, asyncQueryRequestContext, dataSourceMetadata);
     }
 
-    List<String> validationErrors = SQLQueryUtils.validateSparkSqlQuery(query);
+    List<String> validationErrors =
+        SQLQueryUtils.validateSparkSqlQuery(
+            dataSourceService.getDataSource(dispatchQueryRequest.getDatasource()), query);
     if (!validationErrors.isEmpty()) {
       throw new IllegalArgumentException(
           "Query is not allowed: " + String.join(", ", validationErrors));
@@ -177,7 +179,7 @@ private AsyncQueryHandler getAsyncQueryHandlerForExistingQuery(
       return queryHandlerFactory.getInteractiveQueryHandler();
     } else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) {
       return queryHandlerFactory.getIndexDMLHandler();
-    } else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) {
+    } else if (asyncQueryJobMetadata.getJobType() == JobType.REFRESH) {
       return queryHandlerFactory.getRefreshQueryHandler(asyncQueryJobMetadata.getAccountId());
     } else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
       return queryHandlerFactory.getStreamingQueryHandler(asyncQueryJobMetadata.getAccountId());
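Query validation is now datasource-aware: the validator receives the resolved DataSource, so allow/deny rules can differ by datasource type. A minimal sketch of the new call pattern, reusing only names visible in this diff (the datasource name is hypothetical):

DataSource dataSource = dataSourceService.getDataSource("my_glue");
List<String> validationErrors = SQLQueryUtils.validateSparkSqlQuery(dataSource, query);
if (!validationErrors.isEmpty()) {
  throw new IllegalArgumentException("Query is not allowed: " + String.join(", ", validationErrors));
}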
@@ -19,6 +19,7 @@ public class FlintIndexOptions {
   public static final String INCREMENTAL_REFRESH = "incremental_refresh";
   public static final String CHECKPOINT_LOCATION = "checkpoint_location";
   public static final String WATERMARK_DELAY = "watermark_delay";
+  public static final String SCHEDULER_MODE = "scheduler_mode";
   private final Map<String, String> options = new HashMap<>();
 
   public void setOption(String key, String value) {
@@ -33,6 +34,11 @@ public boolean autoRefresh() {
     return Boolean.parseBoolean(getOption(AUTO_REFRESH).orElse("false"));
   }
 
+  public boolean isExternalScheduler() {
+    // Default is false, which means using internal scheduler to refresh the index.
+    return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(false);
+  }
+
   public Map<String, String> getProvidedOptions() {
     return new HashMap<>(options);
   }
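A minimal sketch of the new option in action, using only methods visible in this class; in Flint index DDL this value would presumably arrive via the index's WITH (...) options, alongside auto_refresh and the other keys above:

FlintIndexOptions options = new FlintIndexOptions();
options.setOption(FlintIndexOptions.SCHEDULER_MODE, "external");
// Only the exact value "external" enables the external scheduler.
assert options.isExternalScheduler();
// Any other value, or no value at all, keeps the internal scheduler.
assert !new FlintIndexOptions().isExternalScheduler();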
@@ -8,6 +8,7 @@
 public enum JobType {
   INTERACTIVE("interactive"),
   STREAMING("streaming"),
+  REFRESH("refresh"),
   BATCH("batch");
 
   private String text;
@@ -16,6 +16,7 @@
 import org.opensearch.sql.spark.flint.FlintIndexState;
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
+import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
 
 /**
  * Index Operation for Altering the flint index. Only handles alter operation when
@@ -25,16 +26,19 @@ public class FlintIndexOpAlter extends FlintIndexOp {
   private static final Logger LOG = LogManager.getLogger(FlintIndexOpAlter.class);
   private final FlintIndexMetadataService flintIndexMetadataService;
   private final FlintIndexOptions flintIndexOptions;
+  private final AsyncQueryScheduler asyncQueryScheduler;
 
   public FlintIndexOpAlter(
       FlintIndexOptions flintIndexOptions,
       FlintIndexStateModelService flintIndexStateModelService,
       String datasourceName,
       EMRServerlessClientFactory emrServerlessClientFactory,
-      FlintIndexMetadataService flintIndexMetadataService) {
+      FlintIndexMetadataService flintIndexMetadataService,
+      AsyncQueryScheduler asyncQueryScheduler) {
     super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
     this.flintIndexMetadataService = flintIndexMetadataService;
     this.flintIndexOptions = flintIndexOptions;
+    this.asyncQueryScheduler = asyncQueryScheduler;
   }
 
   @Override
@@ -57,7 +61,11 @@ void runOp(
         "Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName());
     this.flintIndexMetadataService.updateIndexToManualRefresh(
         flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
-    cancelStreamingJob(flintIndexStateModel);
+    if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
+      asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
+    } else {
+      cancelStreamingJob(flintIndexStateModel);
+    }
   }
 
   @Override
@@ -14,16 +14,21 @@
 import org.opensearch.sql.spark.flint.FlintIndexState;
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
+import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
 
 /** Operation to drop Flint index */
 public class FlintIndexOpDrop extends FlintIndexOp {
   private static final Logger LOG = LogManager.getLogger();
 
+  private final AsyncQueryScheduler asyncQueryScheduler;
+
   public FlintIndexOpDrop(
       FlintIndexStateModelService flintIndexStateModelService,
       String datasourceName,
-      EMRServerlessClientFactory emrServerlessClientFactory) {
+      EMRServerlessClientFactory emrServerlessClientFactory,
+      AsyncQueryScheduler asyncQueryScheduler) {
     super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
+    this.asyncQueryScheduler = asyncQueryScheduler;
   }
 
   public boolean validate(FlintIndexState state) {
@@ -48,7 +53,11 @@ void runOp(
     LOG.debug(
         "Performing drop index operation for index: {}",
         flintIndexMetadata.getOpensearchIndexName());
-    cancelStreamingJob(flintIndexStateModel);
+    if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
+      asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
+    } else {
+      cancelStreamingJob(flintIndexStateModel);
+    }
   }
 
   @Override
@@ -11,17 +11,19 @@
 import org.opensearch.sql.spark.flint.FlintIndexClient;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
+import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
 
 @RequiredArgsConstructor
 public class FlintIndexOpFactory {
   private final FlintIndexStateModelService flintIndexStateModelService;
   private final FlintIndexClient flintIndexClient;
   private final FlintIndexMetadataService flintIndexMetadataService;
   private final EMRServerlessClientFactory emrServerlessClientFactory;
+  private final AsyncQueryScheduler asyncQueryScheduler;
 
   public FlintIndexOpDrop getDrop(String datasource) {
     return new FlintIndexOpDrop(
-        flintIndexStateModelService, datasource, emrServerlessClientFactory);
+        flintIndexStateModelService, datasource, emrServerlessClientFactory, asyncQueryScheduler);
   }
 
   public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) {
@@ -30,12 +32,17 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) {
         flintIndexStateModelService,
         datasource,
         emrServerlessClientFactory,
-        flintIndexMetadataService);
+        flintIndexMetadataService,
+        asyncQueryScheduler);
   }
 
   public FlintIndexOpVacuum getVacuum(String datasource) {
     return new FlintIndexOpVacuum(
-        flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory);
+        flintIndexStateModelService,
+        datasource,
+        flintIndexClient,
+        emrServerlessClientFactory,
+        asyncQueryScheduler);
   }
 
   public FlintIndexOpCancel getCancel(String datasource) {
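Because FlintIndexOpFactory is built with Lombok's @RequiredArgsConstructor, every call site constructing it must now pass the AsyncQueryScheduler as a fifth argument, in field-declaration order. A hedged wiring sketch (the surrounding variables are assumed to exist):

FlintIndexOpFactory factory =
    new FlintIndexOpFactory(
        flintIndexStateModelService,
        flintIndexClient,
        flintIndexMetadataService,
        emrServerlessClientFactory,
        asyncQueryScheduler); // new dependency introduced by this commit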
@@ -14,22 +14,26 @@
 import org.opensearch.sql.spark.flint.FlintIndexState;
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
+import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
 
 /** Flint index vacuum operation. */
 public class FlintIndexOpVacuum extends FlintIndexOp {
 
   private static final Logger LOG = LogManager.getLogger();
 
+  private final AsyncQueryScheduler asyncQueryScheduler;
+
   /** OpenSearch client. */
   private final FlintIndexClient flintIndexClient;
 
   public FlintIndexOpVacuum(
       FlintIndexStateModelService flintIndexStateModelService,
       String datasourceName,
       FlintIndexClient flintIndexClient,
-      EMRServerlessClientFactory emrServerlessClientFactory) {
+      EMRServerlessClientFactory emrServerlessClientFactory,
+      AsyncQueryScheduler asyncQueryScheduler) {
     super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
     this.flintIndexClient = flintIndexClient;
+    this.asyncQueryScheduler = asyncQueryScheduler;
   }
 
   @Override
@@ -48,6 +52,9 @@ public void runOp(
       FlintIndexStateModel flintIndex,
       AsyncQueryRequestContext asyncQueryRequestContext) {
     LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
+    if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
+      asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName());
+    }
     flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName());
   }
 
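Note the asymmetry across the three operations: alter and drop call unscheduleJob, while vacuum calls removeJob. Per the AsyncQueryScheduler contract below, the distinction appears intentional:

// alter/drop: the index definition survives, so only disable the scheduled refresh
asyncQueryScheduler.unscheduleJob(indexName);
// vacuum: the index is physically deleted, so delete its scheduled job too
asyncQueryScheduler.removeJob(indexName);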
@@ -0,0 +1,57 @@
package org.opensearch.sql.spark.scheduler;

import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** Scheduler interface for scheduling asynchronous query jobs. */
public interface AsyncQueryScheduler {

/**
* Schedules a new job in the system. This method creates a new job entry based on the provided
* request parameters.
*
* <p>Use cases: - Creating a new periodic query execution - Setting up a scheduled data refresh
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Updates an existing job with new parameters. This method modifies the configuration of an
* already scheduled job.
*
* <p>Use cases: - Changing the schedule of an existing job - Modifying query parameters of a
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
* used when you want to temporarily stop a job from running but keep its configuration and
* history in the system.
*
* <p>Use cases: - Pausing a job that's causing issues without losing its configuration -
* Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
*/
void unscheduleJob(String jobId);

/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
* its associated data from the system.
*
* <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
*/
void removeJob(String jobId);
}
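A hedged, in-memory sketch of what an implementation of this contract could look like. The real implementation is not part of this commit, and request.getJobId() is a hypothetical accessor (AsyncQuerySchedulerRequest's fields are not shown here):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** In-memory sketch of the scheduler contract; for illustration only. */
public class InMemoryAsyncQueryScheduler implements AsyncQueryScheduler {
  private final Map<String, AsyncQuerySchedulerRequest> jobs = new ConcurrentHashMap<>();
  private final Set<String> disabledJobs = ConcurrentHashMap.newKeySet();

  @Override
  public void scheduleJob(AsyncQuerySchedulerRequest request) {
    // Contract: scheduling a job that already exists is an error.
    if (jobs.putIfAbsent(request.getJobId(), request) != null) { // getJobId() is hypothetical
      throw new IllegalArgumentException("Job already exists: " + request.getJobId());
    }
  }

  @Override
  public void updateJob(AsyncQuerySchedulerRequest request) {
    // Contract: updating a job that does not exist is an error.
    if (jobs.replace(request.getJobId(), request) == null) {
      throw new IllegalArgumentException("Job does not exist: " + request.getJobId());
    }
  }

  @Override
  public void unscheduleJob(String jobId) {
    // Disable the job but keep its configuration, so it can be re-enabled later.
    disabledJobs.add(jobId);
  }

  @Override
  public void removeJob(String jobId) {
    // Permanently delete the job and all of its state.
    jobs.remove(jobId);
    disabledJobs.remove(jobId);
  }
}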