Skip to content

Commit

Permalink
Flint query scheduler part 2 (opensearch-project#2961)
Browse files Browse the repository at this point in the history
* Flint query scheduler part 2

Signed-off-by: Louis Chu <[email protected]>

* spotless apply

Signed-off-by: Louis Chu <[email protected]>

* Add UT

Signed-off-by: Louis Chu <[email protected]>

* Resolve comments

Signed-off-by: Louis Chu <[email protected]>

* Add more UTs

Signed-off-by: Louis Chu <[email protected]>

* Resolve comments

Signed-off-by: Louis Chu <[email protected]>

* Use SQL thread pool

Signed-off-by: Louis Chu <[email protected]>

---------

Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored Sep 4, 2024
1 parent b4a6c60 commit 729bb13
Show file tree
Hide file tree
Showing 33 changed files with 1,371 additions and 536 deletions.
5 changes: 4 additions & 1 deletion async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;

indexJobManagementStatement
Expand Down
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
Expand Down
2 changes: 2 additions & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ DIRECTORY: 'DIRECTORY';
DISTINCT: 'DISTINCT';
DISTRIBUTE: 'DISTRIBUTE';
DIV: 'DIV';
DO: 'DO';
DOUBLE: 'DOUBLE';
DROP: 'DROP';
ELSE: 'ELSE';
Expand Down Expand Up @@ -467,6 +468,7 @@ WEEK: 'WEEK';
WEEKS: 'WEEKS';
WHEN: 'WHEN';
WHERE: 'WHERE';
WHILE: 'WHILE';
WINDOW: 'WINDOW';
WITH: 'WITH';
WITHIN: 'WITHIN';
Expand Down
22 changes: 19 additions & 3 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ compoundStatement
: statement
| setStatementWithOptionalVarKeyword
| beginEndCompoundBlock
| ifElseStatement
| whileStatement
;

setStatementWithOptionalVarKeyword
Expand All @@ -71,6 +73,16 @@ setStatementWithOptionalVarKeyword
LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword
;

whileStatement
: beginLabel? WHILE booleanExpression DO compoundBody END WHILE endLabel?
;

ifElseStatement
: IF booleanExpression THEN conditionalBodies+=compoundBody
(ELSE IF booleanExpression THEN conditionalBodies+=compoundBody)*
(ELSE elseBody=compoundBody)? END IF
;

singleStatement
: (statement|setResetStatement) SEMICOLON* EOF
;
Expand Down Expand Up @@ -406,9 +418,9 @@ query
;

insertInto
: INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere
: INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;
Expand Down Expand Up @@ -1522,6 +1534,7 @@ ansiNonReserved
| DIRECTORY
| DISTRIBUTE
| DIV
| DO
| DOUBLE
| DROP
| ESCAPED
Expand Down Expand Up @@ -1723,6 +1736,7 @@ ansiNonReserved
| VOID
| WEEK
| WEEKS
| WHILE
| WINDOW
| YEAR
| YEARS
Expand Down Expand Up @@ -1853,6 +1867,7 @@ nonReserved
| DISTINCT
| DISTRIBUTE
| DIV
| DO
| DOUBLE
| DROP
| ELSE
Expand Down Expand Up @@ -2092,6 +2107,7 @@ nonReserved
| VOID
| WEEK
| WEEKS
| WHILE
| WHEN
| WHERE
| WINDOW
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class FlintIndexOptions {
public static final String INCREMENTAL_REFRESH = "incremental_refresh";
public static final String CHECKPOINT_LOCATION = "checkpoint_location";
public static final String WATERMARK_DELAY = "watermark_delay";
public static final String SCHEDULER_MODE = "scheduler_mode";
private final Map<String, String> options = new HashMap<>();

public void setOption(String key, String value) {
Expand All @@ -33,6 +34,11 @@ public boolean autoRefresh() {
return Boolean.parseBoolean(getOption(AUTO_REFRESH).orElse("false"));
}

public boolean isExternalScheduler() {
// Default is false, which means using internal scheduler to refresh the index.
return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(false);
}

public Map<String, String> getProvidedOptions() {
return new HashMap<>(options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/**
* Index Operation for Altering the flint index. Only handles alter operation when
Expand All @@ -25,16 +26,19 @@ public class FlintIndexOpAlter extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger(FlintIndexOpAlter.class);
private final FlintIndexMetadataService flintIndexMetadataService;
private final FlintIndexOptions flintIndexOptions;
private final AsyncQueryScheduler asyncQueryScheduler;

public FlintIndexOpAlter(
FlintIndexOptions flintIndexOptions,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory,
FlintIndexMetadataService flintIndexMetadataService) {
FlintIndexMetadataService flintIndexMetadataService,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexMetadataService = flintIndexMetadataService;
this.flintIndexOptions = flintIndexOptions;
this.asyncQueryScheduler = asyncQueryScheduler;
}

@Override
Expand All @@ -57,7 +61,11 @@ void runOp(
"Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName());
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
cancelStreamingJob(flintIndexStateModel);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/** Operation to drop Flint index */
public class FlintIndexOpDrop extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

private final AsyncQueryScheduler asyncQueryScheduler;

public FlintIndexOpDrop(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory) {
EMRServerlessClientFactory emrServerlessClientFactory,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.asyncQueryScheduler = asyncQueryScheduler;
}

public boolean validate(FlintIndexState state) {
Expand All @@ -48,7 +53,11 @@ void runOp(
LOG.debug(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
cancelStreamingJob(flintIndexStateModel);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

@RequiredArgsConstructor
public class FlintIndexOpFactory {
private final FlintIndexStateModelService flintIndexStateModelService;
private final FlintIndexClient flintIndexClient;
private final FlintIndexMetadataService flintIndexMetadataService;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private final AsyncQueryScheduler asyncQueryScheduler;

public FlintIndexOpDrop getDrop(String datasource) {
return new FlintIndexOpDrop(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
flintIndexStateModelService, datasource, emrServerlessClientFactory, asyncQueryScheduler);
}

public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) {
Expand All @@ -30,12 +32,17 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da
flintIndexStateModelService,
datasource,
emrServerlessClientFactory,
flintIndexMetadataService);
flintIndexMetadataService,
asyncQueryScheduler);
}

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory);
flintIndexStateModelService,
datasource,
flintIndexClient,
emrServerlessClientFactory,
asyncQueryScheduler);
}

public FlintIndexOpCancel getCancel(String datasource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {

private static final Logger LOG = LogManager.getLogger();

private final AsyncQueryScheduler asyncQueryScheduler;

/** OpenSearch client. */
private final FlintIndexClient flintIndexClient;

public FlintIndexOpVacuum(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
FlintIndexClient flintIndexClient,
EMRServerlessClientFactory emrServerlessClientFactory) {
EMRServerlessClientFactory emrServerlessClientFactory,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexClient = flintIndexClient;
this.asyncQueryScheduler = asyncQueryScheduler;
}

@Override
Expand All @@ -48,6 +52,9 @@ public void runOp(
FlintIndexStateModel flintIndex,
AsyncQueryRequestContext asyncQueryRequestContext) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName());
}
flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.opensearch.sql.spark.scheduler;

import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** Scheduler interface for scheduling asynchronous query jobs. */
public interface AsyncQueryScheduler {

/**
* Schedules a new job in the system. This method creates a new job entry based on the provided
* request parameters.
*
* <p>Use cases: - Creating a new periodic query execution - Setting up a scheduled data refresh
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Updates an existing job with new parameters. This method modifies the configuration of an
* already scheduled job.
*
* <p>Use cases: - Changing the schedule of an existing job - Modifying query parameters of a
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
* used when you want to temporarily stop a job from running but keep its configuration and
* history in the system.
*
* <p>Use cases: - Pausing a job that's causing issues without losing its configuration -
* Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
*/
void unscheduleJob(String jobId);

/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
* its associated data from the system.
*
* <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
*/
void removeJob(String jobId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler.model;

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

/** Represents a job request for a scheduled task. */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AsyncQuerySchedulerRequest {
protected String accountId;
// Scheduler jobid is the opensearch index name until we support multiple jobs per index
protected String jobId;
protected String dataSource;
protected String scheduledQuery;
protected LangType queryLang;
protected Object schedule;
protected boolean enabled;
protected Instant lastUpdateTime;
protected Instant enabledTime;
protected Long lockDurationSeconds;
protected Double jitter;
}
Loading

0 comments on commit 729bb13

Please sign in to comment.