Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flint query scheduler part 2 #2961

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
noCharger marked this conversation as resolved.
Show resolved Hide resolved
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;

indexJobManagementStatement
Expand Down
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
Expand Down
2 changes: 2 additions & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ DIRECTORY: 'DIRECTORY';
DISTINCT: 'DISTINCT';
DISTRIBUTE: 'DISTRIBUTE';
DIV: 'DIV';
DO: 'DO';
DOUBLE: 'DOUBLE';
DROP: 'DROP';
ELSE: 'ELSE';
Expand Down Expand Up @@ -467,6 +468,7 @@ WEEK: 'WEEK';
WEEKS: 'WEEKS';
WHEN: 'WHEN';
WHERE: 'WHERE';
WHILE: 'WHILE';
WINDOW: 'WINDOW';
WITH: 'WITH';
WITHIN: 'WITHIN';
Expand Down
22 changes: 19 additions & 3 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ compoundStatement
: statement
| setStatementWithOptionalVarKeyword
| beginEndCompoundBlock
| ifElseStatement
| whileStatement
;

setStatementWithOptionalVarKeyword
Expand All @@ -71,6 +73,16 @@ setStatementWithOptionalVarKeyword
LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword
;

whileStatement
: beginLabel? WHILE booleanExpression DO compoundBody END WHILE endLabel?
;

ifElseStatement
: IF booleanExpression THEN conditionalBodies+=compoundBody
(ELSE IF booleanExpression THEN conditionalBodies+=compoundBody)*
(ELSE elseBody=compoundBody)? END IF
;

singleStatement
: (statement|setResetStatement) SEMICOLON* EOF
;
Expand Down Expand Up @@ -406,9 +418,9 @@ query
;

insertInto
: INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere
: INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;
Expand Down Expand Up @@ -1522,6 +1534,7 @@ ansiNonReserved
| DIRECTORY
| DISTRIBUTE
| DIV
| DO
| DOUBLE
| DROP
| ESCAPED
Expand Down Expand Up @@ -1723,6 +1736,7 @@ ansiNonReserved
| VOID
| WEEK
| WEEKS
| WHILE
| WINDOW
| YEAR
| YEARS
Expand Down Expand Up @@ -1853,6 +1867,7 @@ nonReserved
| DISTINCT
| DISTRIBUTE
| DIV
| DO
| DOUBLE
| DROP
| ELSE
Expand Down Expand Up @@ -2092,6 +2107,7 @@ nonReserved
| VOID
| WEEK
| WEEKS
| WHILE
| WHEN
| WHERE
| WINDOW
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class FlintIndexOptions {
public static final String INCREMENTAL_REFRESH = "incremental_refresh";
public static final String CHECKPOINT_LOCATION = "checkpoint_location";
public static final String WATERMARK_DELAY = "watermark_delay";
public static final String SCHEDULER_MODE = "scheduler_mode";
private final Map<String, String> options = new HashMap<>();

public void setOption(String key, String value) {
Expand All @@ -33,6 +34,11 @@ public boolean autoRefresh() {
return Boolean.parseBoolean(getOption(AUTO_REFRESH).orElse("false"));
}

public boolean isExternalScheduler() {
// Default is false, which means using internal scheduler to refresh the index.
Comment on lines +37 to +38
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internal/External sounds ambiguous to me. Can we rephrase or add documentation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does internal means within spark? I think it is difficult to find that documentation from this option defined in SQL plugin. And I would imagine internal is internal to OpenSearch...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah internal means within spark and external could be anything.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want to create a blocker now, but @ykmr1224 raised a good question.

We should have the name of this flag from the perspective of spark. Probably use_spark_scheduler = True | False would make more sense because we actually don't care about what an external scheduler is?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from spark perspective, we want to have extensibility among external Schedulers in case they have difference behavior

return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(false);
}

public Map<String, String> getProvidedOptions() {
return new HashMap<>(options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/**
* Index Operation for Altering the flint index. Only handles alter operation when
Expand All @@ -25,16 +26,19 @@ public class FlintIndexOpAlter extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger(FlintIndexOpAlter.class);
private final FlintIndexMetadataService flintIndexMetadataService;
private final FlintIndexOptions flintIndexOptions;
private final AsyncQueryScheduler asyncQueryScheduler;

public FlintIndexOpAlter(
FlintIndexOptions flintIndexOptions,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory,
FlintIndexMetadataService flintIndexMetadataService) {
FlintIndexMetadataService flintIndexMetadataService,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexMetadataService = flintIndexMetadataService;
this.flintIndexOptions = flintIndexOptions;
this.asyncQueryScheduler = asyncQueryScheduler;
}

@Override
Expand All @@ -57,7 +61,11 @@ void runOp(
"Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName());
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
cancelStreamingJob(flintIndexStateModel);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,21 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/** Operation to drop Flint index */
public class FlintIndexOpDrop extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

private final AsyncQueryScheduler asyncQueryScheduler;

public FlintIndexOpDrop(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory) {
EMRServerlessClientFactory emrServerlessClientFactory,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.asyncQueryScheduler = asyncQueryScheduler;
}

public boolean validate(FlintIndexState state) {
Expand All @@ -48,7 +53,11 @@ void runOp(
LOG.debug(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
cancelStreamingJob(flintIndexStateModel);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
} else {
cancelStreamingJob(flintIndexStateModel);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@
import org.opensearch.sql.spark.flint.FlintIndexClient;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

@RequiredArgsConstructor
public class FlintIndexOpFactory {
private final FlintIndexStateModelService flintIndexStateModelService;
private final FlintIndexClient flintIndexClient;
private final FlintIndexMetadataService flintIndexMetadataService;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private final AsyncQueryScheduler asyncQueryScheduler;

public FlintIndexOpDrop getDrop(String datasource) {
return new FlintIndexOpDrop(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
flintIndexStateModelService, datasource, emrServerlessClientFactory, asyncQueryScheduler);
}

public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) {
Expand All @@ -30,12 +32,17 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da
flintIndexStateModelService,
datasource,
emrServerlessClientFactory,
flintIndexMetadataService);
flintIndexMetadataService,
asyncQueryScheduler);
}

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(
flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory);
flintIndexStateModelService,
datasource,
flintIndexClient,
emrServerlessClientFactory,
asyncQueryScheduler);
}

public FlintIndexOpCancel getCancel(String datasource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,26 @@
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {

private static final Logger LOG = LogManager.getLogger();

private final AsyncQueryScheduler asyncQueryScheduler;

/** OpenSearch client. */
private final FlintIndexClient flintIndexClient;

public FlintIndexOpVacuum(
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
FlintIndexClient flintIndexClient,
EMRServerlessClientFactory emrServerlessClientFactory) {
EMRServerlessClientFactory emrServerlessClientFactory,
AsyncQueryScheduler asyncQueryScheduler) {
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexClient = flintIndexClient;
this.asyncQueryScheduler = asyncQueryScheduler;
}

@Override
Expand All @@ -48,6 +52,9 @@ public void runOp(
FlintIndexStateModel flintIndex,
AsyncQueryRequestContext asyncQueryRequestContext) {
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName());
}
flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.opensearch.sql.spark.scheduler;

import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** Scheduler interface for scheduling asynchronous query jobs. */
public interface AsyncQueryScheduler {
vamsimanohar marked this conversation as resolved.
Show resolved Hide resolved

/** Schedules a new job. */
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/** Updates an existing job with new parameters. */
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/** Unschedules a job by marking it as disabled and updating its last update time. */
void unscheduleJob(String jobId);

/** Removes a job. */
void removeJob(String jobId);
noCharger marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler.model;

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

/** Represents a job request for a scheduled task. */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AsyncQuerySchedulerRequest {
protected String accountId;
// Scheduler jobid is the opensearch index name until we support multiple jobs per index
protected String jobId;
protected String dataSource;
protected String scheduledQuery;
protected LangType queryLang;
protected Object schedule;
protected boolean enabled;
protected Instant lastUpdateTime;
protected Instant enabledTime;
protected Long lockDurationSeconds;
protected Double jitter;
}
noCharger marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

/**
* This tests async-query-core library end-to-end using mocked implementation of extension points.
Expand Down Expand Up @@ -112,6 +113,7 @@ public class AsyncQueryCoreIntegTest {
@Mock FlintIndexClient flintIndexClient;
@Mock AsyncQueryRequestContext asyncQueryRequestContext;
@Mock MetricsService metricsService;
@Mock AsyncQueryScheduler asyncQueryScheduler;
@Mock SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider;

// storage services
Expand Down Expand Up @@ -159,7 +161,8 @@ public void setUp() {
flintIndexStateModelService,
flintIndexClient,
flintIndexMetadataService,
emrServerlessClientFactory);
emrServerlessClientFactory,
asyncQueryScheduler);
QueryHandlerFactory queryHandlerFactory =
new QueryHandlerFactory(
jobExecutionResponseReader,
Expand Down Expand Up @@ -516,6 +519,7 @@ private void givenFlintIndexMetadataExists(String indexName) {
.appId(APPLICATION_ID)
.jobId(JOB_ID)
.opensearchIndexName(indexName)
.flintIndexOptions(new FlintIndexOptions())
.build()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;

@ExtendWith(MockitoExtension.class)
public class SparkQueryDispatcherTest {
Expand All @@ -108,6 +109,7 @@ public class SparkQueryDispatcherTest {
@Mock private QueryIdProvider queryIdProvider;
@Mock private AsyncQueryRequestContext asyncQueryRequestContext;
@Mock private MetricsService metricsService;
@Mock private AsyncQueryScheduler asyncQueryScheduler;
private DataSourceSparkParameterComposer dataSourceSparkParameterComposer =
(datasourceMetadata, sparkSubmitParameters, dispatchQueryRequest, context) -> {
sparkSubmitParameters.setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, "basic");
Expand Down
Loading
Loading