Skip to content

Commit

Permalink
Rename classes
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 1, 2024
1 parent d260e0e commit 503b659
Show file tree
Hide file tree
Showing 15 changed files with 255 additions and 134 deletions.
5 changes: 4 additions & 1 deletion async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;

indexJobManagementStatement
Expand Down
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
Expand Down
2 changes: 2 additions & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ DIRECTORY: 'DIRECTORY';
DISTINCT: 'DISTINCT';
DISTRIBUTE: 'DISTRIBUTE';
DIV: 'DIV';
DO: 'DO';
DOUBLE: 'DOUBLE';
DROP: 'DROP';
ELSE: 'ELSE';
Expand Down Expand Up @@ -467,6 +468,7 @@ WEEK: 'WEEK';
WEEKS: 'WEEKS';
WHEN: 'WHEN';
WHERE: 'WHERE';
WHILE: 'WHILE';
WINDOW: 'WINDOW';
WITH: 'WITH';
WITHIN: 'WITHIN';
Expand Down
22 changes: 19 additions & 3 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ compoundStatement
: statement
| setStatementWithOptionalVarKeyword
| beginEndCompoundBlock
| ifElseStatement
| whileStatement
;

setStatementWithOptionalVarKeyword
Expand All @@ -71,6 +73,16 @@ setStatementWithOptionalVarKeyword
LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword
;

whileStatement
: beginLabel? WHILE booleanExpression DO compoundBody END WHILE endLabel?
;

ifElseStatement
: IF booleanExpression THEN conditionalBodies+=compoundBody
(ELSE IF booleanExpression THEN conditionalBodies+=compoundBody)*
(ELSE elseBody=compoundBody)? END IF
;

singleStatement
: (statement|setResetStatement) SEMICOLON* EOF
;
Expand Down Expand Up @@ -406,9 +418,9 @@ query
;

insertInto
: INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere
: INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;
Expand Down Expand Up @@ -1522,6 +1534,7 @@ ansiNonReserved
| DIRECTORY
| DISTRIBUTE
| DIV
| DO
| DOUBLE
| DROP
| ESCAPED
Expand Down Expand Up @@ -1723,6 +1736,7 @@ ansiNonReserved
| VOID
| WEEK
| WEEKS
| WHILE
| WINDOW
| YEAR
| YEARS
Expand Down Expand Up @@ -1853,6 +1867,7 @@ nonReserved
| DISTINCT
| DISTRIBUTE
| DIV
| DO
| DOUBLE
| DROP
| ELSE
Expand Down Expand Up @@ -2092,6 +2107,7 @@ nonReserved
| VOID
| WEEK
| WEEKS
| WHILE
| WHEN
| WHERE
| WINDOW
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.opensearch.sql.spark.scheduler;

/**
 * Scheduler abstraction for asynchronous queries.
 *
 * <p>Currently a marker interface with no declared operations; the
 * OpenSearch-backed implementation ({@code OpenSearchAsyncQueryScheduler})
 * provides scheduleJob/updateJob/unscheduleJob.
 *
 * <p>NOTE(review): consider declaring those operations here so callers can
 * program against the interface instead of the concrete class — TODO confirm
 * whether other implementations are planned before widening the contract.
 */
public interface AsyncQueryScheduler {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler.model;

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

/**
 * Represents a job request for a scheduled task.
 *
 * <p>Base data carrier for scheduler requests; concrete subclasses (e.g.
 * {@code OpenSearchScheduleQueryJobRequest}) add backend-specific fields such
 * as the schedule itself. Lombok generates getters/setters, equals/hashCode,
 * and both a no-arg and an all-args constructor.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public abstract class AsyncQuerySchedulerRequest {
// Account identifier the job belongs to — presumably a cloud account id;
// TODO confirm against callers. May be null.
protected String accountId;
// Identifier of the scheduled job (used as the document id by the
// OpenSearch-backed scheduler).
protected String jobId;
// Name of the data source the query runs against.
protected String dataSource;
// The query text to execute on each scheduled run.
protected String scheduledQuery;
// Language of the scheduled query (parsed via LangType.fromString).
protected LangType queryLang;
// Whether the job is currently enabled; unscheduling sets this to false.
protected boolean enabled;
// Timestamp of the most recent modification to this job.
protected Instant lastUpdateTime;
// Timestamp at which the job was (last) enabled.
protected Instant enabledTime;
// Job-scheduler lock duration in seconds — presumably bounds how long a
// single run may hold the execution lock; TODO confirm. Nullable.
protected Long lockDurationSeconds;
// Jitter factor applied to the schedule — assumed to randomize run times to
// avoid thundering herds; TODO confirm semantics. Nullable.
protected Double jitter;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.sql.spark.scheduler.job.AsyncQueryScheduledQueryJob;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;

/** Scheduler class for managing asynchronous query jobs. */
public class OpenSearchAsyncQueryScheduler {
public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
Expand All @@ -55,15 +56,18 @@ public class OpenSearchAsyncQueryScheduler {
public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
OpenSearchRefreshIndexJob openSearchRefreshIndexJob =
OpenSearchRefreshIndexJob.getJobRunnerInstance();
openSearchRefreshIndexJob.setClusterService(clusterService);
openSearchRefreshIndexJob.setThreadPool(threadPool);
openSearchRefreshIndexJob.setClient(client);
AsyncQueryScheduledQueryJob scheduledQueryJob =
AsyncQueryScheduledQueryJob.getJobRunnerInstance();
scheduledQueryJob.setClusterService(clusterService);
scheduledQueryJob.setThreadPool(threadPool);
scheduledQueryJob.setClient(client);
}

/** Schedules a new job by indexing it into the job index. */
public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
assertRequestType(asyncQuerySchedulerRequest);
OpenSearchScheduleQueryJobRequest request =
(OpenSearchScheduleQueryJobRequest) asyncQuerySchedulerRequest;
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
createAsyncQuerySchedulerIndex();
}
Expand Down Expand Up @@ -94,17 +98,20 @@ public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
/** Unschedules a job by marking it as disabled and updating its last update time. */
public void unscheduleJob(String jobId) throws IOException {
assertIndexExists();
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName(jobId)
OpenSearchScheduleQueryJobRequest request =
OpenSearchScheduleQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
updateJob(request);
}

/** Updates an existing job with new parameters. */
public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) throws IOException {
assertRequestType(asyncQuerySchedulerRequest);
OpenSearchScheduleQueryJobRequest request =
(OpenSearchScheduleQueryJobRequest) asyncQuerySchedulerRequest;
assertIndexExists();
UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Expand Down Expand Up @@ -192,6 +199,12 @@ private void assertIndexExists() {

/** Returns the job runner instance for the scheduler. */
public static ScheduledJobRunner getJobRunner() {
return OpenSearchRefreshIndexJob.getJobRunnerInstance();
return AsyncQueryScheduledQueryJob.getJobRunnerInstance();
}

private void assertRequestType(AsyncQuerySchedulerRequest request) {
if (!(request instanceof OpenSearchScheduleQueryJobRequest)) {
throw new IllegalArgumentException("Request should be OpenSearchScheduleQueryJobRequest");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;

public class OpenSearchRefreshIndexJobRequestParser {
public class OpenSearchScheduleQueryJobRequestParser {

private static Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
Expand All @@ -28,37 +29,46 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti

public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
OpenSearchRefreshIndexJobRequest.OpenSearchRefreshIndexJobRequestBuilder builder =
OpenSearchRefreshIndexJobRequest.builder();
OpenSearchScheduleQueryJobRequest.OpenSearchScheduleQueryJobRequestBuilder builder =
OpenSearchScheduleQueryJobRequest.builder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD:
builder.jobName(parser.text());
case OpenSearchScheduleQueryJobRequest.ACCOUNT_ID_FIELD:
builder.accountId(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD:
builder.jobType(parser.text());
case OpenSearchScheduleQueryJobRequest.JOB_ID_FIELD:
builder.jobId(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD:
case OpenSearchScheduleQueryJobRequest.DATA_SOURCE_NAME_FIELD:
builder.dataSource(parser.text());
break;
case OpenSearchScheduleQueryJobRequest.SCHEDULED_QUERY_FIELD:
builder.scheduledQuery(parser.text());
break;
case OpenSearchScheduleQueryJobRequest.QUERY_LANG_FIELD:
builder.queryLang(LangType.fromString(parser.text()));
break;
case OpenSearchScheduleQueryJobRequest.ENABLED_FIELD:
builder.enabled(parser.booleanValue());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD:
case OpenSearchScheduleQueryJobRequest.ENABLED_TIME_FIELD:
builder.enabledTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
case OpenSearchScheduleQueryJobRequest.LAST_UPDATE_TIME_FIELD:
builder.lastUpdateTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
case OpenSearchScheduleQueryJobRequest.SCHEDULE_FIELD:
builder.schedule(ScheduleParser.parse(parser));
break;
case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
case OpenSearchScheduleQueryJobRequest.LOCK_DURATION_SECONDS:
builder.lockDurationSeconds(parser.longValue());
break;
case OpenSearchRefreshIndexJobRequest.JITTER:
case OpenSearchScheduleQueryJobRequest.JITTER:
builder.jitter(parser.doubleValue());
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.Plugin;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest;
import org.opensearch.threadpool.ThreadPool;

/**
Expand All @@ -29,21 +29,21 @@
* and using singleton job runner to ensure we register a usable job runner instance to JobScheduler
* plugin.
*/
public class OpenSearchRefreshIndexJob implements ScheduledJobRunner {
public class AsyncQueryScheduledQueryJob implements ScheduledJobRunner {

private static final Logger log = LogManager.getLogger(OpenSearchRefreshIndexJob.class);
private static final Logger log = LogManager.getLogger(AsyncQueryScheduledQueryJob.class);

public static OpenSearchRefreshIndexJob INSTANCE = new OpenSearchRefreshIndexJob();
public static AsyncQueryScheduledQueryJob INSTANCE = new AsyncQueryScheduledQueryJob();

public static OpenSearchRefreshIndexJob getJobRunnerInstance() {
public static AsyncQueryScheduledQueryJob getJobRunnerInstance() {
return INSTANCE;
}

private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;

private OpenSearchRefreshIndexJob() {
private AsyncQueryScheduledQueryJob() {
// Singleton class, use getJobRunnerInstance method instead of constructor
}

Expand All @@ -61,7 +61,7 @@ public void setClient(Client client) {

@Override
public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
if (!(jobParameter instanceof OpenSearchRefreshIndexJobRequest)) {
if (!(jobParameter instanceof OpenSearchScheduleQueryJobRequest)) {
throw new IllegalStateException(
"Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
+ jobParameter.getClass().getCanonicalName());
Expand Down
Loading

0 comments on commit 503b659

Please sign in to comment.