From 91bd2bcc338e15ae68852ae57684dc0b45e7053d Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Thu, 27 Jun 2024 15:51:46 -0700 Subject: [PATCH] [Feature] Flint query scheduler part1 - integrate job scheduler plugin Signed-off-by: Louis Chu --- .../src/main/antlr/SqlBaseLexer.g4 | 1 + .../src/main/antlr/SqlBaseParser.g4 | 20 +- async-query/build.gradle | 2 + .../OpenSearchAsyncQueryScheduler.java | 246 ++++++++++++++++++ .../scheduler/SampleExtensionRestHandler.java | 140 ++++++++++ .../job/OpenSearchRefreshIndexJob.java | 162 ++++++++++++ .../OpenSearchRefreshIndexJobRequest.java | 170 ++++++++++++ .../async-query-scheduler-index-mapping.yml | 36 +++ .../async-query-scheduler-index-settings.yml | 11 + build.gradle | 2 +- common/build.gradle | 4 +- core/build.gradle | 2 +- integ-test/build.gradle | 2 +- legacy/build.gradle | 2 +- plugin/build.gradle | 32 +-- .../org/opensearch/sql/plugin/SQLPlugin.java | 36 ++- ...rch.jobscheduler.spi.JobSchedulerExtension | 6 + ppl/build.gradle | 2 +- protocol/build.gradle | 2 +- sql/build.gradle | 2 +- 20 files changed, 845 insertions(+), 35 deletions(-) create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/SampleExtensionRestHandler.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java create mode 100644 async-query/src/main/resources/async-query-scheduler-index-mapping.yml create mode 100644 async-query/src/main/resources/async-query-scheduler-index-settings.yml create mode 100644 plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension diff --git a/async-query-core/src/main/antlr/SqlBaseLexer.g4 b/async-query-core/src/main/antlr/SqlBaseLexer.g4 index 85a4633e80..bde298c23e 100644 --- a/async-query-core/src/main/antlr/SqlBaseLexer.g4 +++ b/async-query-core/src/main/antlr/SqlBaseLexer.g4 @@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND'; NANOSECONDS: 'NANOSECONDS'; NATURAL: 'NATURAL'; NO: 'NO'; +NONE: 'NONE'; NOT: 'NOT'; NULL: 'NULL'; NULLS: 'NULLS'; diff --git a/async-query-core/src/main/antlr/SqlBaseParser.g4 b/async-query-core/src/main/antlr/SqlBaseParser.g4 index 54eff14b6d..dc2c2f0794 100644 --- a/async-query-core/src/main/antlr/SqlBaseParser.g4 +++ b/async-query-core/src/main/antlr/SqlBaseParser.g4 @@ -52,7 +52,7 @@ singleCompoundStatement ; beginEndCompoundBlock - : BEGIN compoundBody END + : beginLabel? BEGIN compoundBody END endLabel? ; compoundBody @@ -68,6 +68,14 @@ singleStatement : statement SEMICOLON* EOF ; +beginLabel + : multipartIdentifier COLON + ; + +endLabel + : multipartIdentifier + ; + singleExpression : namedExpression EOF ; @@ -174,6 +182,8 @@ statement | ALTER TABLE identifierReference (partitionSpec)? SET locationSpec #setTableLocation | ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions + | ALTER TABLE identifierReference + (clusterBySpec | CLUSTER BY NONE) #alterClusterBy | DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable | DROP VIEW (IF EXISTS)? identifierReference #dropView | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? @@ -853,13 +863,17 @@ identifierComment relationPrimary : identifierReference temporalClause? - sample? tableAlias #tableName + optionsClause? sample? tableAlias #tableName | LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery | LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation | inlineTable #inlineTableDefault2 | functionTable #tableValuedFunction ; +optionsClause + : WITH options=propertyList + ; + inlineTable : VALUES expression (COMMA expression)* tableAlias ; @@ -1572,6 +1586,7 @@ ansiNonReserved | NANOSECOND | NANOSECONDS | NO + | NONE | NULLS | NUMERIC | OF @@ -1920,6 +1935,7 @@ nonReserved | NANOSECOND | NANOSECONDS | NO + | NONE | NOT | NULL | NULLS diff --git a/async-query/build.gradle b/async-query/build.gradle index 5a4a0d729d..f4907b91c0 100644 --- a/async-query/build.gradle +++ b/async-query/build.gradle @@ -16,6 +16,8 @@ repositories { dependencies { + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" + api project(':core') api project(':async-query-core') implementation project(':protocol') diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java new file mode 100644 index 0000000000..15f2331de5 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -0,0 +1,246 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import org.apache.commons.io.IOUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; +import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +public class OpenSearchAsyncQueryScheduler { + public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler"; + public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler"; + private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME = + "async-query-scheduler-index-mapping.yml"; + private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME = + "async-query-scheduler-index-settings.yml"; + private static final Logger LOG = LogManager.getLogger(); + private Client client; + private ClusterService clusterService; + + public OpenSearchAsyncQueryScheduler() { + LOG.info("OpenSearchAsyncQueryScheduler initialized"); + } + + public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) { + this.client = client; + this.clusterService = clusterService; + OpenSearchRefreshIndexJob openSearchRefreshIndexJob = + OpenSearchRefreshIndexJob.getJobRunnerInstance(); + openSearchRefreshIndexJob.setClusterService(clusterService); + openSearchRefreshIndexJob.setThreadPool(threadPool); + openSearchRefreshIndexJob.setClient(client); + } + + public static ScheduledJobRunner getJobRunner() { + return OpenSearchRefreshIndexJob.getJobRunnerInstance(); + } + + public void scheduleJob(OpenSearchRefreshIndexJobRequest request) throws IOException { + if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { + createAsyncQuerySchedulerIndex(); + } + IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME); + indexRequest.id(request.getName()); + indexRequest.opType(DocWriteRequest.OpType.CREATE); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + ActionFuture indexResponseActionFuture; + IndexResponse indexResponse; + + indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), null)); + indexResponseActionFuture = client.index(indexRequest); + indexResponse = indexResponseActionFuture.actionGet(); + + if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) { + LOG.info("Job : {} successfully created", request.getName()); + } else { + throw new RuntimeException( + "Schedule job failed with result : " + indexResponse.getResult().getLowercase()); + } + LOG.info(indexRequest.source()); + } + + public void unscheduleJob(OpenSearchRefreshIndexJobRequest request) throws IOException { + if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { + throw new RuntimeException("Index does not exist."); + } + IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME).id(request.getName()); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + builder.field("enabled", false); + builder.endObject(); + indexRequest.source(builder); + } + + ActionFuture indexResponseActionFuture = client.index(indexRequest); + IndexResponse indexResponse = indexResponseActionFuture.actionGet(); + + if (indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { + LOG.info("Job : {} successfully unscheduled", request.getName()); + } else { + throw new RuntimeException( + "Unschedule job failed with result : " + indexResponse.getResult().getLowercase()); + } + } + + public void removeJob(OpenSearchRefreshIndexJobRequest request) { + if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { + throw new RuntimeException("Index does not exist."); + } + DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, request.getName()); + deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + ActionFuture deleteResponseActionFuture = client.delete(deleteRequest); + DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet(); + + if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) { + LOG.info("Job : {} successfully deleted", request.getName()); + } else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) { + throw new RuntimeException("Job : " + request.getName() + " doesn't exist"); + } else { + throw new RuntimeException( + "Remove job failed with result : " + deleteResponse.getResult().getLowercase()); + } + } + + public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException { + if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { + throw new RuntimeException("Index does not exist."); + } + IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME).id(request.getName()); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + if (request.getSchedule() != null) { + builder.field("schedule", request.getSchedule()); + } + builder.endObject(); + indexRequest.source(builder); + } + + ActionFuture indexResponseActionFuture = client.index(indexRequest); + IndexResponse indexResponse = indexResponseActionFuture.actionGet(); + + if (indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { + LOG.info("Job : {} successfully updated", request.getName()); + } else { + throw new RuntimeException( + "Update job failed with result : " + indexResponse.getResult().getLowercase()); + } + } + + private void createAsyncQuerySchedulerIndex() { + try { + InputStream mappingFileStream = + OpenSearchAsyncQueryScheduler.class + .getClassLoader() + .getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME); + InputStream settingsFileStream = + OpenSearchAsyncQueryScheduler.class + .getClassLoader() + .getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME); + createIndexRequest.mapping( + IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML); + createIndexRequest.settings( + IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML); + ActionFuture createIndexResponseActionFuture = + client.admin().indices().create(createIndexRequest); + CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet(); + + if (createIndexResponse.isAcknowledged()) { + LOG.info("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME); + } else { + throw new RuntimeException("Index creation is not acknowledged."); + } + } catch (Throwable e) { + LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e); + throw new RuntimeException( + "Internal server error while creating " + + SCHEDULER_INDEX_NAME + + " index: " + + e.getMessage(), + e); + } + } + + public static ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> { + OpenSearchRefreshIndexJobRequest.Builder builder = + new OpenSearchRefreshIndexJobRequest.Builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD: + builder.withJobName(parser.text()); + break; + case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD: + builder.withEnabled(parser.booleanValue()); + break; + case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD: + builder.withEnabledTime(parseInstantValue(parser)); + break; + case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD: + builder.withLastUpdateTime(parseInstantValue(parser)); + break; + case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD: + builder.withSchedule(ScheduleParser.parse(parser)); + break; + case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS: + builder.withLockDurationSeconds(parser.longValue()); + break; + case OpenSearchRefreshIndexJobRequest.JITTER: + builder.withJitter(parser.doubleValue()); + break; + default: + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + } + } + return builder.build(); + }; + } + + private static Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/SampleExtensionRestHandler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/SampleExtensionRestHandler.java new file mode 100644 index 0000000000..b758c8f321 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/SampleExtensionRestHandler.java @@ -0,0 +1,140 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.spark.scheduler; + +import static org.opensearch.core.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.core.rest.RestStatus.OK; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; + +/** A sample rest handler that supports schedule and deschedule job operation */ +public class SampleExtensionRestHandler extends BaseRestHandler { + private static final Logger LOG = LogManager.getLogger(SampleExtensionRestHandler.class); + + public static final String WATCH_INDEX_URI = "/_plugins/scheduler_sample/watch"; + private final OpenSearchAsyncQueryScheduler scheduler; + + public SampleExtensionRestHandler(OpenSearchAsyncQueryScheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public String getName() { + return "Sample JobScheduler extension handler"; + } + + @Override + public List routes() { + return Collections.unmodifiableList( + Arrays.asList( + new Route(RestRequest.Method.POST, WATCH_INDEX_URI), + new Route(RestRequest.Method.DELETE, WATCH_INDEX_URI))); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) + throws IOException { + switch (request.method()) { + case POST: + return handlePostRequest(request, client); + case DELETE: + return handleDeleteRequest(request, client); + default: + return restChannel -> { + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.METHOD_NOT_ALLOWED, request.method() + " is not allowed.")); + }; + } + } + + private RestChannelConsumer handlePostRequest(RestRequest request, NodeClient client) { + String jobName = request.param("job_name"); + String interval = request.param("interval"); + + if (jobName == null || interval == null) { + return restChannel -> + restChannel.sendResponse( + new BytesRestResponse( + BAD_REQUEST, "Missing required parameters: id, index, job_name, interval")); + } + + return restChannel -> { + try { + Instant now = Instant.now(); + OpenSearchRefreshIndexJobRequest jobRequest = + new OpenSearchRefreshIndexJobRequest.Builder() + .withJobName(jobName) + .withSchedule( + new IntervalSchedule( + now, + Integer.parseInt(interval), + ChronoUnit + .MINUTES)) // Assuming ScheduleParser can parse the interval directly + .withEnabled(true) + .withEnabledTime(now) + .withLastUpdateTime(now) + .build(); + scheduler.scheduleJob(jobRequest); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject().field("message", "Scheduled job with name " + jobName).endObject(); + restChannel.sendResponse(new BytesRestResponse(OK, builder)); + } catch (Exception e) { + LOG.error("Failed to schedule job", e); + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }; + } + + private RestChannelConsumer handleDeleteRequest(RestRequest request, NodeClient client) { + String jobName = request.param("job_name"); + if (jobName == null) { + return restChannel -> + restChannel.sendResponse( + new BytesRestResponse(BAD_REQUEST, "Must specify jobName parameter")); + } + + return restChannel -> { + try { + Instant now = Instant.now(); + OpenSearchRefreshIndexJobRequest jobRequest = + new OpenSearchRefreshIndexJobRequest.Builder() + .withJobName(jobName) + .withLastUpdateTime(now) + .build(); + scheduler.removeJob(jobRequest); + XContentBuilder builder = JsonXContent.contentBuilder(); + builder + .startObject() + .field("message", "Remove scheduled job with name " + jobName) + .endObject(); + restChannel.sendResponse(new BytesRestResponse(OK, builder)); + } catch (Exception e) { + LOG.error("Failed to schedule job", e); + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }; + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java new file mode 100644 index 0000000000..da5c97f9bf --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java @@ -0,0 +1,162 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.job; + +import java.util.List; +import java.util.UUID; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.plugins.Plugin; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +/** + * The job runner class for scheduling refresh index query. + * + *

The job runner should be a singleton class if it uses OpenSearch client or other objects + * passed from OpenSearch. Because when registering the job runner to JobScheduler plugin, + * OpenSearch has not invoked plugins' createComponents() method. That is saying the plugin is not + * completely initialized, and the OpenSearch {@link org.opensearch.client.Client}, {@link + * ClusterService} and other objects are not available to plugin and this job runner. + * + *

So we have to move this job runner initialization to {@link Plugin} createComponents() method, + * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler + * plugin. + */ +public class OpenSearchRefreshIndexJob implements ScheduledJobRunner { + + private static final Logger log = LogManager.getLogger(OpenSearchRefreshIndexJob.class); + + private static OpenSearchRefreshIndexJob INSTANCE; + + public static OpenSearchRefreshIndexJob getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (OpenSearchRefreshIndexJob.class) { + if (INSTANCE == null) { + INSTANCE = new OpenSearchRefreshIndexJob(); + } + return INSTANCE; + } + } + + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + + private OpenSearchRefreshIndexJob() { + // Singleton class, use getJobRunnerInstance method instead of constructor + } + + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + public void setClient(Client client) { + this.client = client; + } + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + if (!(jobParameter instanceof OpenSearchRefreshIndexJobRequest)) { + throw new IllegalStateException( + "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: " + + jobParameter.getClass().getCanonicalName()); + } + + if (this.clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (this.threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + final LockService lockService = context.getLockService(); + + Runnable runnable = + () -> { + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock( + jobParameter, + context, + ActionListener.wrap( + lock -> { + if (lock == null) { + return; + } + + OpenSearchRefreshIndexJobRequest parameter = + (OpenSearchRefreshIndexJobRequest) jobParameter; + StringBuilder msg = new StringBuilder(); + msg.append("Watching index ").append(parameter.getName()).append("\n"); + + List shardRoutingList = + this.clusterService.state().routingTable().allShards(parameter.getName()); + for (ShardRouting shardRouting : shardRoutingList) { + msg.append(shardRouting.shardId().getId()) + .append("\t") + .append(shardRouting.currentNodeId()) + .append("\t") + .append(shardRouting.active() ? "active" : "inactive") + .append("\n"); + } + log.info(msg.toString()); + runTaskForIntegrationTests(parameter); + runTaskForLockIntegrationTests(parameter); + + lockService.release( + lock, + ActionListener.wrap( + released -> { + log.info("Released lock for job {}", jobParameter.getName()); + }, + exception -> { + throw new IllegalStateException( + "Failed to release lock.", exception); + })); + }, + exception -> { + throw new IllegalStateException("Failed to acquire lock.", exception); + })); + } + }; + + threadPool.generic().submit(runnable); + } + + private void runTaskForIntegrationTests(OpenSearchRefreshIndexJobRequest jobParameter) { + this.client.index( + new IndexRequest(jobParameter.getName()) + .id(UUID.randomUUID().toString()) + .source("{\"message\": \"message\"}", XContentType.JSON), + ActionListener.wrap( + response -> log.info("Indexing task for integration tests completed."), + exception -> log.error("Indexing task for integration tests failed.", exception))); + } + + private void runTaskForLockIntegrationTests(OpenSearchRefreshIndexJobRequest jobParameter) + throws InterruptedException { + if (jobParameter.getName().equals("sample-job-lock-test-it")) { + Thread.sleep(180000); // Simulate long-running task + } + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java new file mode 100644 index 0000000000..9f5551cbce --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java @@ -0,0 +1,170 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.model; + +import java.io.IOException; +import java.time.Instant; +import java.util.Objects; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +public class OpenSearchRefreshIndexJobRequest implements ScheduledJobParameter { + public static final String JOB_NAME_FIELD = "jobName"; + public static final String JOB_TYPE_FIELD = "jobType"; + public static final String LAST_UPDATE_TIME_FIELD = "lastUpdateTime"; + public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FIELD = "enabledTime"; + public static final String ENABLED_TIME_FIELD_READABLE = "enabled_time_field"; + public static final String LOCK_DURATION_SECONDS = "lockDurationSeconds"; + public static final String JITTER = "jitter"; + public static final String ENABLED_FIELD = "enabled"; + + // name is doc id + private final String jobName; + private final String jobType; + private final Schedule schedule; + private final boolean enabled; + private final Instant lastUpdateTime; + private final Instant enabledTime; + private final Long lockDurationSeconds; + private final Double jitter; + + private OpenSearchRefreshIndexJobRequest(Builder builder) { + this.jobName = Objects.requireNonNull(builder.jobName, "jobName is required"); + this.lastUpdateTime = + Objects.requireNonNull(builder.lastUpdateTime, "lastUpdateTime is required"); + this.jobType = builder.jobType; + this.schedule = builder.schedule; + this.enabled = builder.enabled; + this.lockDurationSeconds = builder.lockDurationSeconds; + this.jitter = builder.jitter; + this.enabledTime = builder.enabledTime; + } + + @Override + public String getName() { + return jobName; + } + + public String getJobType() { + return jobType; + } + + @Override + public Schedule getSchedule() { + return schedule; + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + @Override + public Long getLockDurationSeconds() { + return this.lockDurationSeconds; + } + + @Override + public Double getJitter() { + return jitter; + } + + public static class Builder { + private String jobName; + private String jobType; + private Schedule schedule; + private boolean enabled; + private Long lockDurationSeconds; + private Double jitter; + private Instant enabledTime; + private Instant lastUpdateTime; + + public Builder withJobName(String name) { + this.jobName = name; + return this; + } + + public Builder withJobType(String jobType) { + this.jobType = jobType; + return this; + } + + public Builder withSchedule(Schedule schedule) { + this.schedule = schedule; + return this; + } + + public Builder withEnabled(boolean enabled) { + this.enabled = enabled; + return this; + } + + public Builder withLockDurationSeconds(Long lockDurationSeconds) { + this.lockDurationSeconds = lockDurationSeconds; + return this; + } + + public Builder withJitter(Double jitter) { + this.jitter = jitter; + return this; + } + + public Builder withEnabledTime(Instant enabledTime) { + this.enabledTime = enabledTime; + return this; + } + + public Builder withLastUpdateTime(Instant lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + return this; + } + + public OpenSearchRefreshIndexJobRequest build() { + return new OpenSearchRefreshIndexJobRequest(this); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder.startObject(); + builder + .field(JOB_NAME_FIELD, getName()) + .field(JOB_TYPE_FIELD, getJobType()) + .field(ENABLED_FIELD, isEnabled()) + .field(SCHEDULE_FIELD, getSchedule()); + if (getEnabledTime() != null) { + builder.timeField( + ENABLED_TIME_FIELD, ENABLED_TIME_FIELD_READABLE, getEnabledTime().toEpochMilli()); + } + builder.timeField( + LAST_UPDATE_TIME_FIELD, + LAST_UPDATE_TIME_FIELD_READABLE, + getLastUpdateTime().toEpochMilli()); + if (this.lockDurationSeconds != null) { + builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds); + } + if (this.jitter != null) { + builder.field(JITTER, this.jitter); + } + builder.endObject(); + return builder; + } +} diff --git a/async-query/src/main/resources/async-query-scheduler-index-mapping.yml b/async-query/src/main/resources/async-query-scheduler-index-mapping.yml new file mode 100644 index 0000000000..949854e0ef --- /dev/null +++ b/async-query/src/main/resources/async-query-scheduler-index-mapping.yml @@ -0,0 +1,36 @@ +--- +## +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +## + +# Schema file for the .async-query-scheduler index +# Also "dynamic" is set to "false" so that other fields cannot be added. +dynamic: false +properties: + name: + type: keyword + jobType: + type: keyword + lastUpdateTime: + type: date + format: epoch_millis + enabledTime: + type: date + format: epoch_millis + schedule: + properties: + initialDelay: + type: long + interval: + type: long + timeUnit: + type: keyword + enabled: + type: boolean + lockDurationSeconds: + type: long + null_value: -1 + jitter: + type: double + null_value: 0.0 \ No newline at end of file diff --git a/async-query/src/main/resources/async-query-scheduler-index-settings.yml b/async-query/src/main/resources/async-query-scheduler-index-settings.yml new file mode 100644 index 0000000000..386f1f4f34 --- /dev/null +++ b/async-query/src/main/resources/async-query-scheduler-index-settings.yml @@ -0,0 +1,11 @@ +--- +## +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +## + +# Settings file for the .async-query-scheduler index +index: + number_of_shards: "1" + auto_expand_replicas: "0-2" + number_of_replicas: "0" \ No newline at end of file diff --git a/build.gradle b/build.gradle index b3e09d7b50..d088883022 100644 --- a/build.gradle +++ b/build.gradle @@ -192,7 +192,7 @@ configurations.all { exclude group: "commons-logging", module: "commons-logging" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' } // updateVersion: Task to auto increment to the next development iteration diff --git a/common/build.gradle b/common/build.gradle index b4ee98a5b7..786629b027 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -34,7 +34,7 @@ repositories { dependencies { api "org.antlr:antlr4-runtime:4.7.1" - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' @@ -46,7 +46,7 @@ dependencies { testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1' - testImplementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + testImplementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation('org.junit.jupiter:junit-jupiter:5.9.3') testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.7.0' diff --git a/core/build.gradle b/core/build.gradle index 655e7d92c2..9a2f08f148 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -46,7 +46,7 @@ pitest { } dependencies { - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240' diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 93153cf737..22b6d24005 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -153,7 +153,7 @@ configurations.all { resolutionStrategy.force "commons-logging:commons-logging:1.2" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" diff --git a/legacy/build.gradle b/legacy/build.gradle index 0467db183d..303387bdbe 100644 --- a/legacy/build.gradle +++ b/legacy/build.gradle @@ -107,7 +107,7 @@ dependencies { because 'https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379' } } - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' implementation group: 'org.json', name: 'json', version:'20231013' implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' implementation group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' diff --git a/plugin/build.gradle b/plugin/build.gradle index 710d81ed0a..08c09ebb7c 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -3,25 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - plugins { id 'java' id "io.freefair.lombok" @@ -48,6 +29,7 @@ opensearchplugin { name 'opensearch-sql' description 'OpenSearch SQL' classname 'org.opensearch.sql.plugin.SQLPlugin' + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file("LICENSE.txt") noticeFile rootProject.file("NOTICE") } @@ -76,7 +58,7 @@ publishing { } repositories { maven { - name = "Snapshots" // optional target repository name + name = "Snapshots" url = "https://aws.oss.sonatype.org/content/repositories/snapshots" credentials { username "$System.env.SONATYPE_USERNAME" @@ -98,7 +80,8 @@ configurations.all { resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' + resolutionStrategy.force 'com.google.guava:failureaccess:1.0.2' resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}" @@ -139,6 +122,10 @@ spotless { } dependencies { + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" + compileOnly 'com.google.guava:guava:32.1.3-jre' + compileOnly 'com.google.guava:failureaccess:1.0.2' + api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" @@ -204,11 +191,10 @@ dependencyLicenses.enabled = false // enable testingConventions check will cause errors like: "Classes ending with [Tests] must subclass [LuceneTestCase]" testingConventions.enabled = false -// TODO: need to verify the thirdPartyAudi +// TODO: need to verify the thirdPartyAudit // currently it complains missing classes like ibatis, mysql etc, should not be a problem thirdPartyAudit.enabled = false - apply plugin: 'com.netflix.nebula.ospackage' validateNebulaPom.enabled = false diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index b86ab9218a..14df0a201e 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -42,6 +42,9 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.indices.SystemIndexDescriptor; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; @@ -91,6 +94,8 @@ import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; +import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; +import org.opensearch.sql.spark.scheduler.SampleExtensionRestHandler; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; @@ -105,7 +110,8 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin { +public class SQLPlugin extends Plugin + implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension { private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class); @@ -116,6 +122,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Sys private NodeClient client; private DataSourceServiceImpl dataSourceService; + private OpenSearchAsyncQueryScheduler asyncQueryScheduler; private Injector injector; public String name() { @@ -141,6 +148,7 @@ public List getRestHandlers( Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( + new SampleExtensionRestHandler(this.asyncQueryScheduler), new RestPPLQueryAction(), new RestSqlAction(settings, injector), new RestSqlStatsAction(settings, restController), @@ -208,6 +216,8 @@ public Collection createComponents( this.client = (NodeClient) client; this.dataSourceService = createDataSourceService(); dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); + this.asyncQueryScheduler = new OpenSearchAsyncQueryScheduler(); + this.asyncQueryScheduler.loadJobResource(client, clusterService, threadPool); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); LocalClusterState.state().setClient(client); @@ -243,6 +253,30 @@ public Collection createComponents( pluginSettings); } + @Override + public String getJobType() { + LOGGER.info("Louis: getJobType"); + return OpenSearchAsyncQueryScheduler.SCHEDULER_PLUGIN_JOB_TYPE; + } + + @Override + public String getJobIndex() { + LOGGER.info("Louis: getJobIndex"); + return OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + LOGGER.info("Louis: getJobRunner"); + return OpenSearchAsyncQueryScheduler.getJobRunner(); + } + + @Override + public ScheduledJobParser getJobParser() { + LOGGER.info("Louis: getJobParser"); + return OpenSearchAsyncQueryScheduler.getJobParser(); + } + @Override public List> getExecutorBuilders(Settings settings) { return singletonList( diff --git a/plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 0000000000..5337857c15 --- /dev/null +++ b/plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,6 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +org.opensearch.sql.plugin.SQLPlugin \ No newline at end of file diff --git a/ppl/build.gradle b/ppl/build.gradle index d58882d5e8..39bed7a359 100644 --- a/ppl/build.gradle +++ b/ppl/build.gradle @@ -48,7 +48,7 @@ dependencies { runtimeOnly group: 'org.reflections', name: 'reflections', version: '0.9.12' implementation "org.antlr:antlr4-runtime:4.7.1" - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.json', name: 'json', version: '20231013' implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" api project(':common') diff --git a/protocol/build.gradle b/protocol/build.gradle index 5bbff68e51..d706b480c3 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -30,7 +30,7 @@ plugins { } dependencies { - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}" implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}" implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}" diff --git a/sql/build.gradle b/sql/build.gradle index 81872e6035..52ae0ac621 100644 --- a/sql/build.gradle +++ b/sql/build.gradle @@ -46,7 +46,7 @@ dependencies { antlr "org.antlr:antlr4:4.7.1" implementation "org.antlr:antlr4-runtime:4.7.1" - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' implementation group: 'org.json', name: 'json', version:'20231013' implementation project(':common') implementation project(':core')