Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark Execution Engine Config Refactor #2249

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
Expand Down Expand Up @@ -216,15 +218,21 @@ public Collection<Object> createComponents(
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
if (StringUtils.isEmpty(this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG))) {
SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier =
new SparkExecutionEngineConfigSupplierImpl(pluginSettings);
SparkExecutionEngineConfig sparkExecutionEngineConfig =
sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig();
if (StringUtils.isEmpty(sparkExecutionEngineConfig.getRegion())) {
LOGGER.warn(
String.format(
"Async Query APIs are disabled as %s is not configured in cluster settings. "
"Async Query APIs are disabled as %s is not configured properly in cluster settings. "
+ "Please configure and restart the domain to enable Async Query APIs",
SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue()));
this.asyncQueryExecutorService = new AsyncQueryExecutorServiceImpl();
} else {
this.asyncQueryExecutorService = createAsyncQueryExecutorService();
this.asyncQueryExecutorService =
createAsyncQueryExecutorService(
sparkExecutionEngineConfigSupplier, sparkExecutionEngineConfig);
}

ModulesBuilder modules = new ModulesBuilder();
Expand Down Expand Up @@ -295,10 +303,13 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceUserAuthorizationHelper);
}

private AsyncQueryExecutorService createAsyncQueryExecutorService() {
private AsyncQueryExecutorService createAsyncQueryExecutorService(
SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier,
SparkExecutionEngineConfig sparkExecutionEngineConfig) {
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService =
new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService);
EMRServerlessClient emrServerlessClient = createEMRServerlessClient();
EMRServerlessClient emrServerlessClient =
createEMRServerlessClient(sparkExecutionEngineConfig.getRegion());
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
Expand All @@ -308,21 +319,18 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() {
jobExecutionResponseReader,
new FlintIndexMetadataReaderImpl(client));
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
asyncQueryJobMetadataStorageService,
sparkQueryDispatcher,
sparkExecutionEngineConfigSupplier);
}

private EMRServerlessClient createEMRServerlessClient() {
String sparkExecutionEngineConfigString =
this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
private EMRServerlessClient createEMRServerlessClient(String region) {
return AccessController.doPrivileged(
(PrivilegedAction<EMRServerlessClient>)
() -> {
SparkExecutionEngineConfig sparkExecutionEngineConfig =
SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
sparkExecutionEngineConfigString);
AWSEMRServerless awsemrServerless =
AWSEMRServerlessClientBuilder.standard()
.withRegion(sparkExecutionEngineConfig.getRegion())
.withRegion(region)
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(awsemrServerless);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,22 @@

package org.opensearch.sql.spark.asyncquery;

import static org.opensearch.sql.common.setting.Settings.Key.CLUSTER_NAME;
import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;

import com.amazonaws.services.emrserverless.model.JobRunState;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.AllArgsConstructor;
import org.json.JSONObject;
import org.opensearch.cluster.ClusterName;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
Expand All @@ -37,7 +33,7 @@
public class AsyncQueryExecutorServiceImpl implements AsyncQueryExecutorService {
private AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService;
private SparkQueryDispatcher sparkQueryDispatcher;
private Settings settings;
private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier;
private Boolean isSparkJobExecutionEnabled;

public AsyncQueryExecutorServiceImpl() {
Expand All @@ -47,26 +43,19 @@ public AsyncQueryExecutorServiceImpl() {
public AsyncQueryExecutorServiceImpl(
AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService,
SparkQueryDispatcher sparkQueryDispatcher,
Settings settings) {
SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier) {
this.isSparkJobExecutionEnabled = Boolean.TRUE;
this.asyncQueryJobMetadataStorageService = asyncQueryJobMetadataStorageService;
this.sparkQueryDispatcher = sparkQueryDispatcher;
this.settings = settings;
this.sparkExecutionEngineConfigSupplier = sparkExecutionEngineConfigSupplier;
}

@Override
public CreateAsyncQueryResponse createAsyncQuery(
CreateAsyncQueryRequest createAsyncQueryRequest) {
validateSparkExecutionEngineSettings();
String sparkExecutionEngineConfigString =
settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
SparkExecutionEngineConfig sparkExecutionEngineConfig =
AccessController.doPrivileged(
(PrivilegedAction<SparkExecutionEngineConfig>)
() ->
SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
sparkExecutionEngineConfigString));
ClusterName clusterName = settings.getSettingValue(CLUSTER_NAME);
sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig();
DispatchQueryResponse dispatchQueryResponse =
sparkQueryDispatcher.dispatch(
new DispatchQueryRequest(
Expand All @@ -75,7 +64,7 @@ public CreateAsyncQueryResponse createAsyncQuery(
createAsyncQueryRequest.getDatasource(),
createAsyncQueryRequest.getLang(),
sparkExecutionEngineConfig.getExecutionRoleARN(),
clusterName.value(),
sparkExecutionEngineConfig.getClusterName(),
sparkExecutionEngineConfig.getSparkSubmitParameters()));
asyncQueryJobMetadataStorageService.storeJobMetadata(
new AsyncQueryJobMetadata(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.gson.Gson;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* POJO for spark Execution Engine Config. Interface between {@link
* org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService} and {@link
* SparkExecutionEngineConfigSupplier}
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@NoArgsConstructor
@AllArgsConstructor
public class SparkExecutionEngineConfig {
private String applicationId;
private String region;
private String executionRoleARN;

/** Additional Spark submit parameters to append to request. */
private String sparkSubmitParameters;

public static SparkExecutionEngineConfig toSparkExecutionEngineConfig(String jsonString) {
return new Gson().fromJson(jsonString, SparkExecutionEngineConfig.class);
}
private String clusterName;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.config;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.google.gson.Gson;
import lombok.Data;

/**
* This POJO is just for reading stringified json in `plugins.query.executionengine.spark.config`
* setting.
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class SparkExecutionEngineConfigClusterSetting {
private String applicationId;
private String region;
private String executionRoleARN;

/** Additional Spark submit parameters to append to request. */
private String sparkSubmitParameters;

public static SparkExecutionEngineConfigClusterSetting toSparkExecutionEngineConfig(
String jsonString) {
return new Gson().fromJson(jsonString, SparkExecutionEngineConfigClusterSetting.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.opensearch.sql.spark.config;

/** Interface for extracting and providing SparkExecutionEngineConfig */
public interface SparkExecutionEngineConfigSupplier {

/**
* Get SparkExecutionEngineConfig
*
* @return {@link SparkExecutionEngineConfig}.
*/
SparkExecutionEngineConfig getSparkExecutionEngineConfig();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.opensearch.sql.spark.config;

import static org.opensearch.sql.common.setting.Settings.Key.CLUSTER_NAME;
import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG;

import java.security.AccessController;
import java.security.PrivilegedAction;
import lombok.AllArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.cluster.ClusterName;
import org.opensearch.sql.common.setting.Settings;

@AllArgsConstructor
public class SparkExecutionEngineConfigSupplierImpl implements SparkExecutionEngineConfigSupplier {

private Settings settings;

@Override
public SparkExecutionEngineConfig getSparkExecutionEngineConfig() {
String sparkExecutionEngineConfigSettingString =
this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
SparkExecutionEngineConfig sparkExecutionEngineConfig = new SparkExecutionEngineConfig();
if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) {
SparkExecutionEngineConfigClusterSetting sparkExecutionEngineConfigClusterSetting =
penghuo marked this conversation as resolved.
Show resolved Hide resolved
AccessController.doPrivileged(
(PrivilegedAction<SparkExecutionEngineConfigClusterSetting>)
() ->
SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
sparkExecutionEngineConfigSettingString));
sparkExecutionEngineConfig.setApplicationId(
sparkExecutionEngineConfigClusterSetting.getApplicationId());
sparkExecutionEngineConfig.setExecutionRoleARN(
sparkExecutionEngineConfigClusterSetting.getExecutionRoleARN());
sparkExecutionEngineConfig.setSparkSubmitParameters(
sparkExecutionEngineConfigClusterSetting.getSparkSubmitParameters());
sparkExecutionEngineConfig.setRegion(sparkExecutionEngineConfigClusterSetting.getRegion());
}
ClusterName clusterName = settings.getSettingValue(CLUSTER_NAME);
sparkExecutionEngineConfig.setClusterName(clusterName.value());
return sparkExecutionEngineConfig;
}
}
Loading