diff --git a/.gitignore b/.gitignore index 1b892036dd..b9775dea04 100644 --- a/.gitignore +++ b/.gitignore @@ -49,4 +49,5 @@ gen .worktrees http-client.env.json /doctest/sql-cli/ +/doctest/opensearch-job-scheduler/ .factorypath diff --git a/async-query-core/src/main/antlr/SqlBaseParser.g4 b/async-query-core/src/main/antlr/SqlBaseParser.g4 index a50051715e..c7aa56cf92 100644 --- a/async-query-core/src/main/antlr/SqlBaseParser.g4 +++ b/async-query-core/src/main/antlr/SqlBaseParser.g4 @@ -66,8 +66,8 @@ compoundStatement ; setStatementWithOptionalVarKeyword - : SET (VARIABLE | VAR)? assignmentList #setVariableWithOptionalKeyword - | SET (VARIABLE | VAR)? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ + : SET variable? assignmentList #setVariableWithOptionalKeyword + | SET variable? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword ; @@ -215,9 +215,9 @@ statement routineCharacteristics RETURN (query | expression) #createUserDefinedFunction | DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference #dropFunction - | DECLARE (OR REPLACE)? VARIABLE? + | DECLARE (OR REPLACE)? variable? identifierReference dataType? variableDefaultExpression? #createVariable - | DROP TEMPORARY VARIABLE (IF EXISTS)? identifierReference #dropVariable + | DROP TEMPORARY variable (IF EXISTS)? identifierReference #dropVariable | EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)? (statement|setResetStatement) #explain | SHOW TABLES ((FROM | IN) identifierReference)? @@ -272,8 +272,8 @@ setResetStatement | SET TIME ZONE interval #setTimeZone | SET TIME ZONE timezone #setTimeZone | SET TIME ZONE .*? 
#setTimeZone - | SET (VARIABLE | VAR) assignmentList #setVariable - | SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ + | SET variable assignmentList #setVariable + | SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ LEFT_PAREN query RIGHT_PAREN #setVariable | SET configKey EQ configValue #setQuotedConfiguration | SET configKey (EQ .*?)? #setConfiguration @@ -438,6 +438,11 @@ namespaces | SCHEMAS ; +variable + : VARIABLE + | VAR + ; + describeFuncName : identifierReference | stringLit diff --git a/async-query/build.gradle b/async-query/build.gradle index 5a4a0d729d..abda6161d3 100644 --- a/async-query/build.gradle +++ b/async-query/build.gradle @@ -16,6 +16,8 @@ repositories { dependencies { + implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" + api project(':core') api project(':async-query-core') implementation project(':protocol') @@ -97,6 +99,7 @@ jacocoTestCoverageVerification { // ignore because XContext IOException 'org.opensearch.sql.spark.execution.statestore.StateStore', 'org.opensearch.sql.spark.rest.*', + 'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser', 'org.opensearch.sql.spark.transport.model.*' ] limit { diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java new file mode 100644 index 0000000000..c7a66fc6be --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -0,0 +1,197 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler; + +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import 
java.time.Instant; +import org.apache.commons.io.IOUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.index.engine.DocumentMissingException; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +/** Scheduler class for managing asynchronous query jobs. 
*/
+public class OpenSearchAsyncQueryScheduler {
+  /** Name of the index in which this scheduler stores its job documents. */
+  public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
+
+  /** Job type identifier; presumably used when registering with the job scheduler (confirm). */
+  public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
+
+  // Classpath resources with the YAML mapping and settings applied when the index is created.
+  private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
+      "async-query-scheduler-index-mapping.yml";
+  private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME =
+      "async-query-scheduler-index-settings.yml";
+
+  private static final Logger LOG = LogManager.getLogger();
+
+  // Both fields are assigned by loadJobResource and remain null until it has been called.
+  private Client client;
+  private ClusterService clusterService;
+
+  /**
+   * Stores the given client and cluster service on this scheduler and passes the same services,
+   * together with the thread pool, to the shared {@link OpenSearchRefreshIndexJob} runner.
+   */
+  public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) {
+    OpenSearchRefreshIndexJob jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance();
+    jobRunner.setClient(client);
+    jobRunner.setThreadPool(threadPool);
+    jobRunner.setClusterService(clusterService);
+
+    this.clusterService = clusterService;
+    this.client = client;
+  }
+
+  /** Schedules a new job by indexing it into the job index.
*/
+  public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
+    // NOTE(review): this check-then-create is not atomic. If the index appears between the check
+    // and the create call, creation presumably fails and surfaces as a RuntimeException.
+    if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
+      createAsyncQuerySchedulerIndex();
+    }
+    IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME);
+    // The job name is the document id; OpType.CREATE makes indexing fail if that id exists.
+    indexRequest.id(request.getName());
+    indexRequest.opType(DocWriteRequest.OpType.CREATE);
+    indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+    IndexResponse indexResponse;
+    try {
+      indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
+      ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest);
+      indexResponse = indexResponseActionFuture.actionGet();
+    } catch (VersionConflictEngineException exception) {
+      // Keep the original exception as the cause so the conflict details are not lost.
+      throw new IllegalArgumentException(
+          "A job already exists with name: " + request.getName(), exception);
+    } catch (Exception e) {
+      // Catch Exception rather than Throwable so that JVM Errors propagate unwrapped.
+      LOG.error("Failed to schedule job : {}", request.getName(), e);
+      throw new RuntimeException(e);
+    }
+
+    if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
+      LOG.debug("Job : {} successfully created", request.getName());
+    } else {
+      throw new RuntimeException(
+          "Schedule job failed with result : " + indexResponse.getResult().getLowercase());
+    }
+  }
+
+  /**
+   * Unschedules a job by marking it as disabled and updating its last update time.
+   *
+   * @param jobId name (document id) of the job to disable
+   * @throws IOException if the update document cannot be serialized
+   * @throws IllegalStateException if the job index does not exist
+   * @throws IllegalArgumentException if no job with the given id exists
+   */
+  public void unscheduleJob(String jobId) throws IOException {
+    // updateJob verifies that the index exists, so no separate check is needed here.
+    OpenSearchRefreshIndexJobRequest request =
+        OpenSearchRefreshIndexJobRequest.builder()
+            .jobName(jobId)
+            .enabled(false)
+            .lastUpdateTime(Instant.now())
+            .build();
+    updateJob(request);
+  }
+
+  /** Updates an existing job with new parameters.
*/
+  public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
+    assertIndexExists();
+    // The job name doubles as the document id of the job in the scheduler index.
+    UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
+    updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+    updateRequest.doc(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
+    UpdateResponse updateResponse;
+    try {
+      ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
+      updateResponse = updateResponseActionFuture.actionGet();
+    } catch (DocumentMissingException exception) {
+      // Keep the original exception as the cause so the failure details are not lost.
+      throw new IllegalArgumentException(
+          "Job: " + request.getName() + " doesn't exist", exception);
+    } catch (Exception e) {
+      // Catch Exception rather than Throwable so that JVM Errors propagate unwrapped.
+      LOG.error("Failed to update job : {}", request.getName(), e);
+      throw new RuntimeException(e);
+    }
+
+    // NOOP is also treated as success: the stored document already matched the request.
+    DocWriteResponse.Result result = updateResponse.getResult();
+    if (result == DocWriteResponse.Result.UPDATED || result == DocWriteResponse.Result.NOOP) {
+      LOG.debug("Job : {} successfully updated", request.getName());
+    } else {
+      throw new RuntimeException("Update job failed with result : " + result.getLowercase());
+    }
+  }
+
+  /** Removes a job by deleting its document from the index.
*/
+  public void removeJob(String jobId) {
+    assertIndexExists();
+    DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
+    deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+    ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
+    DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet();
+
+    DocWriteResponse.Result result = deleteResponse.getResult();
+    if (result == DocWriteResponse.Result.DELETED) {
+      LOG.debug("Job : {} successfully deleted", jobId);
+    } else if (result == DocWriteResponse.Result.NOT_FOUND) {
+      throw new IllegalArgumentException("Job : " + jobId + " doesn't exist");
+    } else {
+      throw new RuntimeException("Remove job failed with result : " + result.getLowercase());
+    }
+  }
+
+  /**
+   * Creates the async query scheduler index with the mapping and settings read from the bundled
+   * YAML classpath resources.
+   *
+   * @throws RuntimeException if a resource is missing or unreadable, or if index creation fails
+   *     or is not acknowledged
+   */
+  @VisibleForTesting
+  void createAsyncQuerySchedulerIndex() {
+    // try-with-resources closes both classpath streams on every path, including failures.
+    try (InputStream mappingFileStream =
+            OpenSearchAsyncQueryScheduler.class
+                .getClassLoader()
+                .getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
+        InputStream settingsFileStream =
+            OpenSearchAsyncQueryScheduler.class
+                .getClassLoader()
+                .getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME)) {
+      // getResourceAsStream returns null for a missing resource; report which file is absent
+      // instead of failing later with a bare NullPointerException.
+      if (mappingFileStream == null) {
+        throw new IllegalStateException(
+            "Missing classpath resource: " + SCHEDULER_INDEX_MAPPING_FILE_NAME);
+      }
+      if (settingsFileStream == null) {
+        throw new IllegalStateException(
+            "Missing classpath resource: " + SCHEDULER_INDEX_SETTINGS_FILE_NAME);
+      }
+      CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
+      createIndexRequest.mapping(
+          IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
+      createIndexRequest.settings(
+          IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
+      ActionFuture<CreateIndexResponse> createIndexResponseActionFuture =
+          client.admin().indices().create(createIndexRequest);
+      CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();
+
+      if (createIndexResponse.isAcknowledged()) {
+        LOG.debug("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
+      } else {
+        throw new RuntimeException("Index creation is not acknowledged.");
+      }
+    } catch (Throwable e) {
+
LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
+      String failure =
+          "Internal server error while creating "
+              + SCHEDULER_INDEX_NAME
+              + " index: "
+              + e.getMessage();
+      throw new RuntimeException(failure, e);
+    }
+  }
+
+  /** Throws if the scheduler index is absent from the cluster routing table. */
+  private void assertIndexExists() {
+    boolean indexPresent =
+        this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME);
+    if (indexPresent) {
+      return;
+    }
+    throw new IllegalStateException("Job index does not exist.");
+  }
+
+  /** Returns the shared {@link OpenSearchRefreshIndexJob} runner used by this scheduler. */
+  public static ScheduledJobRunner getJobRunner() {
+    return OpenSearchRefreshIndexJob.getJobRunnerInstance();
+  }
+}
diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchRefreshIndexJobRequestParser.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchRefreshIndexJobRequestParser.java
new file mode 100644
index 0000000000..0422e7c015
--- /dev/null
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchRefreshIndexJobRequestParser.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.scheduler;
+
+import java.io.IOException;
+import java.time.Instant;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.core.xcontent.XContentParserUtils;
+import org.opensearch.jobscheduler.spi.ScheduledJobParser;
+import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
+import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
+
+/** Builds {@link OpenSearchRefreshIndexJobRequest} objects from stored job documents. */
+public class OpenSearchRefreshIndexJobRequestParser {
+
+  /**
+   * Reads the parser's current token as an epoch-millisecond timestamp.
+   *
+   * @return the parsed instant, or {@code null} when the current token is a null value
+   * @throws IOException if the parser fails to read the value
+   */
+  private static Instant parseInstantValue(XContentParser parser) throws IOException {
+    XContentParser.Token current = parser.currentToken();
+    if (XContentParser.Token.VALUE_NULL.equals(current)) {
+      return null;
+    }
+    if (!current.isValue()) {
+      // throwUnknownToken presumably always throws; the return only satisfies the compiler.
+      XContentParserUtils.throwUnknownToken(current, parser.getTokenLocation());
+      return null;
+    }
+    return Instant.ofEpochMilli(parser.longValue());
+  }
+
+  /** Returns a parser that reads one job document into a refresh-index job request. */
+  public static ScheduledJobParser
getJobParser() {
+    return (parser, id, jobDocVersion) -> {
+      // The document must open with START_OBJECT; anything else is rejected up front.
+      XContentParserUtils.ensureExpectedToken(
+          XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
+      OpenSearchRefreshIndexJobRequest.OpenSearchRefreshIndexJobRequestBuilder jobBuilder =
+          OpenSearchRefreshIndexJobRequest.builder();
+
+      // Walk field-name/value pairs until the closing END_OBJECT of the document.
+      for (XContentParser.Token token = parser.nextToken();
+          !token.equals(XContentParser.Token.END_OBJECT);
+          token = parser.nextToken()) {
+        String field = parser.currentName();
+        parser.nextToken(); // step from the field name onto its value
+        switch (field) {
+          case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD:
+            jobBuilder.jobName(parser.text());
+            break;
+          case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD:
+            jobBuilder.jobType(parser.text());
+            break;
+          case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD:
+            jobBuilder.enabled(parser.booleanValue());
+            break;
+          case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD:
+            jobBuilder.enabledTime(parseInstantValue(parser));
+            break;
+          case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
+            jobBuilder.lastUpdateTime(parseInstantValue(parser));
+            break;
+          case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
+            jobBuilder.schedule(ScheduleParser.parse(parser));
+            break;
+          case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
+            jobBuilder.lockDurationSeconds(parser.longValue());
+            break;
+          case OpenSearchRefreshIndexJobRequest.JITTER:
+            jobBuilder.jitter(parser.doubleValue());
+            break;
+          default:
+            // Any field name not listed above is reported as an unknown token.
+            XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
+        }
+      }
+      return jobBuilder.build();
+    };
+  }
+}
diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java
new file mode 100644
index 0000000000..e465a8790f
--- /dev/null
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package
org.opensearch.sql.spark.scheduler.job; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.plugins.Plugin; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +/** + * The job runner class for scheduling refresh index query. + * + *

<p>The job runner should be a singleton class if it uses the OpenSearch client or other objects + * passed from OpenSearch, because when the job runner is registered with the JobScheduler plugin, + * OpenSearch has not yet invoked the plugins' createComponents() method. That is, the plugin is not + * completely initialized, and the OpenSearch {@link org.opensearch.client.Client}, {@link + * ClusterService} and other objects are not yet available to the plugin or this job runner. + * + *

<p>So we have to move this job runner initialization to the {@link Plugin} createComponents()
+ * method, and use a singleton job runner to ensure we register a usable job runner instance with
+ * the JobScheduler plugin.
+ */
+public class OpenSearchRefreshIndexJob implements ScheduledJobRunner {
+
+  private static final Logger log = LogManager.getLogger(OpenSearchRefreshIndexJob.class);
+
+  /** The single shared runner; final so that it cannot be swapped out after class loading. */
+  public static final OpenSearchRefreshIndexJob INSTANCE = new OpenSearchRefreshIndexJob();
+
+  /** Returns the singleton job runner instance. */
+  public static OpenSearchRefreshIndexJob getJobRunnerInstance() {
+    return INSTANCE;
+  }
+
+  // Injected through the setters below; runJob refuses to run while any of them is null.
+  private ClusterService clusterService;
+  private ThreadPool threadPool;
+  private Client client;
+
+  private OpenSearchRefreshIndexJob() {
+    // Singleton class, use getJobRunnerInstance method instead of constructor
+  }
+
+  public void setClusterService(ClusterService clusterService) {
+    this.clusterService = clusterService;
+  }
+
+  public void setThreadPool(ThreadPool threadPool) {
+    this.threadPool = threadPool;
+  }
+
+  public void setClient(Client client) {
+    this.client = client;
+  }
+
+  /**
+   * Validates the job parameter and the injected services, then submits the refresh to the
+   * generic thread pool.
+   *
+   * @param jobParameter the job to run; must be an {@link OpenSearchRefreshIndexJobRequest}
+   * @param context execution context supplied by the job scheduler (not used here)
+   * @throws IllegalStateException if the parameter has the wrong type or a service is not set
+   */
+  @Override
+  public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) {
+    if (!(jobParameter instanceof OpenSearchRefreshIndexJobRequest)) {
+      // instanceof is false for null, so guard the type lookup; otherwise a null parameter
+      // would raise a NullPointerException instead of the intended IllegalStateException.
+      String actualType =
+          jobParameter == null ? "null" : jobParameter.getClass().getCanonicalName();
+      throw new IllegalStateException(
+          "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: "
+              + actualType);
+    }
+
+    if (this.clusterService == null) {
+      throw new IllegalStateException("ClusterService is not initialized.");
+    }
+
+    if (this.threadPool == null) {
+      throw new IllegalStateException("ThreadPool is not initialized.");
+    }
+
+    if (this.client == null) {
+      throw new IllegalStateException("Client is not initialized.");
+    }
+
+    Runnable runnable = () -> doRefresh(jobParameter.getName());
+    threadPool.generic().submit(runnable);
+  }
+
+  /** Currently only logs the name of the job whose index refresh was scheduled. */
+  void doRefresh(String refreshIndex) {
+    // TODO: add logic to refresh index
+    log.info("Scheduled refresh index job on : {}", refreshIndex);
+  }
+}
diff --git
a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java
new file mode 100644
index 0000000000..7eaa4e2d29
--- /dev/null
+++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.scheduler.model;
+
+import java.io.IOException;
+import java.time.Instant;
+import lombok.Builder;
+import org.opensearch.core.xcontent.ToXContent;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
+import org.opensearch.jobscheduler.spi.schedule.Schedule;
+
+/**
+ * Represents a job request to refresh index.
+ *
+ * <p>Instances are immutable and created through the Lombok-generated builder. Every field except
+ * {@code enabled} may be left unset, in which case it is {@code null} and is omitted from the
+ * serialized document.
+ */
+@Builder
+public class OpenSearchRefreshIndexJobRequest implements ScheduledJobParameter {
+  // Constant fields for JSON serialization
+  // NOTE(review): the scheduler index mapping declares a "name" property, not "jobName", and is
+  // non-dynamic - confirm which of the two names is intended.
+  public static final String JOB_NAME_FIELD = "jobName";
+  public static final String JOB_TYPE_FIELD = "jobType";
+  public static final String LAST_UPDATE_TIME_FIELD = "lastUpdateTime";
+  public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field";
+  public static final String SCHEDULE_FIELD = "schedule";
+  public static final String ENABLED_TIME_FIELD = "enabledTime";
+  public static final String ENABLED_TIME_FIELD_READABLE = "enabled_time_field";
+  public static final String LOCK_DURATION_SECONDS = "lockDurationSeconds";
+  public static final String JITTER = "jitter";
+  public static final String ENABLED_FIELD = "enabled";
+
+  // name is doc id
+  private final String jobName;
+  private final String jobType;
+  private final Schedule schedule;
+  private final boolean enabled;
+  private final Instant lastUpdateTime;
+  private final Instant enabledTime;
+  private final Long lockDurationSeconds;
+  private final Double jitter;
+
+  @Override
+  public String getName() {
+    return jobName;
+  }
+
+  public String getJobType() {
+    return jobType;
+  }
+
+  @Override
+  public Schedule getSchedule() {
+    return schedule;
+  }
+
+  @Override
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  @Override
+  public Instant getLastUpdateTime() {
+    return lastUpdateTime;
+  }
+
+  @Override
+  public Instant getEnabledTime() {
+    return enabledTime;
+  }
+
+  @Override
+  public Long getLockDurationSeconds() {
+    return lockDurationSeconds;
+  }
+
+  @Override
+  public Double getJitter() {
+    return jitter;
+  }
+
+  /**
+   * Writes this request as one object: the job name and enabled flag always, every other field
+   * only when it is non-null.
+   *
+   * @param builder the builder to write into
+   * @param params serialization parameters (not used by this implementation)
+   * @return the same builder, after the object has been closed
+   * @throws IOException if writing to the builder fails
+   */
+  @Override
+  public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params)
+      throws IOException {
+    builder.startObject();
+    builder.field(JOB_NAME_FIELD, getName()).field(ENABLED_FIELD, isEnabled());
+    if (getSchedule() != null) {
+      builder.field(SCHEDULE_FIELD, getSchedule());
+    }
+    if (getJobType() != null) {
+      builder.field(JOB_TYPE_FIELD, getJobType());
+    }
+    if (getEnabledTime() != null) {
+      builder.timeField(
+          ENABLED_TIME_FIELD, ENABLED_TIME_FIELD_READABLE, getEnabledTime().toEpochMilli());
+    }
+    // lastUpdateTime is optional in the builder and the parser accepts a null value for it, so
+    // guard it like the other optional fields instead of dereferencing it unconditionally.
+    if (getLastUpdateTime() != null) {
+      builder.timeField(
+          LAST_UPDATE_TIME_FIELD,
+          LAST_UPDATE_TIME_FIELD_READABLE,
+          getLastUpdateTime().toEpochMilli());
+    }
+    if (this.lockDurationSeconds != null) {
+      builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds);
+    }
+    if (this.jitter != null) {
+      builder.field(JITTER, this.jitter);
+    }
+    builder.endObject();
+    return builder;
+  }
+}
diff --git a/async-query/src/main/resources/async-query-scheduler-index-mapping.yml b/async-query/src/main/resources/async-query-scheduler-index-mapping.yml
new file mode 100644
index 0000000000..36bd1b873e
--- /dev/null
+++ b/async-query/src/main/resources/async-query-scheduler-index-mapping.yml
@@ -0,0 +1,41 @@
+---
+##
+# Copyright OpenSearch Contributors
+# SPDX-License-Identifier: Apache-2.0
+##
+
+# Schema file for the .async-query-scheduler index
+# Also "dynamic" is set to "false" so that other fields cannot be added.
+dynamic: false +properties: + name: + type: keyword + jobType: + type: keyword + lastUpdateTime: + type: date + format: epoch_millis + enabledTime: + type: date + format: epoch_millis + schedule: + properties: + initialDelay: + type: long + interval: + properties: + start_time: + type: date + format: "strict_date_time||epoch_millis" + period: + type: integer + unit: + type: keyword + enabled: + type: boolean + lockDurationSeconds: + type: long + null_value: -1 + jitter: + type: double + null_value: 0.0 \ No newline at end of file diff --git a/async-query/src/main/resources/async-query-scheduler-index-settings.yml b/async-query/src/main/resources/async-query-scheduler-index-settings.yml new file mode 100644 index 0000000000..386f1f4f34 --- /dev/null +++ b/async-query/src/main/resources/async-query-scheduler-index-settings.yml @@ -0,0 +1,11 @@ +--- +## +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +## + +# Settings file for the .async-query-scheduler index +index: + number_of_shards: "1" + auto_expand_replicas: "0-2" + number_of_replicas: "0" \ No newline at end of file diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java new file mode 100644 index 0000000000..de86f111f3 --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java @@ -0,0 +1,434 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static 
org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME; + +import java.io.IOException; +import java.time.Instant; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.index.engine.DocumentMissingException; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +public class OpenSearchAsyncQuerySchedulerTest { + + private static final String TEST_SCHEDULER_INDEX_NAME = "testQS"; + + private static final String TEST_JOB_ID = "testJob"; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Client client; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ClusterService clusterService; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ThreadPool threadPool; + + @Mock 
private ActionFuture indexResponseActionFuture; + + @Mock private ActionFuture updateResponseActionFuture; + + @Mock private ActionFuture deleteResponseActionFuture; + + @Mock private ActionFuture createIndexResponseActionFuture; + + @Mock private IndexResponse indexResponse; + + @Mock private UpdateResponse updateResponse; + + private OpenSearchAsyncQueryScheduler scheduler; + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + scheduler = new OpenSearchAsyncQueryScheduler(); + scheduler.loadJobResource(client, clusterService, threadPool); + } + + @Test + public void testScheduleJob() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) + .thenReturn(Boolean.FALSE); + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenReturn(createIndexResponseActionFuture); + when(createIndexResponseActionFuture.actionGet()) + .thenReturn(new CreateIndexResponse(true, true, TEST_SCHEDULER_INDEX_NAME)); + when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture); + + // Test the if case + when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); + when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED); + + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + scheduler.scheduleJob(request); + + // Verify index created + verify(client.admin().indices(), times(1)).create(ArgumentMatchers.any()); + + // Verify doc indexed + ArgumentCaptor captor = ArgumentCaptor.forClass(IndexRequest.class); + verify(client, times(1)).index(captor.capture()); + IndexRequest capturedRequest = captor.getValue(); + assertEquals(request.getName(), capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testScheduleJobWithExistingJob() { + 
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) + .thenReturn(Boolean.TRUE); + + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + when(client.index(any(IndexRequest.class))).thenThrow(VersionConflictEngineException.class); + + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> { + scheduler.scheduleJob(request); + }); + + verify(client, times(1)).index(ArgumentCaptor.forClass(IndexRequest.class).capture()); + assertEquals("A job already exists with name: testJob", exception.getMessage()); + } + + @Test + public void testScheduleJobWithExceptions() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) + .thenReturn(Boolean.FALSE); + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenReturn(createIndexResponseActionFuture); + when(createIndexResponseActionFuture.actionGet()) + .thenReturn(new CreateIndexResponse(true, true, TEST_SCHEDULER_INDEX_NAME)); + when(client.index(any(IndexRequest.class))).thenThrow(new RuntimeException("Test exception")); + + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + + when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture); + when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); + when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); + + RuntimeException exception = + assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + assertEquals("Schedule job failed with result : not_found", exception.getMessage()); + } + + @Test + public void testUnscheduleJob() throws IOException { + 
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED); + + when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); + + scheduler.unscheduleJob(TEST_JOB_ID); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateRequest.class); + verify(client).update(captor.capture()); + + UpdateRequest capturedRequest = captor.getValue(); + assertEquals(TEST_JOB_ID, capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + + // Reset the captor for the next verification + captor = ArgumentCaptor.forClass(UpdateRequest.class); + + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); + scheduler.unscheduleJob(TEST_JOB_ID); + + verify(client, times(2)).update(captor.capture()); + capturedRequest = captor.getValue(); + assertEquals(TEST_JOB_ID, capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testUnscheduleJobWithIndexNotFound() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); + + assertThrows(IllegalStateException.class, () -> scheduler.unscheduleJob(TEST_JOB_ID)); + } + + @Test + public void testUpdateJob() throws IOException { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED); + + when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); + + 
scheduler.updateJob(request); + + ArgumentCaptor captor = ArgumentCaptor.forClass(UpdateRequest.class); + verify(client).update(captor.capture()); + + UpdateRequest capturedRequest = captor.getValue(); + assertEquals(request.getName(), capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testUpdateJobWithIndexNotFound() { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); + + assertThrows(IllegalStateException.class, () -> scheduler.updateJob(request)); + } + + @Test + public void testUpdateJobWithExceptions() { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + when(client.update(any(UpdateRequest.class))) + .thenThrow(new DocumentMissingException(null, null)); + + IllegalArgumentException exception1 = + assertThrows( + IllegalArgumentException.class, + () -> { + scheduler.updateJob(request); + }); + + assertEquals("Job: testJob doesn't exist", exception1.getMessage()); + + when(client.update(any(UpdateRequest.class))).thenThrow(new RuntimeException("Test exception")); + + RuntimeException exception2 = + assertThrows( + RuntimeException.class, + () -> { + scheduler.updateJob(request); + }); + + assertEquals("java.lang.RuntimeException: Test exception", exception2.getMessage()); + + when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture); + when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse); + when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); + + RuntimeException exception = + 
assertThrows(RuntimeException.class, () -> scheduler.updateJob(request)); + assertEquals("Update job failed with result : not_found", exception.getMessage()); + } + + @Test + public void testRemoveJob() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); + when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED); + + when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture); + + scheduler.removeJob(TEST_JOB_ID); + + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteRequest.class); + verify(client).delete(captor.capture()); + + DeleteRequest capturedRequest = captor.getValue(); + assertEquals(TEST_JOB_ID, capturedRequest.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy()); + } + + @Test + public void testRemoveJobWithIndexNotFound() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); + + assertThrows(IllegalStateException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + } + + @Test + public void testCreateAsyncQuerySchedulerIndex() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); + + CreateIndexResponse createIndexResponse = mock(CreateIndexResponse.class); + when(createIndexResponseActionFuture.actionGet()).thenReturn(createIndexResponse); + when(createIndexResponse.isAcknowledged()).thenReturn(true); + + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenReturn(createIndexResponseActionFuture); + + scheduler.createAsyncQuerySchedulerIndex(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(CreateIndexRequest.class); + verify(client.admin().indices()).create(captor.capture()); + + CreateIndexRequest capturedRequest = captor.getValue(); + 
assertEquals(SCHEDULER_INDEX_NAME, capturedRequest.index()); + } + + @Test + public void testCreateAsyncQuerySchedulerIndexFailure() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); + + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenThrow(new RuntimeException("Error creating index")); + + RuntimeException exception = + assertThrows( + RuntimeException.class, + () -> { + scheduler.createAsyncQuerySchedulerIndex(); + }); + + assertEquals( + "Internal server error while creating .async-query-scheduler index: Error creating index", + exception.getMessage()); + + when(client.admin().indices().create(any(CreateIndexRequest.class))) + .thenReturn(createIndexResponseActionFuture); + Mockito.when(createIndexResponseActionFuture.actionGet()) + .thenReturn(new CreateIndexResponse(false, false, SCHEDULER_INDEX_NAME)); + + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + RuntimeException runtimeException = + Assertions.assertThrows(RuntimeException.class, () -> scheduler.scheduleJob(request)); + Assertions.assertEquals( + "Internal server error while creating .async-query-scheduler index: Index creation is not" + + " acknowledged.", + runtimeException.getMessage()); + } + + @Test + public void testUpdateJobNotFound() { + OpenSearchRefreshIndexJobRequest request = + OpenSearchRefreshIndexJobRequest.builder() + .jobName(TEST_JOB_ID) + .lastUpdateTime(Instant.now()) + .build(); + + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + when(client.update(any(UpdateRequest.class))) + .thenThrow(new DocumentMissingException(null, null)); + + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> { + scheduler.updateJob(request); + }); + + assertEquals("Job: testJob doesn't exist", exception.getMessage()); + } + + 
@Test + public void testRemoveJobNotFound() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); + when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOT_FOUND); + + when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture); + + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> { + scheduler.removeJob(TEST_JOB_ID); + }); + + assertEquals("Job : testJob doesn't exist", exception.getMessage()); + } + + @Test + public void testRemoveJobWithExceptions() { + when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true); + + when(client.delete(any(DeleteRequest.class))).thenThrow(new RuntimeException("Test exception")); + + assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + + DeleteResponse deleteResponse = mock(DeleteResponse.class); + when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture); + when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse); + when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.NOOP); + + RuntimeException runtimeException = + Assertions.assertThrows(RuntimeException.class, () -> scheduler.removeJob(TEST_JOB_ID)); + Assertions.assertEquals("Remove job failed with result : noop", runtimeException.getMessage()); + } + + @Test + public void testGetJobRunner() { + ScheduledJobRunner jobRunner = OpenSearchAsyncQueryScheduler.getJobRunner(); + assertNotNull(jobRunner); + } +} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java new file mode 100644 index 0000000000..cbf137997e --- /dev/null +++ 
b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java @@ -0,0 +1,145 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.job; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.time.Instant; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.threadpool.ThreadPool; + +public class OpenSearchRefreshIndexJobTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ClusterService clusterService; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ThreadPool threadPool; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Client client; + + @Mock private JobExecutionContext context; + + private OpenSearchRefreshIndexJob jobRunner; + + private OpenSearchRefreshIndexJob spyJobRunner; + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + jobRunner.setClient(null); + jobRunner.setClusterService(null); + jobRunner.setThreadPool(null); + } + + @Test + public void testRunJobWithCorrectParameter() { + spyJobRunner = spy(jobRunner); + 
spyJobRunner.setClusterService(clusterService); + spyJobRunner.setThreadPool(threadPool); + spyJobRunner.setClient(client); + + OpenSearchRefreshIndexJobRequest jobParameter = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .lockDurationSeconds(10L) + .build(); + + spyJobRunner.runJob(jobParameter, context); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(threadPool.generic()).submit(captor.capture()); + + Runnable runnable = captor.getValue(); + runnable.run(); + + verify(spyJobRunner).doRefresh(eq(jobParameter.getName())); + } + + @Test + public void testRunJobWithIncorrectParameter() { + jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + jobRunner.setClusterService(clusterService); + jobRunner.setThreadPool(threadPool); + jobRunner.setClient(client); + + ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class); + + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(wrongParameter, context), + "Expected IllegalStateException but no exception was thrown"); + + assertEquals( + "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: " + + wrongParameter.getClass().getCanonicalName(), + exception.getMessage()); + } + + @Test + public void testRunJobWithUninitializedServices() { + OpenSearchRefreshIndexJobRequest jobParameter = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .lastUpdateTime(Instant.now()) + .build(); + + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(jobParameter, context), + "Expected IllegalStateException but no exception was thrown"); + assertEquals("ClusterService is not initialized.", exception.getMessage()); + + jobRunner.setClusterService(clusterService); + + exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(jobParameter, context), + "Expected 
IllegalStateException but no exception was thrown"); + assertEquals("ThreadPool is not initialized.", exception.getMessage()); + + jobRunner.setThreadPool(threadPool); + + exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(jobParameter, context), + "Expected IllegalStateException but no exception was thrown"); + assertEquals("Client is not initialized.", exception.getMessage()); + } + + @Test + public void testGetJobRunnerInstanceMultipleCalls() { + OpenSearchRefreshIndexJob instance1 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + OpenSearchRefreshIndexJob instance2 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + OpenSearchRefreshIndexJob instance3 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); + + assertSame(instance1, instance2); + assertSame(instance2, instance3); + } +} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java new file mode 100644 index 0000000000..108f1acfd5 --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.model; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.junit.jupiter.api.Test; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; + +public class OpenSearchRefreshIndexJobRequestTest { + + @Test + public void 
testBuilderAndGetterMethods() { + Instant now = Instant.now(); + IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); + + OpenSearchRefreshIndexJobRequest jobRequest = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .jobType("testType") + .schedule(schedule) + .enabled(true) + .lastUpdateTime(now) + .enabledTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + assertEquals("testJob", jobRequest.getName()); + assertEquals("testType", jobRequest.getJobType()); + assertEquals(schedule, jobRequest.getSchedule()); + assertTrue(jobRequest.isEnabled()); + assertEquals(now, jobRequest.getLastUpdateTime()); + assertEquals(now, jobRequest.getEnabledTime()); + assertEquals(60L, jobRequest.getLockDurationSeconds()); + assertEquals(0.1, jobRequest.getJitter()); + } + + @Test + public void testToXContent() throws IOException { + Instant now = Instant.now(); + IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); + + OpenSearchRefreshIndexJobRequest jobRequest = + OpenSearchRefreshIndexJobRequest.builder() + .jobName("testJob") + .jobType("testType") + .schedule(schedule) + .enabled(true) + .lastUpdateTime(now) + .enabledTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + jobRequest.toXContent(builder, EMPTY_PARAMS); + String jsonString = builder.toString(); + + assertTrue(jsonString.contains("\"jobName\" : \"testJob\"")); + assertTrue(jsonString.contains("\"jobType\" : \"testType\"")); + assertTrue(jsonString.contains("\"start_time\" : " + now.toEpochMilli())); + assertTrue(jsonString.contains("\"period\" : 1")); + assertTrue(jsonString.contains("\"unit\" : \"Minutes\"")); + assertTrue(jsonString.contains("\"enabled\" : true")); + assertTrue(jsonString.contains("\"lastUpdateTime\" : " + now.toEpochMilli())); + assertTrue(jsonString.contains("\"enabledTime\" : " + now.toEpochMilli())); + 
assertTrue(jsonString.contains("\"lockDurationSeconds\" : 60")); + assertTrue(jsonString.contains("\"jitter\" : 0.1")); + } +} diff --git a/build.gradle b/build.gradle index b3e09d7b50..702d6f478a 100644 --- a/build.gradle +++ b/build.gradle @@ -50,6 +50,7 @@ buildscript { return "https://github.com/prometheus/prometheus/releases/download/v${prometheus_binary_version}/prometheus-${prometheus_binary_version}."+ getOSFamilyType() + "-" + getArchType() + ".tar.gz" } aws_java_sdk_version = "1.12.651" + guava_version = "32.1.3-jre" } repositories { @@ -192,7 +193,7 @@ configurations.all { exclude group: "commons-logging", module: "commons-logging" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force "com.google.guava:guava:${guava_version}" } // updateVersion: Task to auto increment to the next development iteration diff --git a/common/build.gradle b/common/build.gradle index b4ee98a5b7..15c48dd6b3 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -34,7 +34,7 @@ repositories { dependencies { api "org.antlr:antlr4-runtime:4.7.1" - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: "${guava_version}" api group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' @@ -46,7 +46,7 @@ dependencies { testImplementation group: 'junit', name: 'junit', version: '4.13.2' testImplementation group: 'org.assertj', name: 'assertj-core', version: '3.9.1' - testImplementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + testImplementation group: 'com.google.guava', name: 'guava', version: "${guava_version}" testImplementation 
group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1' testImplementation('org.junit.jupiter:junit-jupiter:5.9.3') testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.7.0' diff --git a/core/build.gradle b/core/build.gradle index 655e7d92c2..f36777030c 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -46,7 +46,7 @@ pitest { } dependencies { - api group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + api group: 'com.google.guava', name: 'guava', version: "${guava_version}" api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' api group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' api group: 'com.facebook.presto', name: 'presto-matching', version: '0.240' diff --git a/doctest/build.gradle b/doctest/build.gradle index ec5a26b52b..a125a4f336 100644 --- a/doctest/build.gradle +++ b/doctest/build.gradle @@ -5,6 +5,8 @@ import org.opensearch.gradle.testclusters.RunTask +import java.util.concurrent.Callable + plugins { id 'base' id 'com.wiredforcode.spawn' @@ -109,6 +111,10 @@ if (version_tokens.length > 1) { String mlCommonsRemoteFile = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + '/latest/linux/x64/tar/builds/opensearch/plugins/opensearch-ml-' + opensearch_build + '.zip' String mlCommonsPlugin = 'opensearch-ml' +String bwcOpenSearchJSDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + opensearch_no_snapshot + '/latest/linux/x64/tar/builds/' + + 'opensearch/plugins/opensearch-job-scheduler-' + opensearch_build + '.zip' +String jsPlugin = 'opensearch-job-scheduler' + testClusters { docTestCluster { // Disable loading of `ML-commons` plugin, because it might be unavailable (not released yet). 
@@ -133,6 +139,7 @@ testClusters { } })) */ + plugin(getJobSchedulerPlugin(jsPlugin, bwcOpenSearchJSDownload)) plugin ':opensearch-sql-plugin' testDistribution = 'archive' } @@ -159,3 +166,49 @@ spotless { googleJavaFormat('1.17.0').reflowLongStrings().groupArtifact('com.google.googlejavaformat:google-java-format') } } + +def getJobSchedulerPlugin(String jsPlugin, String bwcOpenSearchJSDownload) { + return provider(new Callable() { + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + // Use absolute paths + String basePath = new File('.').getCanonicalPath() + File dir = new File(basePath + File.separator + 'doctest' + File.separator + jsPlugin) + + // Log the directory path for debugging + println("Creating directory: " + dir.getAbsolutePath()) + + // Create directory if it doesn't exist + if (!dir.exists()) { + if (!dir.mkdirs()) { + throw new IOException("Failed to create directory: " + dir.getAbsolutePath()) + } + } + + // Define the file path + File f = new File(dir, jsPlugin + '-' + opensearch_build + '.zip') + + // Download file if it doesn't exist + if (!f.exists()) { + println("Downloading file from: " + bwcOpenSearchJSDownload) + println("Saving to file: " + f.getAbsolutePath()) + + new URL(bwcOpenSearchJSDownload).withInputStream { ins -> + f.withOutputStream { it << ins } + } + } + + // Check if the file was created successfully + if (!f.exists()) { + throw new FileNotFoundException("File was not created: " + f.getAbsolutePath()) + } + + return fileTree(f.getParent()).matching { include f.getName() }.singleFile + } + } + } + }) +} diff --git a/integ-test/build.gradle b/integ-test/build.gradle index 93153cf737..1acacdb4a5 100644 --- a/integ-test/build.gradle +++ b/integ-test/build.gradle @@ -80,7 +80,6 @@ ext { var projectAbsPath = projectDir.getAbsolutePath() File downloadedSecurityPlugin = Paths.get(projectAbsPath, 'bin', 'opensearch-security-snapshot.zip').toFile() - configureSecurityPlugin 
= { OpenSearchCluster cluster -> cluster.getNodes().forEach { node -> @@ -138,6 +137,10 @@ ext { cluster.plugin provider((Callable) (() -> (RegularFile) (() -> downloadedSecurityPlugin))) } + + bwcOpenSearchJSDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + baseVersion + '/latest/linux/x64/tar/builds/' + + 'opensearch/plugins/opensearch-job-scheduler-' + bwcVersion + '.zip' + bwcJobSchedulerPath = bwcFilePath + "job-scheduler/" } tasks.withType(licenseHeaders.class) { @@ -153,7 +156,6 @@ configurations.all { resolutionStrategy.force "commons-logging:commons-logging:1.2" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" @@ -166,6 +168,7 @@ configurations.all { resolutionStrategy.force "joda-time:joda-time:2.10.12" resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36" resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:${aws_java_sdk_version}" + resolutionStrategy.force "com.google.guava:guava:${guava_version}" } configurations { @@ -191,6 +194,7 @@ dependencies { testCompileOnly 'org.apiguardian:apiguardian-api:1.1.2' // Needed for BWC tests + zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" zipArchive group: 'org.opensearch.plugin', name:'opensearch-sql-plugin', version: "${bwcVersion}-SNAPSHOT" } @@ -219,22 +223,42 @@ testClusters.all { } } +def getJobSchedulerPlugin() { + provider(new Callable() { + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return 
configurations.zipArchive.asFileTree.matching { + include '**/opensearch-job-scheduler*' + }.singleFile + } + } + } + }) +} + testClusters { integTest { testDistribution = 'archive' + plugin(getJobSchedulerPlugin()) plugin ":opensearch-sql-plugin" setting "plugins.query.datasources.encryption.masterkey", "1234567812345678" } remoteCluster { testDistribution = 'archive' + plugin(getJobSchedulerPlugin()) plugin ":opensearch-sql-plugin" } integTestWithSecurity { testDistribution = 'archive' + plugin(getJobSchedulerPlugin()) plugin ":opensearch-sql-plugin" } remoteIntegTestWithSecurity { testDistribution = 'archive' + plugin(getJobSchedulerPlugin()) plugin ":opensearch-sql-plugin" } } @@ -502,6 +526,24 @@ task comparisonTest(type: RestIntegTestTask) { testDistribution = "ARCHIVE" versions = [baseVersion, opensearch_version] numberOfNodes = 3 + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + if (new File("$project.rootDir/$bwcFilePath/job-scheduler/$bwcVersion").exists()) { + project.delete(files("$project.rootDir/$bwcFilePath/job-scheduler/$bwcVersion")) + } + project.mkdir bwcJobSchedulerPath + bwcVersion + ant.get(src: bwcOpenSearchJSDownload, + dest: bwcJobSchedulerPath + bwcVersion, + httpusecaches: false) + return fileTree(bwcJobSchedulerPath + bwcVersion).getSingleFile() + } + } + } + })) plugin(provider(new Callable(){ @Override RegularFile call() throws Exception { @@ -522,17 +564,18 @@ task comparisonTest(type: RestIntegTestTask) { } List> plugins = [ - provider(new Callable() { - @Override - RegularFile call() throws Exception { - return new RegularFile() { - @Override - File getAsFile() { - return fileTree(bwcFilePath + project.version).getSingleFile() + getJobSchedulerPlugin(), + provider(new Callable() { + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return fileTree(bwcFilePath + 
project.version).getSingleFile() + } } } - } - }) + }) ] // Creates 2 test clusters with 3 nodes of the old version. diff --git a/legacy/build.gradle b/legacy/build.gradle index 0467db183d..e3ddf27066 100644 --- a/legacy/build.gradle +++ b/legacy/build.gradle @@ -107,7 +107,7 @@ dependencies { because 'https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379' } } - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: "${guava_version}" implementation group: 'org.json', name: 'json', version:'20231013' implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0' implementation group: 'org.apache.commons', name: 'commons-text', version: '1.10.0' diff --git a/plugin/build.gradle b/plugin/build.gradle index 710d81ed0a..7ebd0ad2d9 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -48,6 +48,7 @@ opensearchplugin { name 'opensearch-sql' description 'OpenSearch SQL' classname 'org.opensearch.sql.plugin.SQLPlugin' + extendedPlugins = ['opensearch-job-scheduler'] licenseFile rootProject.file("LICENSE.txt") noticeFile rootProject.file("NOTICE") } @@ -98,7 +99,8 @@ configurations.all { resolutionStrategy.force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" // enforce 1.1.3, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379 resolutionStrategy.force 'commons-codec:commons-codec:1.13' - resolutionStrategy.force 'com.google.guava:guava:32.0.1-jre' + resolutionStrategy.force "com.google.guava:guava:${guava_version}" + resolutionStrategy.force 'com.google.guava:failureaccess:1.0.2' resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-smile:${versions.jackson}" resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}" @@ -139,6 
+141,10 @@ spotless { } dependencies { + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" + compileOnly "com.google.guava:guava:${guava_version}" + compileOnly 'com.google.guava:failureaccess:1.0.2' + api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}" api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}" api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}" @@ -204,11 +210,10 @@ dependencyLicenses.enabled = false // enable testingConventions check will cause errors like: "Classes ending with [Tests] must subclass [LuceneTestCase]" testingConventions.enabled = false -// TODO: need to verify the thirdPartyAudi +// TODO: need to verify the thirdPartyAudit // currently it complains missing classes like ibatis, mysql etc, should not be a problem thirdPartyAudit.enabled = false - apply plugin: 'com.netflix.nebula.ospackage' validateNebulaPom.enabled = false diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index b86ab9218a..a1b1e32955 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -42,6 +42,9 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.indices.SystemIndexDescriptor; +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; @@ -91,6 +94,9 @@ import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; +import 
org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; +import org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser; +import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; @@ -105,7 +111,8 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, SystemIndexPlugin { +public class SQLPlugin extends Plugin + implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension { private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class); @@ -116,6 +123,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin, Sys private NodeClient client; private DataSourceServiceImpl dataSourceService; + private OpenSearchAsyncQueryScheduler asyncQueryScheduler; private Injector injector; public String name() { @@ -208,6 +216,8 @@ public Collection createComponents( this.client = (NodeClient) client; this.dataSourceService = createDataSourceService(); dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); + this.asyncQueryScheduler = new OpenSearchAsyncQueryScheduler(); + this.asyncQueryScheduler.loadJobResource(client, clusterService, threadPool); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); LocalClusterState.state().setClient(client); @@ -243,6 +253,26 @@ public Collection createComponents( pluginSettings); } + @Override + public String getJobType() { + return OpenSearchAsyncQueryScheduler.SCHEDULER_PLUGIN_JOB_TYPE; + } + + @Override + public String getJobIndex() { + return OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME; 
+ } + + @Override + public ScheduledJobRunner getJobRunner() { + return OpenSearchRefreshIndexJob.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return OpenSearchRefreshIndexJobRequestParser.getJobParser(); + } + @Override public List> getExecutorBuilders(Settings settings) { return singletonList( diff --git a/plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 0000000000..5337857c15 --- /dev/null +++ b/plugin/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,6 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +org.opensearch.sql.plugin.SQLPlugin \ No newline at end of file diff --git a/ppl/build.gradle b/ppl/build.gradle index d58882d5e8..2a3d6bdbf9 100644 --- a/ppl/build.gradle +++ b/ppl/build.gradle @@ -48,7 +48,7 @@ dependencies { runtimeOnly group: 'org.reflections', name: 'reflections', version: '0.9.12' implementation "org.antlr:antlr4-runtime:4.7.1" - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: "${guava_version}" api group: 'org.json', name: 'json', version: '20231013' implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:"${versions.log4j}" api project(':common') diff --git a/protocol/build.gradle b/protocol/build.gradle index 5bbff68e51..b5d7929041 100644 --- a/protocol/build.gradle +++ b/protocol/build.gradle @@ -30,7 +30,7 @@ plugins { } dependencies { - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: "${guava_version}" implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}" 
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}" implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}" diff --git a/sql/build.gradle b/sql/build.gradle index 81872e6035..10bb4b24bb 100644 --- a/sql/build.gradle +++ b/sql/build.gradle @@ -46,7 +46,7 @@ dependencies { antlr "org.antlr:antlr4:4.7.1" implementation "org.antlr:antlr4-runtime:4.7.1" - implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre' + implementation group: 'com.google.guava', name: 'guava', version: "${guava_version}" implementation group: 'org.json', name: 'json', version:'20231013' implementation project(':common') implementation project(':core')