diff --git a/async-query-core/src/main/antlr/SqlBaseLexer.g4 b/async-query-core/src/main/antlr/SqlBaseLexer.g4 index 85a4633e80..bde298c23e 100644 --- a/async-query-core/src/main/antlr/SqlBaseLexer.g4 +++ b/async-query-core/src/main/antlr/SqlBaseLexer.g4 @@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND'; NANOSECONDS: 'NANOSECONDS'; NATURAL: 'NATURAL'; NO: 'NO'; +NONE: 'NONE'; NOT: 'NOT'; NULL: 'NULL'; NULLS: 'NULLS'; diff --git a/async-query-core/src/main/antlr/SqlBaseParser.g4 b/async-query-core/src/main/antlr/SqlBaseParser.g4 index 54eff14b6d..dc2c2f0794 100644 --- a/async-query-core/src/main/antlr/SqlBaseParser.g4 +++ b/async-query-core/src/main/antlr/SqlBaseParser.g4 @@ -52,7 +52,7 @@ singleCompoundStatement ; beginEndCompoundBlock - : BEGIN compoundBody END + : beginLabel? BEGIN compoundBody END endLabel? ; compoundBody @@ -68,6 +68,14 @@ singleStatement : statement SEMICOLON* EOF ; +beginLabel + : multipartIdentifier COLON + ; + +endLabel + : multipartIdentifier + ; + singleExpression : namedExpression EOF ; @@ -174,6 +182,8 @@ statement | ALTER TABLE identifierReference (partitionSpec)? SET locationSpec #setTableLocation | ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions + | ALTER TABLE identifierReference + (clusterBySpec | CLUSTER BY NONE) #alterClusterBy | DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable | DROP VIEW (IF EXISTS)? identifierReference #dropView | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)? @@ -853,13 +863,17 @@ identifierComment relationPrimary : identifierReference temporalClause? - sample? tableAlias #tableName + optionsClause? sample? tableAlias #tableName | LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery | LEFT_PAREN relation RIGHT_PAREN sample? 
tableAlias #aliasedRelation | inlineTable #inlineTableDefault2 | functionTable #tableValuedFunction ; +optionsClause + : WITH options=propertyList + ; + inlineTable : VALUES expression (COMMA expression)* tableAlias ; @@ -1572,6 +1586,7 @@ ansiNonReserved | NANOSECOND | NANOSECONDS | NO + | NONE | NULLS | NUMERIC | OF @@ -1920,6 +1935,7 @@ nonReserved | NANOSECOND | NANOSECONDS | NO + | NONE | NOT | NULL | NULLS diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java new file mode 100644 index 0000000000..d7dc8337a0 --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler; + +public interface AsyncQueryScheduler { + void scheduleJob(AsyncQuerySchedulerJobRequest request); + + void unscheduleJob(AsyncQuerySchedulerJobRequest request); + + void removeJob(AsyncQuerySchedulerJobRequest request); + + void updateJob(AsyncQuerySchedulerJobRequest request); +} diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQuerySchedulerJob.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQuerySchedulerJob.java new file mode 100644 index 0000000000..b8af4c0901 --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQuerySchedulerJob.java @@ -0,0 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler; + +public interface AsyncQuerySchedulerJob {} diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQuerySchedulerJobRequest.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQuerySchedulerJobRequest.java new file 
/**
 * Immutable description of a job handed to an {@code AsyncQueryScheduler}.
 *
 * <p>Holds the attributes shared by every scheduler job: its name, its type, the raw (unparsed)
 * schedule expression and whether the job is enabled. Instances are created through a concrete
 * {@link Builder} subclass.
 */
public abstract class AsyncQuerySchedulerJobRequest {
  private final String jobName;
  private final String jobType;
  private final String schedule;
  private final boolean enabled;

  /**
   * Copies the common attributes out of the given builder.
   *
   * @param builder builder holding the values; attributes that were never set keep their defaults
   *     ({@code null}, or {@code false} for the enabled flag)
   */
  protected AsyncQuerySchedulerJobRequest(Builder<?> builder) {
    this.jobName = builder.jobName;
    this.jobType = builder.jobType;
    this.schedule = builder.schedule;
    this.enabled = builder.enabled;
  }

  /** Returns the job name given to the builder, or {@code null} if none was set. */
  public String getJobName() {
    return jobName;
  }

  /** Returns the job type given to the builder, or {@code null} if none was set. */
  public String getJobType() {
    return jobType;
  }

  /** Returns the schedule exactly as it was given to the builder; no parsing is applied. */
  public String getRawSchedule() {
    return schedule;
  }

  /** Returns the enabled flag given to the builder ({@code false} if never set). */
  public boolean isEnabled() {
    return enabled;
  }

  /**
   * Fluent builder for the common job attributes.
   *
   * <p>The self-referencing type parameter lets the {@code with...} methods return the concrete
   * builder type, so calls defined here and calls defined by a subclass can be chained in any
   * order.
   *
   * @param <T> the concrete builder type, returned by every {@code with...} method
   */
  public abstract static class Builder<T extends Builder<T>> {
    private String jobName;
    private String jobType;
    private String schedule;
    private boolean enabled;

    /**
     * Sets the job name.
     *
     * @param jobName name of the job
     * @return this builder
     */
    public T withJobName(String jobName) {
      this.jobName = jobName;
      return self();
    }

    /**
     * Sets the job type.
     *
     * @param jobType type of the job
     * @return this builder
     */
    public T withJobType(String jobType) {
      this.jobType = jobType;
      return self();
    }

    /**
     * Sets the raw schedule expression; it is stored as given.
     *
     * @param schedule schedule expression
     * @return this builder
     */
    public T withSchedule(String schedule) {
      this.schedule = schedule;
      return self();
    }

    /**
     * Sets whether the job is enabled.
     *
     * @param enabled enabled flag
     * @return this builder
     */
    public T withEnabled(boolean enabled) {
      this.enabled = enabled;
      return self();
    }

    /** Returns {@code this}, typed as the concrete builder. */
    protected abstract T self();

    /** Creates the request from the values collected so far. */
    public abstract AsyncQuerySchedulerJobRequest build();
  }
}
a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java new file mode 100644 index 0000000000..a34b0aca9d --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -0,0 +1,168 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import org.apache.commons.io.IOUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; +import org.opensearch.sql.datasources.storage.OpenSearchDataSourceMetadataStorage; +import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob; +import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler { + public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler"; + public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler"; + private 
static final String SCHEDULER_INDEX_MAPPING_FILE_NAME = + "async-query-scheduler-index-mapping.yml"; + private static final Logger LOG = LogManager.getLogger(); + + private final Client client; + private final ClusterService clusterService; + private final ThreadPool threadPool; + + /** + * This class implements DataSourceMetadataStorage interface using OpenSearch as underlying + * storage. + * + * @param client opensearch NodeClient. + * @param clusterService ClusterService. + */ + public OpenSearchAsyncQueryScheduler( + Client client, ClusterService clusterService, ThreadPool threadPool) { + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + } + + public ScheduledJobRunner getJobRunner() { + OpenSearchRefreshIndexJob openSearchRefreshIndexJob = + OpenSearchRefreshIndexJob.getJobRunnerInstance(); + openSearchRefreshIndexJob.setClusterService(clusterService); + openSearchRefreshIndexJob.setThreadPool(threadPool); + openSearchRefreshIndexJob.setClient(client); + return openSearchRefreshIndexJob; + } + + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> { + OpenSearchRefreshIndexJobRequest.Builder builder = + new OpenSearchRefreshIndexJobRequest.Builder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case OpenSearchRefreshIndexJobRequest.NAME_FIELD: + builder.withJobName(parser.text()); + break; + case OpenSearchRefreshIndexJobRequest.ENABLED_FILED: + builder.withEnabled(parser.booleanValue()); + break; + case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FILED: + builder.withEnabledTime(parseInstantValue(parser)); + break; + case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD: + builder.withLastUpdateTime(parseInstantValue(parser)); + break; + case 
OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD: + builder.withSchedule(ScheduleParser.parse(parser).toString()); + break; + case OpenSearchRefreshIndexJobRequest.INDEX_NAME_FIELD: + builder.withIndexToWatch(parser.text()); + break; + case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS: + builder.withLockDurationSeconds(parser.longValue()); + break; + case OpenSearchRefreshIndexJobRequest.JITTER: + builder.withJitter(parser.doubleValue()); + break; + default: + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + } + } + return builder.build(); + }; + } + + @Override + public void scheduleJob(AsyncQuerySchedulerJobRequest request) { + // Implementation to schedule the job + } + + @Override + public void unscheduleJob(AsyncQuerySchedulerJobRequest request) { + // Implementation to unschedule the job + } + + @Override + public void removeJob(AsyncQuerySchedulerJobRequest request) { + // Implementation to remove the job + } + + @Override + public void updateJob(AsyncQuerySchedulerJobRequest request) { + // Implementation to update the job + } + + private void createAsyncQuerySchedulerIndex() { + try { + InputStream mappingFileStream = + OpenSearchDataSourceMetadataStorage.class + .getClassLoader() + .getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME); + createIndexRequest.mapping( + IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML); + ActionFuture createIndexResponseActionFuture; + try (ThreadContext.StoredContext ignored = + client.threadPool().getThreadContext().stashContext()) { + createIndexResponseActionFuture = client.admin().indices().create(createIndexRequest); + } + CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet(); + if (createIndexResponse.isAcknowledged()) { + LOG.info("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME); + } else { + throw new 
RuntimeException("Index creation is not acknowledged."); + } + } catch (Throwable e) { + throw new RuntimeException( + "Internal server error while creating" + + SCHEDULER_INDEX_NAME + + " index:: " + + e.getMessage()); + } + } + + private Instant parseInstantValue(XContentParser parser) throws IOException { + if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { + return null; + } + if (parser.currentToken().isValue()) { + return Instant.ofEpochMilli(parser.longValue()); + } + XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); + return null; + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/SampleExtensionRestHandler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/SampleExtensionRestHandler.java new file mode 100644 index 0000000000..439de74987 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/SampleExtensionRestHandler.java @@ -0,0 +1,165 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.sql.spark.scheduler; + +import java.io.IOException; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJobRequest; + +/** + * A sample rest handler that supports schedule and deschedule job operation + * + *

Users need to provide "id", "index", "job_name", and "interval" parameter to schedule a job. + * e.g. {@code POST /_plugins/scheduler_sample/watch?id=dashboards-job-id&job_name=watch dashboards + * index&index=.opensearch_dashboards_1&interval=1 } + * + *

creates a job with id "dashboards-job-id" and job name "watch dashboards index", which logs + * ".opensearch_dashboards_1" index's shards info every 1 minute + * + *

Users can remove that job by calling {@code DELETE + * /_plugins/scheduler_sample/watch?id=dashboards-job-id} + */ +public class SampleExtensionRestHandler extends BaseRestHandler { + public static final String WATCH_INDEX_URI = "/_plugins/scheduler_sample/watch"; + + @Override + public String getName() { + return "Sample JobScheduler extension handler"; + } + + @Override + public List routes() { + return Collections.unmodifiableList( + Arrays.asList( + new Route(RestRequest.Method.POST, WATCH_INDEX_URI), + new Route(RestRequest.Method.DELETE, WATCH_INDEX_URI))); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) + throws IOException { + if (request.method().equals(RestRequest.Method.POST)) { + // compose OpenSearchRefreshIndexJobRequest object from request + String id = request.param("id"); + String indexName = request.param("index"); + String jobName = request.param("job_name"); + String interval = request.param("interval"); + String lockDurationSecondsString = request.param("lock_duration_seconds"); + Long lockDurationSeconds = + lockDurationSecondsString != null ? Long.parseLong(lockDurationSecondsString) : null; + String jitterString = request.param("jitter"); + Double jitter = jitterString != null ? 
Double.parseDouble(jitterString) : null; + + if (id == null || indexName == null || jobName == null || interval == null) { + throw new IllegalArgumentException( + "Must specify id, index, job_name, and interval parameters"); + } + + OpenSearchRefreshIndexJobRequest.Builder builder = + new OpenSearchRefreshIndexJobRequest.Builder() + .withJobName(jobName) + .withIndexToWatch(indexName) + .withSchedule(String.format("PT%sM", interval)) + .withEnabled(true) + .withEnabledTime(Instant.now()) + .withLastUpdateTime(Instant.now()); + + if (lockDurationSeconds != null) { + builder.withLockDurationSeconds(lockDurationSeconds); + } + + if (jitter != null) { + builder.withJitter(jitter); + } + + OpenSearchRefreshIndexJobRequest jobParameter = builder.build(); + + IndexRequest indexRequest = + new IndexRequest() + .index(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME) + .id(id) + .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + return restChannel -> { + // index the job parameter + client.index( + indexRequest, + new ActionListener() { + @Override + public void onResponse(IndexResponse indexResponse) { + try { + RestResponse restResponse = + new BytesRestResponse( + RestStatus.OK, + indexResponse.toXContent(JsonXContent.contentBuilder(), null)); + restChannel.sendResponse(restResponse); + } catch (IOException e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + } + + @Override + public void onFailure(Exception e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + }; + } else if (request.method().equals(RestRequest.Method.DELETE)) { + // delete job parameter doc from index + String id = request.param("id"); + if (id == null) { + throw new IllegalArgumentException("Must specify id parameter"); + } + DeleteRequest deleteRequest = + new 
DeleteRequest().index(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME).id(id); + + return restChannel -> { + client.delete( + deleteRequest, + new ActionListener() { + @Override + public void onResponse(DeleteResponse deleteResponse) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.OK, "Job deleted.")); + } + + @Override + public void onFailure(Exception e) { + restChannel.sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + }); + }; + } else { + return restChannel -> { + restChannel.sendResponse( + new BytesRestResponse( + RestStatus.METHOD_NOT_ALLOWED, request.method() + " is not allowed.")); + }; + } + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java new file mode 100644 index 0000000000..4fa254d687 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java @@ -0,0 +1,167 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.job; + +import java.util.List; +import java.util.UUID; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.plugins.Plugin; +import org.opensearch.sql.spark.scheduler.AsyncQuerySchedulerJob; +import 
org.opensearch.threadpool.ThreadPool; + +/** + * A sample job runner class. + * + *

The job runner should be a singleton class if it uses OpenSearch client or other objects + * passed from OpenSearch. Because when registering the job runner to JobScheduler plugin, + * OpenSearch has not invoke plugins' createComponents() method. That is saying the plugin is not + * completely initialized, and the OpenSearch {@link org.opensearch.client.Client}, {@link + * ClusterService} and other objects are not available to plugin and this job runner. + * + *

So we have to move this job runner initialization to {@link Plugin} createComponents() method, + * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler + * plugin. + * + *

This sample job runner takes the "indexToWatch" from job parameter and logs that index's + * shards. + */ +public class OpenSearchRefreshIndexJob implements ScheduledJobRunner, AsyncQuerySchedulerJob { + + private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); + + private static OpenSearchRefreshIndexJob INSTANCE; + + public static OpenSearchRefreshIndexJob getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (OpenSearchRefreshIndexJob.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new OpenSearchRefreshIndexJob(); + return INSTANCE; + } + } + + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + + private OpenSearchRefreshIndexJob() { + // Singleton class, use getJobRunner method instead of constructor + } + + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + } + + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + public void setClient(Client client) { + this.client = client; + } + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + if (!(jobParameter instanceof OpenSearchRefreshIndexJobRequest)) { + throw new IllegalStateException( + "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: " + + jobParameter.getClass().getCanonicalName()); + } + + if (this.clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (this.threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + final LockService lockService = context.getLockService(); + + Runnable runnable = + () -> { + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock( + jobParameter, + context, + ActionListener.wrap( + lock -> { + if (lock == null) { + return; + } + + OpenSearchRefreshIndexJobRequest 
parameter = + (OpenSearchRefreshIndexJobRequest) jobParameter; + StringBuilder msg = new StringBuilder(); + msg.append("Watching index ") + .append(parameter.getIndexToWatch()) + .append("\n"); + + List shardRoutingList = + this.clusterService + .state() + .routingTable() + .allShards(parameter.getIndexToWatch()); + for (ShardRouting shardRouting : shardRoutingList) { + msg.append(shardRouting.shardId().getId()) + .append("\t") + .append(shardRouting.currentNodeId()) + .append("\t") + .append(shardRouting.active() ? "active" : "inactive") + .append("\n"); + } + log.info(msg.toString()); + runTaskForIntegrationTests(parameter); + runTaskForLockIntegrationTests(parameter); + + lockService.release( + lock, + ActionListener.wrap( + released -> { + log.info("Released lock for job {}", jobParameter.getName()); + }, + exception -> { + throw new IllegalStateException("Failed to release lock."); + })); + }, + exception -> { + throw new IllegalStateException("Failed to acquire lock."); + })); + } + }; + + threadPool.generic().submit(runnable); + } + + private void runTaskForIntegrationTests(OpenSearchRefreshIndexJobRequest jobParameter) { + this.client.index( + new IndexRequest(jobParameter.getIndexToWatch()) + .id(UUID.randomUUID().toString()) + .source("{\"message\": \"message\"}", XContentType.JSON)); + } + + private void runTaskForLockIntegrationTests(OpenSearchRefreshIndexJobRequest jobParameter) + throws InterruptedException { + if (jobParameter.getName().equals("sample-job-lock-test-it")) { + Thread.sleep(180000); + } + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobRequest.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobRequest.java new file mode 100644 index 0000000000..a735587a24 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobRequest.java @@ -0,0 +1,171 @@ +/* + * Copyright OpenSearch 
Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.job; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.sql.spark.scheduler.AsyncQuerySchedulerJobRequest; + +/** + * A sample job parameter. + * + *

It adds an additional "indexToWatch" field to {@link ScheduledJobParameter}, which stores the
+ * index the job runner will watch.
+ *
+ * <p>Instances are built through {@link Builder}; the fields declared here are assigned once in
+ * the constructor.
+ */
+public class OpenSearchRefreshIndexJobRequest extends AsyncQuerySchedulerJobRequest
+    implements ScheduledJobParameter {
+  // Field names written by toXContent. "FILED" in some constant names is a misspelling of
+  // "FIELD"; the names are public, so they are left unchanged for compatibility.
+  public static final String NAME_FIELD = "name";
+  public static final String ENABLED_FILED = "enabled";
+  public static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
+  public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field";
+  public static final String SCHEDULE_FIELD = "schedule";
+  public static final String ENABLED_TIME_FILED = "enabled_time";
+  public static final String ENABLED_TIME_FILED_READABLE = "enabled_time_field";
+  public static final String INDEX_NAME_FIELD = "index_name_to_watch";
+  public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds";
+  public static final String JITTER = "jitter";
+
+  private final Instant lastUpdateTime;
+  private final Instant enabledTime;
+  private final String indexToWatch;
+  private final Long lockDurationSeconds;
+  private final Double jitter;
+
+  /** Copies every value from the builder; the superclass consumes the builder as well. */
+  private OpenSearchRefreshIndexJobRequest(Builder builder) {
+    super(builder);
+    this.indexToWatch = builder.indexToWatch;
+    this.lockDurationSeconds = builder.lockDurationSeconds;
+    this.jitter = builder.jitter;
+    this.enabledTime = builder.enabledTime;
+    this.lastUpdateTime = builder.lastUpdatedTime;
+  }
+
+  /** Returns the index name supplied through {@link Builder#withIndexToWatch(String)}. */
+  public String getIndexToWatch() {
+    return this.indexToWatch;
+  }
+
+  /** Returns the job name held by the superclass. */
+  @Override
+  public String getName() {
+    return super.getJobName();
+  }
+
+  /** Returns the last update time, or {@code null} when none was set on the builder. */
+  @Override
+  public Instant getLastUpdateTime() {
+    return this.lastUpdateTime;
+  }
+
+  /** Returns the enabled time, or {@code null} when none was set on the builder. */
+  @Override
+  public Instant getEnabledTime() {
+    return this.enabledTime;
+  }
+
+  /**
+   * Builds an interval schedule from the raw schedule string, read as a whole number of minutes.
+   *
+   * <p>A new schedule whose start time is {@code Instant.now()} is created on every call.
+   *
+   * @throws NumberFormatException if the raw schedule is not a parsable integer
+   */
+  @Override
+  public Schedule getSchedule() {
+    // TODO: Optimize parser
+    return new IntervalSchedule(
+        Instant.now(), Integer.parseInt(this.getRawSchedule()), ChronoUnit.MINUTES);
+  }
+
+  /** Returns the lock duration in seconds, or {@code null} when none was set on the builder. */
+  @Override
+  public Long getLockDurationSeconds() {
+    return this.lockDurationSeconds;
+  }
+
+  /** Returns the jitter, or {@code null} when none was set on the builder. */
+  @Override
+  public Double getJitter() {
+    return jitter;
+  }
+
+  /** Fluent builder for {@link OpenSearchRefreshIndexJobRequest}. */
+  public static class Builder extends AsyncQuerySchedulerJobRequest.Builder {
+    private String indexToWatch;
+    private Long lockDurationSeconds;
+    private Double jitter;
+
+    private Instant enabledTime;
+
+    private Instant lastUpdatedTime;
+
+    public Builder withIndexToWatch(String indexToWatch) {
+      this.indexToWatch = indexToWatch;
+      return this;
+    }
+
+    public Builder withLockDurationSeconds(Long lockDurationSeconds) {
+      this.lockDurationSeconds = lockDurationSeconds;
+      return this;
+    }
+
+    public Builder withJitter(Double jitter) {
+      this.jitter = jitter;
+      return this;
+    }
+
+    public Builder withEnabledTime(Instant enabledTime) {
+      this.enabledTime = enabledTime;
+      return this;
+    }
+
+    /**
+     * Sets the last update time carried by the built request.
+     *
+     * @param lastUpdateTime the value later returned by {@code getLastUpdateTime()}
+     */
+    public Builder withLastUpdateTime(Instant lastUpdateTime) {
+      // The field is named lastUpdatedTime; assign from the parameter, not from the field itself.
+      this.lastUpdatedTime = lastUpdateTime;
+      return this;
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public OpenSearchRefreshIndexJobRequest build() {
+      return new OpenSearchRefreshIndexJobRequest(this);
+    }
+  }
+
+  /**
+   * Writes this request as a single object.
+   *
+   * <p>Name, enabled flag, raw schedule and watched index are always written. Enabled time, last
+   * update time, lock duration and jitter are written only when they are non-null.
+   *
+   * @return the same builder that was passed in
+   * @throws IOException declared for the underlying builder calls
+   */
+  @Override
+  public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params)
+      throws IOException {
+    builder.startObject();
+    builder
+        .field(NAME_FIELD, getJobName())
+        .field(ENABLED_FILED, isEnabled())
+        .field(SCHEDULE_FIELD, getRawSchedule())
+        .field(INDEX_NAME_FIELD, this.indexToWatch);
+    if (getEnabledTime() != null) {
+      builder.timeField(
+          ENABLED_TIME_FILED, ENABLED_TIME_FILED_READABLE, getEnabledTime().toEpochMilli());
+    }
+    if (getLastUpdateTime() != null) {
+      builder.timeField(
+          LAST_UPDATE_TIME_FIELD,
+          LAST_UPDATE_TIME_FIELD_READABLE,
+          getLastUpdateTime().toEpochMilli());
+    }
+    if (this.lockDurationSeconds != null) {
+      builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds);
+    }
+    if (this.jitter != null) {
+      builder.field(JITTER, this.jitter);
+    }
+    builder.endObject();
+    return builder;
+  }
+
+  /** Returns the fixed job type name of this request. */
+  @Override
+  public String getJobType() {
+    return "OpenSearchRefreshIndexJob";
+  }
+
+  /** Always {@code false}. */
+  @Override
+  public boolean isFragment() {
+    return false;
+  }
+}
diff --git a/async-query/src/main/java/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/async-query/src/main/java/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension
new file mode 100644
index 0000000000..5337857c15
--- /dev/null
+++ b/async-query/src/main/java/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension
@@ -0,0 +1,6 @@
+#
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+#
+
+org.opensearch.sql.plugin.SQLPlugin
\ No newline at end of file
diff --git a/async-query/src/main/java/resources/async-query-scheduler-index-mapping.yml b/async-query/src/main/java/resources/async-query-scheduler-index-mapping.yml
new file mode 100644
index 0000000000..63b4a00549
--- /dev/null
+++ b/async-query/src/main/java/resources/async-query-scheduler-index-mapping.yml
@@ -0,0 +1,37 @@
+---
+##
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+##
+
+# Schema file for the .async-query-scheduler index
+# Also "dynamic" is set to "true" so that other fields can be added.
+mappings: + dynamic: true + properties: + name: + type: keyword + jobType: + type: keyword + lastUpdateTime: + type: date + format: epoch_millis + enabledTime: + type: date + format: epoch_millis + schedule: + properties: + initialDelay: + type: long + interval: + type: long + timeUnit: + type: keyword + enabled: + type: boolean + lockDurationSeconds: + type: long + null_value: -1 + jitter: + type: double + null_value: 0.0 \ No newline at end of file diff --git a/build.gradle b/build.gradle index b3e09d7b50..d088883022 100644 --- a/build.gradle +++ b/build.gradle @@ -192,7 +192,7 @@ configurations.all { exclude group: "commons-logging", module: "commons-logging" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' } // updateVersion: Task to auto increment to the next development iteration diff --git a/common/build.gradle b/common/build.gradle index b4ee98a5b7..786629b027 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -34,7 +34,7 @@ repositories { dependencies { api "org.antlr:antlr4-runtime:4.7.1" - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' @@ -46,7 +46,7 @@ dependencies { testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1' - testImplementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + testImplementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' testImplementation 
group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation('org.junit.jupiter:junit-jupiter:5.9.3') testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.7.0' diff --git a/core/build.gradle b/core/build.gradle index 655e7d92c2..9a2f08f148 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -46,7 +46,7 @@ pitest { } dependencies { - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240' diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 93153cf737..22b6d24005 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -153,7 +153,7 @@ configurations.all { resolutionStrategy.force "commons-logging:commons-logging:1.2" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" diff --git a/legacy/build.gradle b/legacy/build.gradle index 0467db183d..303387bdbe 100644 --- a/legacy/build.gradle +++ b/legacy/build.gradle @@ -107,7 +107,7 @@ dependencies { because 'https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379' } } - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: 
'32.1.3-jre' implementation group: 'org.json', name: 'json', version:'20231013' implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' implementation group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' diff --git a/plugin/build.gradle b/plugin/build.gradle index 710d81ed0a..08c09ebb7c 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -3,25 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - plugins { id 'java' id "io.freefair.lombok" @@ -48,6 +29,7 @@ opensearchplugin { name 'opensearch-sql' description 'OpenSearch SQL' classname 'org.opensearch.sql.plugin.SQLPlugin' + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file("LICENSE.txt") noticeFile rootProject.file("NOTICE") } @@ -76,7 +58,7 @@ publishing { } repositories { maven { - name = "Snapshots" // optional target repository name + name = "Snapshots" url = "https://aws.oss.sonatype.org/content/repositories/snapshots" credentials { username "$System.env.SONATYPE_USERNAME" @@ -98,7 +80,8 @@ configurations.all { resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force 'com.google.guava:guava:32.1.3-jre' + resolutionStrategy.force 'com.google.guava:failureaccess:1.0.2' resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}" @@ -139,6 +122,10 @@ spotless { } dependencies { + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" + compileOnly 'com.google.guava:guava:32.1.3-jre' + compileOnly 'com.google.guava:failureaccess:1.0.2' + api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" @@ -204,11 +191,10 @@ dependencyLicenses.enabled = false // enable testingConventions check will cause errors like: "Classes ending with [Tests] must subclass [LuceneTestCase]" 
testingConventions.enabled = false -// TODO: need to verify the thirdPartyAudi +// TODO: need to verify the thirdPartyAudit // currently it complains missing classes like ibatis, mysql etc, should not be a problem thirdPartyAudit.enabled = false - apply plugin: 'com.netflix.nebula.ospackage' validateNebulaPom.enabled = false diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index b86ab9218a..1673d6838c 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -42,6 +42,9 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.indices.SystemIndexDescriptor; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; @@ -91,6 +94,8 @@ import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; +import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; +import org.opensearch.sql.spark.scheduler.SampleExtensionRestHandler; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; @@ -105,7 +110,8 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin { +public class SQLPlugin extends Plugin + implements ActionPlugin, ScriptPlugin, 
SystemIndexPlugin, JobSchedulerExtension {
 
   private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class);
 
@@ -116,6 +122,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Sys
 
   private NodeClient client;
   private DataSourceServiceImpl dataSourceService;
+  private OpenSearchAsyncQueryScheduler asyncQueryScheduler;
   private Injector injector;
 
   public String name() {
@@ -141,6 +148,7 @@ public List getRestHandlers(
     Metrics.getInstance().registerDefaultMetrics();
 
     return Arrays.asList(
+        new SampleExtensionRestHandler(),
         new RestPPLQueryAction(),
         new RestSqlAction(settings, injector),
         new RestSqlStatsAction(settings, restController),
@@ -208,6 +216,7 @@ public Collection createComponents(
     this.client = (NodeClient) client;
     this.dataSourceService = createDataSourceService();
     dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
+    this.asyncQueryScheduler = createAsyncQueryScheduler(threadPool);
     LocalClusterState.state().setClusterService(clusterService);
     LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
     LocalClusterState.state().setClient(client);
@@ -243,6 +252,36 @@ public Collection createComponents(
         pluginSettings);
   }
 
+  /** Job scheduler hook: returns the job type constant read from the async query scheduler. */
+  @Override
+  public String getJobType() {
+    // NOTE(review): the constant-style name suggests a static field read through an instance;
+    // if so, class-qualified access would be clearer - confirm against the scheduler class.
+    LOGGER.debug("JobSchedulerExtension#getJobType called");
+    return asyncQueryScheduler.SCHEDULER_PLUGIN_JOB_TYPE;
+  }
+
+  /** Job scheduler hook: returns the index name constant read from the async query scheduler. */
+  @Override
+  public String getJobIndex() {
+    LOGGER.debug("JobSchedulerExtension#getJobIndex called");
+    return asyncQueryScheduler.SCHEDULER_INDEX_NAME;
+  }
+
+  /** Job scheduler hook: delegates to the async query scheduler for the job runner. */
+  @Override
+  public ScheduledJobRunner getJobRunner() {
+    LOGGER.debug("JobSchedulerExtension#getJobRunner called");
+    return asyncQueryScheduler.getJobRunner();
+  }
+
+  /** Job scheduler hook: delegates to the async query scheduler for the job parser. */
+  @Override
+  public ScheduledJobParser getJobParser() {
+    LOGGER.debug("JobSchedulerExtension#getJobParser called");
+    return asyncQueryScheduler.getJobParser();
+  }
+
   @Override
   public List> getExecutorBuilders(Settings settings) {
     return singletonList(
@@ -301,6 +340,12 @@ private DataSourceServiceImpl createDataSourceService() {
         dataSourceUserAuthorizationHelper);
   }
 
+  /** Builds the scheduler from the node client, the cluster service and the given thread pool. */
+  private OpenSearchAsyncQueryScheduler createAsyncQueryScheduler(ThreadPool threadPool) {
+    LOGGER.debug("Creating async query scheduler");
+    return new OpenSearchAsyncQueryScheduler(client, clusterService, threadPool);
+  }
+
   @Override
   public Collection getSystemIndexDescriptors(Settings settings) {
     List systemIndexDescriptors = new ArrayList<>();
diff --git a/ppl/build.gradle b/ppl/build.gradle
index d58882d5e8..39bed7a359 100644
--- a/ppl/build.gradle
+++ b/ppl/build.gradle
@@ -48,7 +48,7 @@ dependencies {
     runtimeOnly group: 'org.reflections', name: 'reflections', version: '0.9.12'
 
     implementation "org.antlr:antlr4-runtime:4.7.1"
-    implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
+    implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
     api group: 'org.json', name: 'json', version: '20231013'
     implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}"
     api project(':common')
diff --git a/protocol/build.gradle b/protocol/build.gradle
index 5bbff68e51..d706b480c3 100644
--- a/protocol/build.gradle
+++ b/protocol/build.gradle
@@ -30,7 +30,7 @@ plugins {
 }
 
 dependencies {
-    implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
+    implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
     implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}"
     implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}"
     implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}"
diff --git a/sql/build.gradle b/sql/build.gradle
index 81872e6035..52ae0ac621 100644
--- a/sql/build.gradle
+++ b/sql/build.gradle
@@ -46,7 +46,7 @@ dependencies {
     antlr "org.antlr:antlr4:4.7.1"
 
     implementation "org.antlr:antlr4-runtime:4.7.1"
-    implementation group: 'com.google.guava', name: 'guava', version:
'32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre' implementation group: 'org.json', name: 'json', version:'20231013' implementation project(':common') implementation project(':core')