diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 9b82022d8f..43815a9904 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -87,6 +87,10 @@ public class SparkConstants { public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/"; public static final String FLINT_JOB_QUERY = "spark.flint.job.query"; public static final String FLINT_JOB_QUERY_ID = "spark.flint.job.queryId"; + public static final String FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED = + "spark.flint.job.externalScheduler.enabled"; + public static final String FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL = + "spark.flint.job.externalScheduler.interval"; public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex"; public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId"; diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index e1c9bb6f39..ca4a8736d2 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -227,7 +227,7 @@ public void createDropIndexQueryWithScheduler() { assertNull(response.getSessionId()); verifyGetQueryIdCalled(); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); verify(asyncQueryScheduler).unscheduleJob(indexName); } @@ -275,7 +275,7 @@ public void createVacuumIndexQueryWithScheduler() { verify(flintIndexClient).deleteIndex(indexName); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); verify(asyncQueryScheduler).removeJob(indexName); } @@ -342,7 +342,7 @@ public void createAlterIndexQueryWithScheduler() { verify(asyncQueryScheduler).unscheduleJob(indexName); verifyCreateIndexDMLResultCalled(); - verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } @Test diff --git a/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java new file mode 100644 index 0000000000..6dce09a406 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposer.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.config; + +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer; +import org.opensearch.sql.spark.parameter.SparkSubmitParameters; + +@RequiredArgsConstructor +public class OpenSearchAsyncQuerySchedulerConfigComposer implements GeneralSparkParameterComposer { + private final Settings settings; + + @Override + public void compose( + SparkSubmitParameters sparkSubmitParameters, + DispatchQueryRequest dispatchQueryRequest, + AsyncQueryRequestContext context) { + String externalSchedulerEnabled = + settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED); + String externalSchedulerInterval = + settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL); + sparkSubmitParameters.setConfigItem( + FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, externalSchedulerEnabled); + sparkSubmitParameters.setConfigItem( + FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL, externalSchedulerInterval); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java index 2d5a1b332f..47e652c570 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java @@ -15,7 +15,6 @@ /** Parse string raw schedule into job scheduler IntervalSchedule */ public class IntervalScheduleParser { - private static final Pattern DURATION_PATTERN = Pattern.compile( "^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|microseconds?|microsecond|micros?|micros|nanoseconds?|nanos?)$", diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 52ffda483c..c6f6ffcd81 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -24,6 +24,7 @@ import org.opensearch.sql.spark.asyncquery.OpenSearchAsyncQueryJobMetadataStorageService; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.client.EMRServerlessClientFactoryImpl; +import org.opensearch.sql.spark.config.OpenSearchAsyncQuerySchedulerConfigComposer; import org.opensearch.sql.spark.config.OpenSearchExtraParameterComposer; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; @@ -168,6 +169,7 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider collection.register( DataSourceType.SECURITY_LAKE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); + collection.register(new OpenSearchAsyncQuerySchedulerConfigComposer(settings)); collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader)); return new SparkSubmitParametersBuilderProvider(collection); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java new file mode 100644 index 0000000000..7836c63b7a --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/config/OpenSearchAsyncQuerySchedulerConfigComposerTest.java @@ -0,0 +1,68 @@ +package org.opensearch.sql.spark.config; + +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +import org.opensearch.sql.spark.parameter.SparkSubmitParameters; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchAsyncQuerySchedulerConfigComposerTest { + + @Mock private Settings settings; + @Mock private SparkSubmitParameters sparkSubmitParameters; + @Mock private DispatchQueryRequest dispatchQueryRequest; + @Mock private AsyncQueryRequestContext context; + + private OpenSearchAsyncQuerySchedulerConfigComposer composer; + + @BeforeEach + public void setUp() { + composer = new OpenSearchAsyncQuerySchedulerConfigComposer(settings); + } + + @Test + public void testCompose() { + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) + .thenReturn("true"); + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)) + .thenReturn("10 minutes"); + + composer.compose(sparkSubmitParameters, dispatchQueryRequest, context); + + verify(sparkSubmitParameters) + .setConfigItem("spark.flint.job.externalScheduler.enabled", "true"); + verify(sparkSubmitParameters) + .setConfigItem("spark.flint.job.externalScheduler.interval", "10 minutes"); + } + + @Test + public void testComposeWithDisabledScheduler() { + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) + .thenReturn("false"); + + composer.compose(sparkSubmitParameters, dispatchQueryRequest, context); + + verify(sparkSubmitParameters) + .setConfigItem("spark.flint.job.externalScheduler.enabled", "false"); + } + + @Test + public void testComposeWithMissingInterval() { + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)) + .thenReturn("true"); + when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)) + .thenReturn(""); + + composer.compose(sparkSubmitParameters, dispatchQueryRequest, context); + + verify(sparkSubmitParameters).setConfigItem("spark.flint.job.externalScheduler.interval", ""); + } +} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java index b119c345b9..f211548c7c 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.scheduler.parser; +import static org.junit.Assert.assertNotNull; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -25,6 +26,13 @@ public void setup() { startTime = Instant.now(); } + @Test + public void testConstructor() { + // Test that the constructor of IntervalScheduleParser can be invoked + IntervalScheduleParser parser = new IntervalScheduleParser(); + assertNotNull(parser); + } + @Test public void testParseValidScheduleString() { verifyParseSchedule(5, "5 minutes"); diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index b6643f3209..0037032d22 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -51,6 +51,10 @@ public enum Key { /** Async query Settings * */ ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"), + ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED( + "plugins.query.executionengine.async_query.external_scheduler.enabled"), + ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL( + "plugins.query.executionengine.async_query.external_scheduler.interval"), STREAMING_JOB_HOUSEKEEPER_INTERVAL( "plugins.query.executionengine.spark.streamingjobs.housekeeper.interval"); diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 236406e2c7..71718d1726 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -639,6 +639,75 @@ Request:: } } +plugins.query.executionengine.async_query.external_scheduler.enabled +===================================================================== + +Description +----------- +This setting controls whether the external scheduler is enabled for async queries. + +* Default Value: true +* Scope: Node-level +* Dynamic Update: Yes, this setting can be updated dynamically. + +To disable the external scheduler, use the following command: + +Request :: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.async_query.external_scheduler.enabled":"false"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "async_query": { + "external_scheduler": { + "enabled": "false" + } + } + } + } + } + } + } + +plugins.query.executionengine.async_query.external_scheduler.interval +===================================================================== + +Description +----------- +This setting defines the interval at which the external scheduler applies for auto refresh queries. It optimizes Spark applications by allowing them to automatically decide whether to use the Spark scheduler or the external scheduler. + +* Default Value: None (must be explicitly set) +* Format: A string representing a time duration follows Spark `CalendarInterval `__ format (e.g., ``10 minutes`` for 10 minutes, ``1 hour`` for 1 hour). + +To modify the interval to 10 minutes for example, use this command: + +Request :: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \ + ... -d '{"transient":{"plugins.query.executionengine.async_query.external_scheduler.interval":"10 minutes"}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "async_query": { + "external_scheduler": { + "interval": "10 minutes" + } + } + } + } + } + } + } + plugins.query.executionengine.spark.streamingjobs.housekeeper.interval ====================================================================== diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 494b906b55..1083dbd836 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -154,6 +154,19 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING = + Setting.boolSetting( + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED.getKeyValue(), + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + + public static final Setting ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING = + Setting.simpleString( + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL.getKeyValue(), + Setting.Property.NodeScope, + Setting.Property.Dynamic); + public static final Setting SPARK_EXECUTION_ENGINE_CONFIG = Setting.simpleString( Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(), @@ -298,6 +311,18 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.ASYNC_QUERY_ENABLED, ASYNC_QUERY_ENABLED_SETTING, new Updater(Key.ASYNC_QUERY_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED, + ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING, + new Updater(Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL, + ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING, + new Updater(Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL)); register( settingBuilder, clusterSettings, @@ -419,6 +444,8 @@ public static List> pluginSettings() { .add(DATASOURCE_URI_HOSTS_DENY_LIST) .add(DATASOURCE_ENABLED_SETTING) .add(ASYNC_QUERY_ENABLED_SETTING) + .add(ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING) + .add(ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING) .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) .add(SPARK_EXECUTION_REFRESH_JOB_LIMIT_SETTING) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java index 84fb705ae0..026f0c6218 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/setting/OpenSearchSettingsTest.java @@ -15,6 +15,8 @@ import static org.mockito.Mockito.when; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; import static org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings.legacySettings; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.METRICS_ROLLING_INTERVAL_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.METRICS_ROLLING_WINDOW_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.PPL_ENABLED_SETTING; @@ -195,4 +197,22 @@ void getSparkExecutionEngineConfigSetting() { .put(SPARK_EXECUTION_ENGINE_CONFIG.getKey(), sparkConfig) .build())); } + + @Test + void getAsyncQueryExternalSchedulerEnabledSetting() { + // Default is true + assertEquals( + true, + ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING.get( + org.opensearch.common.settings.Settings.builder().build())); + } + + @Test + void getAsyncQueryExternalSchedulerIntervalSetting() { + // Default is empty string + assertEquals( + "", + ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING.get( + org.opensearch.common.settings.Settings.builder().build())); + } }