Skip to content

Commit

Permalink
Create Job API
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vamsimanohar committed Sep 14, 2023
1 parent a7af359 commit b55d3c5
Show file tree
Hide file tree
Showing 33 changed files with 932 additions and 22 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
 * Returns the raw {@link DataSourceMetadata} object for the data source with the given name. The
 * returned object contains all the stored metadata information.
 *
 * @param name name of the {@link DataSource}.
 * @return the {@link DataSourceMetadata} of the named data source.
 */
DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void execute(
/** Data class that encapsulates ExprValue. */
@Data
class QueryResponse {
private String status = "COMPLETED";
private final Schema schema;
private final List<ExprValue> results;
private final Cursor cursor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,17 @@ public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
return dataSourceMetadataOptional.get();
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String datasourceName) {
  // Resolve the metadata by name; absence is a caller error, reported as IllegalArgumentException.
  return getDataSourceMetadataFromName(datasourceName)
      .orElseThrow(
          () ->
              new IllegalArgumentException(
                  "DataSource with name: " + datasourceName + " doesn't exist."));
}

@Override
public DataSource getDataSource(String dataSourceName) {
Optional<DataSourceMetadata> dataSourceMetadataOptional =
Expand Down
1 change: 1 addition & 0 deletions integ-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -193,6 +199,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -257,6 +269,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.build();
}

Expand Down
49 changes: 48 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@
package org.opensearch.sql.plugin;

import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -83,6 +89,15 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.client.EmrServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.jobs.JobManagementService;
import org.opensearch.sql.spark.jobs.JobManagementServiceImpl;
import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
import org.opensearch.sql.spark.jobs.OpensearchJobMetadataStorageService;
import org.opensearch.sql.spark.response.SparkResponse;
import org.opensearch.sql.spark.rest.RestJobManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
Expand Down Expand Up @@ -110,6 +125,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {

private NodeClient client;
private DataSourceServiceImpl dataSourceService;
private JobManagementService jobManagementService;
private Injector injector;

public String name() {
Expand Down Expand Up @@ -202,6 +218,7 @@ public Collection<Object> createComponents(
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
this.jobManagementService = createJobManagementService();

ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule());
Expand All @@ -213,7 +230,7 @@ public Collection<Object> createComponents(
});

injector = modules.createInjector();
return ImmutableList.of(dataSourceService);
return ImmutableList.of(dataSourceService, jobManagementService);
}

@Override
Expand Down Expand Up @@ -270,4 +287,34 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
}

/**
 * Wires up the job management pipeline: an OpenSearch-backed job metadata store, an EMR
 * Serverless client, and the Spark query dispatcher, combined into a {@link
 * JobManagementServiceImpl}.
 *
 * @return the job management service registered as a plugin component.
 */
private JobManagementService createJobManagementService() {
  JobMetadataStorageService jobMetadataStorageService =
      new OpensearchJobMetadataStorageService(client, clusterService);
  EmrServerlessClient emrServerlessClient = createEMRServerlessClient();
  SparkQueryDispatcher sparkQueryDispatcher =
      new SparkQueryDispatcher(emrServerlessClient, this.dataSourceService);
  return new JobManagementServiceImpl(
      jobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}

/**
 * Builds an {@link EmrServerlessClient} from the {@code
 * plugins.query.executionengine.spark.config} cluster setting.
 *
 * <p>Client construction is wrapped in {@link AccessController#doPrivileged} — presumably because
 * the AWS SDK needs permissions (credential/file access) granted to the plugin codebase rather
 * than the caller's; see plugin-security.policy.
 *
 * @return an EMR Serverless client configured with the region from the parsed config.
 */
private EmrServerlessClient createEMRServerlessClient() {
  String sparkExecutionEngineConfigString =
      this.pluginSettings.getSettingValue(
          org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG);
  return AccessController.doPrivileged(
      (PrivilegedAction<EmrServerlessClient>)
          () -> {
            // Parse the raw setting string into a typed config (provides the AWS region).
            SparkExecutionEngineConfig sparkExecutionEngineConfig =
                SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
                    sparkExecutionEngineConfigString);
            AWSEMRServerless awsemrServerless =
                AWSEMRServerlessClientBuilder.standard()
                    .withRegion(sparkExecutionEngineConfig.getRegion())
                    .withCredentials(new DefaultAWSCredentialsProviderChain())
                    .build();
            // NOTE(review): SparkResponse is constructed with a null second argument here —
            // confirm downstream code tolerates that null (TODO verify).
            return new EmrServerlessClientImpl(
                awsemrServerless, new SparkResponse(client, null, STEP_ID_FIELD));
          });
}
}
9 changes: 9 additions & 0 deletions plugin/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,13 @@ grant {

// ml-commons client
permission java.lang.RuntimePermission "setContextClassLoader";

// aws credentials
permission java.io.FilePermission "${user.home}${/}.aws${/}*", "read";

// Permissions for the AWS EMR Serverless SDK
permission javax.management.MBeanServerPermission "createMBeanServer";
permission javax.management.MBeanServerPermission "findMBeanServer";
permission javax.management.MBeanPermission "com.amazonaws.metrics.*", "*";
permission javax.management.MBeanTrustPermission "register";
};
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Map;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.executor.ExecutionEngine;
Expand All @@ -25,6 +26,8 @@
@RequiredArgsConstructor
public class QueryResult implements Iterable<Object[]> {

@Setter @Getter private String status;

@Getter private final ExecutionEngine.Schema schema;

/** Results which are collection of expression. */
Expand Down
5 changes: 4 additions & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ repositories {

dependencies {
api project(':core')
implementation project(':protocol')
implementation project(':datasources')

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.opensearch.sql.spark.client;

import org.json.JSONObject;
import org.opensearch.sql.spark.helper.FlintHelper;

/** Client interface for submitting jobs to and fetching results from Amazon EMR Serverless. */
public interface EmrServerlessClient {

  /**
   * Starts a job run on the given EMR Serverless application.
   *
   * @param applicationId id of the EMR Serverless application to run on.
   * @param query the query to execute.
   * @param datasourceRoleArn IAM role ARN associated with the data source.
   * @param executionRoleArn IAM role ARN the job executes under.
   * @param datasourceName name of the data source the query targets.
   * @param flintHelper helper carrying Flint-related configuration.
   * @return id of the started job run.
   */
  String startJobRun(
      String applicationId,
      String query,
      String datasourceRoleArn,
      String executionRoleArn,
      String datasourceName,
      FlintHelper flintHelper);

  /**
   * Fetches the result of a previously started job run.
   *
   * @param applicationId id of the EMR Serverless application.
   * @param jobId id of the job run to look up.
   * @return the job result as a {@link JSONObject}.
   */
  JSONObject getJobResult(String applicationId, String jobId);
}
Loading

0 comments on commit b55d3c5

Please sign in to comment.