Skip to content

Commit

Permalink
Add feature flag for async query scheduler
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Sep 4, 2024
1 parent f9fe064 commit 853f7fe
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ public class SparkConstants {
"com.amazonaws.emr.AssumeRoleAWSCredentialsProvider";
public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/";
public static final String FLINT_JOB_QUERY = "spark.flint.job.query";
public static final String FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED =
"spark.flint.job.externalScheduler.enabled";
public static final String FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL =
"spark.flint.job.externalScheduler.interval";
public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex";
public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.parameter.GeneralSparkParameterComposer;
import org.opensearch.sql.spark.parameter.SparkSubmitParameters;

@RequiredArgsConstructor
public class OpenSearchAsyncQuerySchedulerConfigComposer implements GeneralSparkParameterComposer {
private final Settings settings;

@Override
public void compose(
SparkSubmitParameters sparkSubmitParameters,
DispatchQueryRequest dispatchQueryRequest,
AsyncQueryRequestContext context) {
String externalSchedulerEnabled =
settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED);
String externalSchedulerInterval =
settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL);
sparkSubmitParameters.setConfigItem(
FLINT_JOB_EXTERNAL_SCHEDULER_ENABLED, externalSchedulerEnabled);
sparkSubmitParameters.setConfigItem(
FLINT_JOB_EXTERNAL_SCHEDULER_INTERVAL, externalSchedulerInterval);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opensearch.sql.spark.asyncquery.OpenSearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.client.EMRServerlessClientFactoryImpl;
import org.opensearch.sql.spark.config.OpenSearchAsyncQuerySchedulerConfigComposer;
import org.opensearch.sql.spark.config.OpenSearchExtraParameterComposer;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
Expand Down Expand Up @@ -168,6 +169,7 @@ public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider
collection.register(
DataSourceType.SECURITY_LAKE,
new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
collection.register(new OpenSearchAsyncQuerySchedulerConfigComposer(settings));
collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader));
return new SparkSubmitParametersBuilderProvider(collection);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.opensearch.sql.spark.config;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.parameter.SparkSubmitParameters;

@ExtendWith(MockitoExtension.class)
public class OpenSearchAsyncQuerySchedulerConfigComposerTest {

@Mock private Settings settings;
@Mock private SparkSubmitParameters sparkSubmitParameters;
@Mock private DispatchQueryRequest dispatchQueryRequest;
@Mock private AsyncQueryRequestContext context;

private OpenSearchAsyncQuerySchedulerConfigComposer composer;

@BeforeEach
public void setUp() {
composer = new OpenSearchAsyncQuerySchedulerConfigComposer(settings);
}

@Test
public void testCompose() {
when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED))
.thenReturn("true");
when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL))
.thenReturn("10 minutes");

composer.compose(sparkSubmitParameters, dispatchQueryRequest, context);

verify(sparkSubmitParameters)
.setConfigItem("spark.flint.job.externalScheduler.enabled", "true");
verify(sparkSubmitParameters)
.setConfigItem("spark.flint.job.externalScheduler.interval", "10 minutes");
}

@Test
public void testComposeWithDisabledScheduler() {
when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED))
.thenReturn("false");

composer.compose(sparkSubmitParameters, dispatchQueryRequest, context);

verify(sparkSubmitParameters)
.setConfigItem("spark.flint.job.externalScheduler.enabled", "false");
}

@Test
public void testComposeWithMissingInterval() {
when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED))
.thenReturn("true");
when(settings.getSettingValue(Settings.Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL))
.thenReturn("");

composer.compose(sparkSubmitParameters, dispatchQueryRequest, context);

verify(sparkSubmitParameters).setConfigItem("spark.flint.job.externalScheduler.interval", "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public enum Key {

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"),
ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED(
"plugins.query.executionengine.async_query.external_scheduler.enabled"),
ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL(
"plugins.query.executionengine.async_query.external_scheduler.interval"),
STREAMING_JOB_HOUSEKEEPER_INTERVAL(
"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval");

Expand Down
70 changes: 70 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,76 @@ Request::
}
}

plugins.query.executionengine.async_query.external_scheduler.enabled
=====================================================================

Description
-----------
This setting controls whether the external scheduler is enabled for async queries.

* Default Value: true
* Scope: Node-level
* Dynamic Update: Yes, this setting can be updated dynamically.

To disable the external scheduler, use the following command:

Request ::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.async_query.external_scheduler.enabled":"false"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"async_query": {
"external_scheduler": {
"enabled": "false"
}
}
}
}
}
}
}

plugins.query.executionengine.async_query.external_scheduler.interval
=====================================================================

Description
-----------
This setting specifies the interval at which the external scheduler runs to check for pending async queries.

* Default Value: None (must be explicitly set)
* Format: A string representing a time duration follows Spark [CalendarInterval](https://spark.apache.org/docs/latest/api/java/org/apache/spark/unsafe/types/CalendarInterval.html)
format (e.g., "10m" for 10 minutes, "1h" for 1 hour)

To modify the interval to 10 minutes for example, use this command:

Request ::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.async_query.external_scheduler.interval":"10 minutes"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"async_query": {
"external_scheduler": {
"interval": "10 minutes"
}
}
}
}
}
}
}

plugins.query.executionengine.spark.streamingjobs.housekeeper.interval
======================================================================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<Boolean> ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING =
Setting.boolSetting(
Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING =
Setting.simpleString(
Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL.getKeyValue(),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Expand Down Expand Up @@ -298,6 +311,18 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.ASYNC_QUERY_ENABLED,
ASYNC_QUERY_ENABLED_SETTING,
new Updater(Key.ASYNC_QUERY_ENABLED));
register(
settingBuilder,
clusterSettings,
Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED,
ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING,
new Updater(Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED));
register(
settingBuilder,
clusterSettings,
Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL,
ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING,
new Updater(Key.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL));
register(
settingBuilder,
clusterSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import static org.mockito.Mockito.when;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.sql.opensearch.setting.LegacyOpenDistroSettings.legacySettings;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.METRICS_ROLLING_INTERVAL_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.METRICS_ROLLING_WINDOW_SETTING;
import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.PPL_ENABLED_SETTING;
Expand Down Expand Up @@ -195,4 +197,22 @@ void getSparkExecutionEngineConfigSetting() {
.put(SPARK_EXECUTION_ENGINE_CONFIG.getKey(), sparkConfig)
.build()));
}

@Test
void getAsyncQueryExternalSchedulerEnabledSetting() {
// Default is true
assertEquals(
true,
ASYNC_QUERY_EXTERNAL_SCHEDULER_ENABLED_SETTING.get(
org.opensearch.common.settings.Settings.builder().build()));
}

@Test
void getAsyncQueryExternalSchedulerIntervalSetting() {
// Default is empty string
assertEquals(
"",
ASYNC_QUERY_EXTERNAL_SCHEDULER_INTERVAL_SETTING.get(
org.opensearch.common.settings.Settings.builder().build()));
}
}

0 comments on commit 853f7fe

Please sign in to comment.