From 6b24d2f6cfdebcade12276add60430d58bdb2ddf Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Mon, 2 Sep 2024 20:10:52 -0700 Subject: [PATCH] Add UT --- .../AsyncQueryExecutorServiceImpl.java | 5 -- .../spark/dispatcher/BatchQueryHandler.java | 7 -- .../spark/dispatcher/QueryHandlerFactory.java | 3 - .../spark/dispatcher/RefreshQueryHandler.java | 5 -- .../dispatcher/StreamingQueryHandler.java | 77 +++++++------------ .../asyncquery/AsyncQueryCoreIntegTest.java | 5 +- .../dispatcher/SparkQueryDispatcherTest.java | 1 - .../operation/FlintIndexOpFactoryTest.java | 1 + .../operation/FlintIndexOpVacuumTest.java | 13 +++- async-query/build.gradle | 2 +- ...utionEngineConfigClusterSettingLoader.java | 32 ++++---- .../OpenSearchAsyncQueryScheduler.java | 4 +- ...ryJob.java => ScheduledAsyncQueryJob.java} | 12 +-- .../config/AsyncExecutorServiceModule.java | 2 - .../AsyncQueryExecutorServiceSpec.java | 9 ++- ...t.java => ScheduledAsyncQueryJobTest.java} | 16 ++-- .../org/opensearch/sql/plugin/SQLPlugin.java | 6 +- 17 files changed, 87 insertions(+), 113 deletions(-) rename async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/{AsyncQueryScheduledQueryJob.java => ScheduledAsyncQueryJob.java} (91%) rename async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/{AsyncQueryScheduledQueryJobTest.java => ScheduledAsyncQueryJobTest.java} (91%) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java index 60f13f9fc4..5933343ba4 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java @@ -13,8 +13,6 @@ import java.util.List; import java.util.Optional; import lombok.AllArgsConstructor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.json.JSONObject; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException; @@ -37,13 +35,10 @@ public class AsyncQueryExecutorServiceImpl implements AsyncQueryExecutorService private SparkQueryDispatcher sparkQueryDispatcher; private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier; - private static final Logger LOGGER = LogManager.getLogger(AsyncQueryExecutorServiceImpl.class); - @Override public CreateAsyncQueryResponse createAsyncQuery( CreateAsyncQueryRequest createAsyncQueryRequest, AsyncQueryRequestContext asyncQueryRequestContext) { - LOGGER.info("CreateAsyncQueryRequest: " + createAsyncQueryRequest.getQuery()); SparkExecutionEngineConfig sparkExecutionEngineConfig = sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(asyncQueryRequestContext); DispatchQueryResponse dispatchQueryResponse = diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index b3f8657ddb..bce1918631 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -13,8 +13,6 @@ import com.amazonaws.services.emrserverless.model.GetJobRunResult; import java.util.Map; import lombok.RequiredArgsConstructor; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.json.JSONObject; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; @@ -42,8 +40,6 @@ public class BatchQueryHandler extends AsyncQueryHandler { protected final MetricsService metricsService; protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider; - private static final Logger LOGGER = LogManager.getLogger(BatchQueryHandler.class); - @Override protected JSONObject getResponseFromResultIndex( AsyncQueryJobMetadata asyncQueryJobMetadata, @@ -106,9 +102,6 @@ public DispatchQueryResponse submit( tags, false, dataSourceMetadata.getResultIndex()); - - LOGGER.info("Submit batch query: " + dispatchQueryRequest.getQuery()); - String jobId = emrServerlessClient.startJobRun(startJobRequest); metricsService.incrementNumericalMetric(EMR_BATCH_QUERY_JOBS_CREATION_COUNT); return DispatchQueryResponse.builder() diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java index efa85ee907..d6e70a9d86 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java @@ -15,7 +15,6 @@ import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; @RequiredArgsConstructor public class QueryHandlerFactory { @@ -28,7 +27,6 @@ public class QueryHandlerFactory { private final FlintIndexOpFactory flintIndexOpFactory; private final EMRServerlessClientFactory emrServerlessClientFactory; private final MetricsService metricsService; - private final AsyncQueryScheduler asyncQueryScheduler; protected final SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider; public RefreshQueryHandler getRefreshQueryHandler(String accountId) { @@ -48,7 +46,6 @@ public StreamingQueryHandler getStreamingQueryHandler(String accountId) { jobExecutionResponseReader, leaseManager, metricsService, - asyncQueryScheduler, sparkSubmitParametersBuilderProvider); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index 0c6a6509a9..38145a143e 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -6,8 +6,6 @@ package org.opensearch.sql.spark.dispatcher; import java.util.Map; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; @@ -31,7 +29,6 @@ * index, and new job is submitted to Spark. */ public class RefreshQueryHandler extends BatchQueryHandler { - private static final Logger LOGGER = LogManager.getLogger(RefreshQueryHandler.class); private final FlintIndexMetadataService flintIndexMetadataService; private final FlintIndexOpFactory flintIndexOpFactory; @@ -78,8 +75,6 @@ public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource())); - LOGGER.info("Submit refresh query: " + dispatchQueryRequest.getQuery()); - DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); return DispatchQueryResponse.builder() diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 48da332fa5..80d4be27cf 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -9,12 +9,12 @@ import static org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher.JOB_TYPE_TAG_KEY; import static org.opensearch.sql.spark.metrics.EmrMetrics.EMR_STREAMING_QUERY_JOBS_CREATION_COUNT; -import java.time.Instant; import java.util.Map; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; @@ -25,23 +25,18 @@ import org.opensearch.sql.spark.metrics.MetricsService; import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider; import org.opensearch.sql.spark.response.JobExecutionResponseReader; -import org.opensearch.sql.spark.rest.model.LangType; -import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; -import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest; /** * The handler for streaming query. Streaming query is a job to continuously update flint index. * Once started, the job can be stopped by IndexDML query. */ public class StreamingQueryHandler extends BatchQueryHandler { - private final AsyncQueryScheduler asyncQueryScheduler; public StreamingQueryHandler( EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, LeaseManager leaseManager, MetricsService metricsService, - AsyncQueryScheduler asyncQueryScheduler, SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider) { super( emrServerlessClient, @@ -49,7 +44,6 @@ public StreamingQueryHandler( leaseManager, metricsService, sparkSubmitParametersBuilderProvider); - this.asyncQueryScheduler = asyncQueryScheduler; } @Override @@ -67,57 +61,42 @@ public DispatchQueryResponse submit( leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource())); - // String clusterName = dispatchQueryRequest.getClusterName(); + String clusterName = dispatchQueryRequest.getClusterName(); IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails(); Map tags = context.getTags(); tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName()); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); tags.put(JOB_TYPE_TAG_KEY, JobType.STREAMING.getText()); - // String jobName = - // clusterName - // + ":" - // + JobType.STREAMING.getText() - // + ":" - // + indexQueryDetails.openSearchIndexName(); - // StartJobRequest startJobRequest = - // new StartJobRequest( - // jobName, - // dispatchQueryRequest.getAccountId(), - // dispatchQueryRequest.getApplicationId(), - // dispatchQueryRequest.getExecutionRoleARN(), - // sparkSubmitParametersBuilderProvider - // .getSparkSubmitParametersBuilder() - // .clusterName(clusterName) - // .query(dispatchQueryRequest.getQuery()) - // .structuredStreaming(true) - // .dataSource( - // dataSourceMetadata, dispatchQueryRequest, - // context.getAsyncQueryRequestContext()) - // .acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier()) - // .acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext()) - // .toString(), - // tags, - // indexQueryDetails.getFlintIndexOptions().autoRefresh(), - // dataSourceMetadata.getResultIndex()); - // String jobId = emrServerlessClient.startJobRun(startJobRequest); - AsyncQuerySchedulerRequest request = - new AsyncQuerySchedulerRequest( + String jobName = + clusterName + + ":" + + JobType.STREAMING.getText() + + ":" + + indexQueryDetails.openSearchIndexName(); + StartJobRequest startJobRequest = + new StartJobRequest( + jobName, dispatchQueryRequest.getAccountId(), - "flint_myglue_test_default_count_by_status", - dataSourceMetadata.getName(), - "REFRESH MATERIALIZED VIEW myglue_test.default.count_by_status", - LangType.SQL, - "1 minute", - true, - Instant.now(), - Instant.now(), - -1L, - 0.0); - asyncQueryScheduler.scheduleJob(request); + dispatchQueryRequest.getApplicationId(), + dispatchQueryRequest.getExecutionRoleARN(), + sparkSubmitParametersBuilderProvider + .getSparkSubmitParametersBuilder() + .clusterName(clusterName) + .query(dispatchQueryRequest.getQuery()) + .structuredStreaming(true) + .dataSource( + dataSourceMetadata, dispatchQueryRequest, context.getAsyncQueryRequestContext()) + .acceptModifier(dispatchQueryRequest.getSparkSubmitParameterModifier()) + .acceptComposers(dispatchQueryRequest, context.getAsyncQueryRequestContext()) + .toString(), + tags, + indexQueryDetails.getFlintIndexOptions().autoRefresh(), + dataSourceMetadata.getResultIndex()); + String jobId = emrServerlessClient.startJobRun(startJobRequest); metricsService.incrementNumericalMetric(EMR_STREAMING_QUERY_JOBS_CREATION_COUNT); return DispatchQueryResponse.builder() .queryId(context.getQueryId()) - .jobId("test") + .jobId(jobId) .resultIndex(dataSourceMetadata.getResultIndex()) .datasourceName(dataSourceMetadata.getName()) .jobType(JobType.STREAMING) diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index 839487243b..f2ff4bafec 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -161,7 +161,8 @@ public void setUp() { flintIndexStateModelService, flintIndexClient, flintIndexMetadataService, - emrServerlessClientFactory); + emrServerlessClientFactory, + asyncQueryScheduler); QueryHandlerFactory queryHandlerFactory = new QueryHandlerFactory( jobExecutionResponseReader, @@ -172,7 +173,6 @@ public void setUp() { flintIndexOpFactory, emrServerlessClientFactory, metricsService, - asyncQueryScheduler, new SparkSubmitParametersBuilderProvider(collection)); SparkQueryDispatcher sparkQueryDispatcher = new SparkQueryDispatcher( @@ -519,6 +519,7 @@ private void givenFlintIndexMetadataExists(String indexName) { .appId(APPLICATION_ID) .jobId(JOB_ID) .opensearchIndexName(indexName) + .flintIndexOptions(new FlintIndexOptions()) .build())); } diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 798f0929f8..29652e6c6d 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -155,7 +155,6 @@ void setUp() { flintIndexOpFactory, emrServerlessClientFactory, metricsService, - asyncQueryScheduler, sparkSubmitParametersBuilderProvider); sparkQueryDispatcher = new SparkQueryDispatcher( diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java index 6e159c9adf..62ac98f1a2 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java @@ -17,6 +17,7 @@ import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; @ExtendWith(MockitoExtension.class) class FlintIndexOpFactoryTest { diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java index bb8e650783..e73913aa7c 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java @@ -18,11 +18,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; @ExtendWith(MockitoExtension.class) class FlintIndexOpVacuumTest { @@ -31,9 +33,16 @@ class FlintIndexOpVacuumTest { public static final String LATEST_ID = "LATEST_ID"; public static final String INDEX_NAME = "INDEX_NAME"; public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID = - FlintIndexMetadata.builder().latestId(LATEST_ID).opensearchIndexName(INDEX_NAME).build(); + FlintIndexMetadata.builder() + .latestId(LATEST_ID) + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(new FlintIndexOptions()) + .build(); public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID = - FlintIndexMetadata.builder().opensearchIndexName(INDEX_NAME).build(); + FlintIndexMetadata.builder() + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(new FlintIndexOptions()) + .build(); @Mock FlintIndexClient flintIndexClient; @Mock FlintIndexStateModelService flintIndexStateModelService; @Mock EMRServerlessClientFactory emrServerlessClientFactory; diff --git a/async-query/build.gradle b/async-query/build.gradle index abda6161d3..e435814348 100644 --- a/async-query/build.gradle +++ b/async-query/build.gradle @@ -99,7 +99,7 @@ jacocoTestCoverageVerification { // ignore because XContext IOException 'org.opensearch.sql.spark.execution.statestore.StateStore', 'org.opensearch.sql.spark.rest.*', - 'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser', + 'org.opensearch.sql.spark.scheduler.OpenSearchScheduleQueryJobRequestParser', 'org.opensearch.sql.spark.transport.model.*' ] limit { diff --git a/async-query/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingLoader.java b/async-query/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingLoader.java index db495bc212..73b057ca5c 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingLoader.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingLoader.java @@ -5,8 +5,13 @@ package org.opensearch.sql.spark.config; +import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG; + +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.Optional; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.common.setting.Settings; /** Load SparkExecutionEngineConfigClusterSetting from settings with privilege check. */ @@ -15,20 +20,17 @@ public class SparkExecutionEngineConfigClusterSettingLoader { private final Settings settings; public Optional load() { - // String sparkExecutionEngineConfigSettingString = - // this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG); - SparkExecutionEngineConfigClusterSetting setting = - new SparkExecutionEngineConfigClusterSetting("test", "test", "test", "test", "test"); - return Optional.of(setting); - // if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) { - // return Optional.of( - // AccessController.doPrivileged( - // (PrivilegedAction) - // () -> - // SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig( - // sparkExecutionEngineConfigSettingString))); - // } else { - // return Optional.empty(); - // } + String sparkExecutionEngineConfigSettingString = + this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG); + if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) { + return Optional.of( + AccessController.doPrivileged( + (PrivilegedAction) + () -> + SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig( + sparkExecutionEngineConfigSettingString))); + } else { + return Optional.empty(); + } } } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java index 3524d08b69..4a5c7b1691 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -35,7 +35,7 @@ import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; -import org.opensearch.sql.spark.scheduler.job.AsyncQueryScheduledQueryJob; +import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJob; import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest; import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest; @@ -197,6 +197,6 @@ private void assertIndexExists() { /** Returns the job runner instance for the scheduler. */ public static ScheduledJobRunner getJobRunner() { - return AsyncQueryScheduledQueryJob.getJobRunnerInstance(); + return ScheduledAsyncQueryJob.getJobRunnerInstance(); } } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/AsyncQueryScheduledQueryJob.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJob.java similarity index 91% rename from async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/AsyncQueryScheduledQueryJob.java rename to async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJob.java index 14058d603e..ad5dde44ba 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/AsyncQueryScheduledQueryJob.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJob.java @@ -21,7 +21,7 @@ import org.opensearch.threadpool.ThreadPool; /** - * The job runner class for scheduling refresh index query. + * The job runner class for scheduling async query. * *

The job runner should be a singleton class if it uses OpenSearch client or other objects * passed from OpenSearch. Because when registering the job runner to JobScheduler plugin, @@ -33,12 +33,12 @@ * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler * plugin. */ -public class AsyncQueryScheduledQueryJob implements ScheduledJobRunner { - private static final Logger LOGGER = LogManager.getLogger(AsyncQueryScheduledQueryJob.class); +public class ScheduledAsyncQueryJob implements ScheduledJobRunner { + private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJob.class); - public static AsyncQueryScheduledQueryJob INSTANCE = new AsyncQueryScheduledQueryJob(); + public static ScheduledAsyncQueryJob INSTANCE = new ScheduledAsyncQueryJob(); - public static AsyncQueryScheduledQueryJob getJobRunnerInstance() { + public static ScheduledAsyncQueryJob getJobRunnerInstance() { return INSTANCE; } @@ -47,7 +47,7 @@ public static AsyncQueryScheduledQueryJob getJobRunnerInstance() { private Client client; private AsyncQueryExecutorService asyncQueryExecutorService; - private AsyncQueryScheduledQueryJob() { + private ScheduledAsyncQueryJob() { // Singleton class, use getJobRunnerInstance method instead of constructor } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index f27db4c577..52ffda483c 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -120,7 +120,6 @@ public QueryHandlerFactory queryhandlerFactory( FlintIndexOpFactory flintIndexOpFactory, EMRServerlessClientFactory emrServerlessClientFactory, MetricsService metricsService, - AsyncQueryScheduler asyncQueryScheduler, SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider) { return new QueryHandlerFactory( openSearchJobExecutionResponseReader, @@ -131,7 +130,6 @@ public QueryHandlerFactory queryhandlerFactory( flintIndexOpFactory, emrServerlessClientFactory, metricsService, - asyncQueryScheduler, sparkSubmitParametersBuilderProvider); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 172f7c520d..ece7e3bdb7 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -100,6 +100,7 @@ import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.test.OpenSearchIntegTestCase; @@ -125,6 +126,7 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { protected StateStore stateStore; protected SessionStorageService sessionStorageService; protected StatementStorageService statementStorageService; + protected AsyncQueryScheduler asyncQueryScheduler; protected AsyncQueryRequestContext asyncQueryRequestContext; protected SessionIdProvider sessionIdProvider = new DatasourceEmbeddedSessionIdProvider(); @@ -205,6 +207,7 @@ public void setup() { new OpenSearchSessionStorageService(stateStore, new SessionModelXContentSerializer()); statementStorageService = new OpenSearchStatementStorageService(stateStore, new StatementModelXContentSerializer()); + asyncQueryScheduler = new OpenSearchAsyncQueryScheduler(client, clusterService); } protected FlintIndexOpFactory getFlintIndexOpFactory( @@ -213,7 +216,8 @@ protected FlintIndexOpFactory getFlintIndexOpFactory( flintIndexStateModelService, flintIndexClient, flintIndexMetadataService, - emrServerlessClientFactory); + emrServerlessClientFactory, + asyncQueryScheduler); } @After @@ -299,7 +303,8 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( flintIndexStateModelService, flintIndexClient, new FlintIndexMetadataServiceImpl(client), - emrServerlessClientFactory), + emrServerlessClientFactory, + asyncQueryScheduler), emrServerlessClientFactory, new OpenSearchMetricsService(), new OpenSearchAsyncQueryScheduler(client, clusterService), diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/AsyncQueryScheduledQueryJobTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobTest.java similarity index 91% rename from async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/AsyncQueryScheduledQueryJobTest.java rename to async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobTest.java index f83cbb6358..156fbbf2e5 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/AsyncQueryScheduledQueryJobTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobTest.java @@ -32,7 +32,7 @@ import org.opensearch.sql.spark.scheduler.model.OpenSearchScheduleQueryJobRequest; import org.opensearch.threadpool.ThreadPool; -public class AsyncQueryScheduledQueryJobTest { +public class ScheduledAsyncQueryJobTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private ClusterService clusterService; @@ -48,14 +48,14 @@ public class AsyncQueryScheduledQueryJobTest { @Mock private JobExecutionContext context; - private AsyncQueryScheduledQueryJob jobRunner; + private ScheduledAsyncQueryJob jobRunner; - private AsyncQueryScheduledQueryJob spyJobRunner; + private ScheduledAsyncQueryJob spyJobRunner; @BeforeEach public void setup() { MockitoAnnotations.openMocks(this); - jobRunner = AsyncQueryScheduledQueryJob.getJobRunnerInstance(); + jobRunner = ScheduledAsyncQueryJob.getJobRunnerInstance(); jobRunner.setClient(null); jobRunner.setClusterService(null); jobRunner.setThreadPool(null); @@ -98,7 +98,7 @@ public void testRunJobWithCorrectParameter() { @Test public void testRunJobWithIncorrectParameter() { - jobRunner = AsyncQueryScheduledQueryJob.getJobRunnerInstance(); + jobRunner = ScheduledAsyncQueryJob.getJobRunnerInstance(); jobRunner.setClusterService(clusterService); jobRunner.setThreadPool(threadPool); jobRunner.setClient(client); @@ -163,9 +163,9 @@ public void testRunJobWithUninitializedServices() { @Test public void testGetJobRunnerInstanceMultipleCalls() { - AsyncQueryScheduledQueryJob instance1 = AsyncQueryScheduledQueryJob.getJobRunnerInstance(); - AsyncQueryScheduledQueryJob instance2 = AsyncQueryScheduledQueryJob.getJobRunnerInstance(); - AsyncQueryScheduledQueryJob instance3 = AsyncQueryScheduledQueryJob.getJobRunnerInstance(); + ScheduledAsyncQueryJob instance1 = ScheduledAsyncQueryJob.getJobRunnerInstance(); + ScheduledAsyncQueryJob instance2 = ScheduledAsyncQueryJob.getJobRunnerInstance(); + ScheduledAsyncQueryJob instance3 = ScheduledAsyncQueryJob.getJobRunnerInstance(); assertSame(instance1, instance2); assertSame(instance2, instance3); diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 5cc2d2481b..487bf33eaf 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -97,7 +97,7 @@ import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; import org.opensearch.sql.spark.scheduler.OpenSearchScheduleQueryJobRequestParser; -import org.opensearch.sql.spark.scheduler.job.AsyncQueryScheduledQueryJob; +import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJob; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; @@ -247,7 +247,7 @@ public Collection createComponents( injector.getInstance(FlintIndexOpFactory.class)); AsyncQueryExecutorService asyncQueryExecutorService = injector.getInstance(AsyncQueryExecutorService.class); - AsyncQueryScheduledQueryJob.getJobRunnerInstance() + ScheduledAsyncQueryJob.getJobRunnerInstance() .loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); return ImmutableList.of( @@ -266,7 +266,7 @@ public String getJobIndex() { @Override public ScheduledJobRunner getJobRunner() { - return AsyncQueryScheduledQueryJob.getJobRunnerInstance(); + return ScheduledAsyncQueryJob.getJobRunnerInstance(); } @Override