Skip to content

Commit

Permalink
[Feature] Flint query scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jul 15, 2024
1 parent 0c2e1da commit 94dfff6
Show file tree
Hide file tree
Showing 22 changed files with 883 additions and 35 deletions.
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NONE: 'NONE';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
Expand Down
20 changes: 18 additions & 2 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ singleCompoundStatement
;

beginEndCompoundBlock
: BEGIN compoundBody END
: beginLabel? BEGIN compoundBody END endLabel?
;

compoundBody
Expand All @@ -68,6 +68,14 @@ singleStatement
: statement SEMICOLON* EOF
;

beginLabel
: multipartIdentifier COLON
;

endLabel
: multipartIdentifier
;

singleExpression
: namedExpression EOF
;
Expand Down Expand Up @@ -174,6 +182,8 @@ statement
| ALTER TABLE identifierReference
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE identifierReference
(clusterBySpec | CLUSTER BY NONE) #alterClusterBy
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
Expand Down Expand Up @@ -853,13 +863,17 @@ identifierComment

relationPrimary
: identifierReference temporalClause?
sample? tableAlias #tableName
optionsClause? sample? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
| functionTable #tableValuedFunction
;

optionsClause
: WITH options=propertyList
;

inlineTable
: VALUES expression (COMMA expression)* tableAlias
;
Expand Down Expand Up @@ -1572,6 +1586,7 @@ ansiNonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NULLS
| NUMERIC
| OF
Expand Down Expand Up @@ -1920,6 +1935,7 @@ nonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NOT
| NULL
| NULLS
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

public interface AsyncQueryScheduler {
void scheduleJob(AsyncQuerySchedulerJobRequest request);

void unscheduleJob(AsyncQuerySchedulerJobRequest request);

void removeJob(AsyncQuerySchedulerJobRequest request);

void updateJob(AsyncQuerySchedulerJobRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

public interface AsyncQuerySchedulerJob {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

public abstract class AsyncQuerySchedulerJobRequest {
private final String jobName;
private final String jobType;
private final String schedule;
private final boolean enabled;

protected AsyncQuerySchedulerJobRequest(Builder<?> builder) {
this.jobName = builder.jobName;
this.jobType = builder.jobType;
this.schedule = builder.schedule;
this.enabled = builder.enabled;
}

public String getJobName() {
return jobName;
}

public String getJobType() {
return jobType;
}

public String getRawSchedule() {
return schedule;
}

public boolean isEnabled() {
return enabled;
}

public abstract static class Builder<T extends Builder<T>> {
private String jobName;
private String jobType;
private String schedule;
private boolean enabled;

public T withJobName(String jobName) {
this.jobName = jobName;
return self();
}

public T withJobType(String jobType) {
this.jobType = jobType;
return self();
}

public T withSchedule(String schedule) {
this.schedule = schedule;
return self();
}

public T withEnabled(boolean enabled) {
this.enabled = enabled;
return self();
}

protected abstract T self();

public abstract AsyncQuerySchedulerJobRequest build();
}
}
2 changes: 2 additions & 0 deletions async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ repositories {


dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
implementation project(':protocol')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;

public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
"async-query-scheduler-index-mapping.yml";
private static final Logger LOG = LogManager.getLogger();

private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;

/**
* This class implements DataSourceMetadataStorage interface using OpenSearch as underlying
* storage.
*
* @param client opensearch NodeClient.
* @param clusterService ClusterService.
*/
public OpenSearchAsyncQueryScheduler(
Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
}

public ScheduledJobRunner getJobRunner() {
OpenSearchRefreshIndexJob openSearchRefreshIndexJob =
OpenSearchRefreshIndexJob.getJobRunnerInstance();
openSearchRefreshIndexJob.setClusterService(clusterService);
openSearchRefreshIndexJob.setThreadPool(threadPool);
openSearchRefreshIndexJob.setClient(client);
return openSearchRefreshIndexJob;
}

public ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
OpenSearchRefreshIndexJobRequest.Builder builder =
new OpenSearchRefreshIndexJobRequest.Builder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case OpenSearchRefreshIndexJobRequest.NAME_FIELD:
builder.withJobName(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_FILED:
builder.withEnabled(parser.booleanValue());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FILED:
builder.withEnabledTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
builder.withLastUpdateTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
builder.withSchedule(ScheduleParser.parse(parser).toString());
break;
case OpenSearchRefreshIndexJobRequest.INDEX_NAME_FIELD:
builder.withIndexToWatch(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
builder.withLockDurationSeconds(parser.longValue());
break;
case OpenSearchRefreshIndexJobRequest.JITTER:
builder.withJitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return builder.build();
};
}

@Override
public void scheduleJob(AsyncQuerySchedulerJobRequest request) {
// Implementation to schedule the job
}

@Override
public void unscheduleJob(AsyncQuerySchedulerJobRequest request) {
// Implementation to unschedule the job
}

@Override
public void removeJob(AsyncQuerySchedulerJobRequest request) {
// Implementation to remove the job
}

@Override
public void updateJob(AsyncQuerySchedulerJobRequest request) {
// Implementation to update the job
}

private void createAsyncQuerySchedulerIndex() {
try {
InputStream mappingFileStream =
OpenSearchDataSourceMetadataStorage.class
.getClassLoader()
.getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
createIndexRequest.mapping(
IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture;
try (ThreadContext.StoredContext ignored =
client.threadPool().getThreadContext().stashContext()) {
createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest);
}
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();
if (createIndexResponse.isAcknowledged()) {
LOG.info("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
} else {
throw new RuntimeException("Index creation is not acknowledged.");
}
} catch (Throwable e) {
throw new RuntimeException(
"Internal server error while creating"
+ SCHEDULER_INDEX_NAME
+ " index:: "
+ e.getMessage());
}
}

private Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}
}
Loading

0 comments on commit 94dfff6

Please sign in to comment.