Skip to content

Commit

Permalink
Refactor query param
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Mar 11, 2024
1 parent 1a09f96 commit ca3d66e
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;

/** Define Spark Submit Parameters. */
/** Defines the parameters required for Spark submit command construction. */
@AllArgsConstructor
@RequiredArgsConstructor
public class SparkSubmitParameters {
public static final String SPACE = " ";
public static final String EQUALS = "=";
public static final String FLINT_BASIC_AUTH = "basic";
private static final String SPACE = " ";
private static final String EQUALS = "=";
private static final String FLINT_BASIC_AUTH = "basic";

private final String className;
private final Map<String, String> config;
Expand All @@ -40,34 +40,12 @@ public class SparkSubmitParameters {
private String extraParameters;

public static class Builder {

private String className;
private final Map<String, String> config;
private final Map<String, String> config = new LinkedHashMap<>();
private String extraParameters;

private Builder() {
className = DEFAULT_CLASS_NAME;
config = new LinkedHashMap<>();

config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE);
config.put(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
config.put(
SPARK_JAR_PACKAGES_KEY,
SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE);
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
initializeDefaultConfigurations();
}

public static Builder builder() {
Expand All @@ -86,35 +64,101 @@ public Builder clusterName(String clusterName) {
}

public Builder dataSource(DataSourceMetadata metadata) {
if (DataSourceType.S3GLUE.equals(metadata.getConnector())) {
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);

config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
config.put(FLINT_DATA_SOURCE_KEY, metadata.getName());

setFlintIndexStoreHost(
parseUri(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName()));
setFlintIndexStoreAuthProperties(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION));
config.put("spark.flint.datasource.name", metadata.getName());
return this;
if (!DataSourceType.S3GLUE.equals(metadata.getConnector())) {
throw new UnsupportedOperationException(
String.format(
"Unsupported datasource type for async queries: %s", metadata.getConnector()));
}
throw new UnsupportedOperationException(
String.format(
"UnSupported datasource type for async queries:: %s", metadata.getConnector()));

configureDataSource(metadata);
return this;
}

/**
 * Stores an additional, caller-supplied parameter string on this builder.
 *
 * @param params extra parameter text; may be {@code null}
 * @return this builder, for chaining
 */
public Builder extraParameters(String params) {
  extraParameters = params;
  return this;
}

private void setFlintIndexStoreHost(URI uri) {
/**
 * Records the query text under the {@code FLINT_JOB_QUERY} configuration key.
 *
 * @param query query text stored as the configuration value
 * @return this builder, for chaining
 */
public Builder query(String query) {
  // NOTE(review): the value is stored verbatim; confirm that whatever consumes the rendered
  // "--conf key=value" string tolerates spaces and quotes inside the query text.
  this.config.put(FLINT_JOB_QUERY, query);
  return this;
}

/**
 * Adds the settings for session execution: the request index derived from the datasource name
 * through {@code DATASOURCE_TO_REQUEST_INDEX}, followed by the session id.
 *
 * @param sessionId value stored under {@code FLINT_JOB_SESSION_ID}
 * @param datasourceName datasource name used to derive the request index
 * @return this builder, for chaining
 */
public Builder sessionExecution(String sessionId, String datasourceName) {
  final String requestIndex = DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName);
  // Insertion order is kept (request index first, then session id).
  config.put(FLINT_JOB_REQUEST_INDEX, requestIndex);
  config.put(FLINT_JOB_SESSION_ID, sessionId);
  return this;
}

/**
 * Sets {@code spark.flint.job.type} to {@code streaming} when the flag is {@code Boolean.TRUE}.
 * A {@code null} or {@code false} flag leaves the configuration untouched.
 *
 * @param isStructuredStreaming whether the job type should be set to streaming
 * @return this builder, for chaining
 */
public Builder structuredStreaming(Boolean isStructuredStreaming) {
  if (!Boolean.TRUE.equals(isStructuredStreaming)) {
    return this;
  }
  config.put("spark.flint.job.type", "streaming");
  return this;
}

/**
 * Creates a {@code SparkSubmitParameters} from the class name, configuration and extra
 * parameters collected so far.
 *
 * @return a new instance holding a snapshot of this builder's configuration
 */
public SparkSubmitParameters build() {
  // Copy the map so later calls on this builder cannot alter an instance that was already
  // built; LinkedHashMap's copy constructor keeps the iteration order of the source map.
  return new SparkSubmitParameters(className, new LinkedHashMap<>(config), extraParameters);
}

/**
 * Writes the datasource-specific entries into {@code config}: the role ARN read from the
 * {@code GLUE_ROLE_ARN} property, the catalog and datasource name entries, the host, port and
 * scheme parsed from the index store URI property, and the index store auth settings.
 *
 * @param metadata datasource metadata supplying the name and the properties read here
 * @throws IllegalArgumentException if the index store URI property cannot be parsed
 */
private void configureDataSource(DataSourceMetadata metadata) {
  final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
  final String datasourceName = metadata.getName();

  // The one role ARN value is written under three separate keys.
  config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
  config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
  config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
  config.put("spark.sql.catalog." + datasourceName, FLINT_DELEGATE_CATALOG);
  config.put(FLINT_DATA_SOURCE_KEY, datasourceName);

  final URI indexStoreUri =
      parseUri(metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), datasourceName);
  // NOTE(review): java.net.URI reports an undefined host as null and an undefined port as -1,
  // and both would be stored as-is here; confirm the configured URI always carries both.
  config.put(FLINT_INDEX_STORE_HOST_KEY, indexStoreUri.getHost());
  config.put(FLINT_INDEX_STORE_PORT_KEY, Integer.toString(indexStoreUri.getPort()));
  config.put(FLINT_INDEX_STORE_SCHEME_KEY, indexStoreUri.getScheme());

  // Username, password and region are passed as suppliers, so they are looked up only if the
  // callee invokes them.
  final String authType = metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH);
  setFlintIndexStoreAuthProperties(
      authType,
      () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME),
      () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD),
      () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION));
  config.put("spark.flint.datasource.name", datasourceName);
}

/**
 * Sets the default class name and seeds {@code config} with the default entries. The put order
 * is significant: {@code config} is insertion-ordered and is rendered in iteration order.
 */
private void initializeDefaultConfigurations() {
  this.className = DEFAULT_CLASS_NAME;

  final String jarPackages =
      String.join(",", SPARK_STANDALONE_PACKAGE, SPARK_LAUNCHER_PACKAGE, PPL_STANDALONE_PACKAGE);
  final String sqlExtensions = String.join(",", FLINT_SQL_EXTENSION, FLINT_PPL_EXTENSION);

  // Credentials providers.
  config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE);
  config.put(
      HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
      DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);

  // Jar packages and the repository they are resolved from.
  config.put(SPARK_JAR_PACKAGES_KEY, jarPackages);
  config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);

  // Driver and executor environment.
  config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
  config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
  config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
  config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);

  // Index store defaults.
  config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
  config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
  config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
  config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
  config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);

  // SQL extensions and metastore factory class.
  config.put(SPARK_SQL_EXTENSIONS_KEY, sqlExtensions);
  config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
}

/**
 * Parses the index store URI configured for a datasource.
 *
 * @param opensearchUri URI text to parse
 * @param datasourceName datasource name, used only in error messages
 * @return the parsed URI
 * @throws IllegalArgumentException if the text is {@code null} or is not a valid URI; for a
 *     syntax error the {@code URISyntaxException} is attached as the cause
 */
private URI parseUri(String opensearchUri, String datasourceName) {
  // new URI(null) would surface as a bare NullPointerException with no datasource context.
  if (opensearchUri == null) {
    throw new IllegalArgumentException(
        String.format(
            "Missing URI in indexstore configuration for datasource: %s.", datasourceName));
  }
  try {
    return new URI(opensearchUri);
  } catch (URISyntaxException e) {
    throw new IllegalArgumentException(
        String.format(
            "Bad URI in indexstore configuration for datasource: %s.", datasourceName),
        e);
  }
}

private void setFlintIndexStoreAuthProperties(
Expand All @@ -133,57 +177,20 @@ private void setFlintIndexStoreAuthProperties(
config.put(FLINT_INDEX_STORE_AUTH_KEY, authType);
}
}

/**
 * Parses the index store URI configured for a datasource.
 *
 * @param opensearchUri URI text to parse
 * @param datasourceName datasource name, used only in the error message
 * @return the parsed URI
 * @throws IllegalArgumentException if the text is not a valid URI; the
 *     {@code URISyntaxException} is attached as the cause
 */
private URI parseUri(String opensearchUri, String datasourceName) {
  try {
    return new URI(opensearchUri);
  } catch (URISyntaxException e) {
    // Attach the parser's exception as the cause so its reason and error index are not lost.
    throw new IllegalArgumentException(
        String.format(
            "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName),
        e);
  }
}

/**
 * Sets {@code spark.flint.job.type} to {@code streaming} when the flag is {@code Boolean.TRUE}.
 * A {@code null} or {@code false} flag leaves the configuration untouched.
 *
 * @param isStructuredStreaming whether the job type should be set to streaming; may be null
 * @return this builder, for chaining
 */
public Builder structuredStreaming(Boolean isStructuredStreaming) {
  // Boolean.TRUE.equals avoids the NullPointerException that unboxing a null Boolean throws.
  if (Boolean.TRUE.equals(isStructuredStreaming)) {
    config.put("spark.flint.job.type", "streaming");
  }
  return this;
}

/**
 * Stores an additional, caller-supplied parameter string on this builder.
 *
 * @param params extra parameter text; may be {@code null}
 * @return this builder, for chaining
 */
public Builder extraParameters(String params) {
  this.extraParameters = params;
  return this;
}

/**
 * Adds the settings for session execution: the request index derived from the datasource name
 * through {@code DATASOURCE_TO_REQUEST_INDEX}, followed by the session id.
 *
 * @param sessionId value stored under {@code FLINT_JOB_SESSION_ID}
 * @param datasourceName datasource name used to derive the request index
 * @return this builder, for chaining
 */
public Builder sessionExecution(String sessionId, String datasourceName) {
  final String requestIndex = DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName);
  // Insertion order is kept (request index first, then session id).
  config.put(FLINT_JOB_REQUEST_INDEX, requestIndex);
  config.put(FLINT_JOB_SESSION_ID, sessionId);
  return this;
}

/**
 * Creates a {@code SparkSubmitParameters} from the class name, configuration and extra
 * parameters collected so far.
 *
 * @return a new instance holding a snapshot of this builder's configuration
 */
public SparkSubmitParameters build() {
  // Copy the map so later calls on this builder cannot alter an instance that was already
  // built; LinkedHashMap's copy constructor keeps the iteration order of the source map.
  return new SparkSubmitParameters(className, new LinkedHashMap<>(config), extraParameters);
}
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(" --class ");
stringBuilder.append(this.className);
stringBuilder.append(SPACE);
for (String key : config.keySet()) {
stringBuilder.append(" --conf ");
stringBuilder.append(key);
stringBuilder.append(EQUALS);
stringBuilder.append(config.get(key));
stringBuilder.append(SPACE);
}

if (extraParameters != null) {
stringBuilder.append(extraParameters);
}
StringBuilder stringBuilder = new StringBuilder(" --class ").append(className).append(SPACE);
config.forEach(
(key, value) ->
stringBuilder
.append(" --conf ")
.append(key)
.append(EQUALS)
.append(value)
.append(SPACE));
if (extraParameters != null) stringBuilder.append(extraParameters);
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public String startJobRun(StartJobRequest startJobRequest) {
.withSparkSubmit(
new SparkSubmit()
.withEntryPoint(SPARK_SQL_APPLICATION_JAR)
.withEntryPointArguments(startJobRequest.getQuery(), resultIndex)
.withEntryPointArguments(resultIndex)
.withSparkSubmitParameters(startJobRequest.getSparkSubmitParams())));

StartJobRunResult startJobRunResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public class StartJobRequest {

public static final Long DEFAULT_JOB_TIMEOUT = 120L;

private final String query;
private final String jobName;
private final String applicationId;
private final String executionRoleArn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ public class SparkConstants {
public static final String EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER =
"com.amazonaws.emr.AssumeRoleAWSCredentialsProvider";
public static final String JAVA_HOME_LOCATION = "/usr/lib/jvm/java-17-amazon-corretto.x86_64/";

public static final String FLINT_JOB_QUERY = "spark.flint.job.query";
public static final String FLINT_JOB_REQUEST_INDEX = "spark.flint.job.requestIndex";
public static final String FLINT_JOB_SESSION_ID = "spark.flint.job.sessionId";

public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL";
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ public DispatchQueryResponse submit(
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
clusterName + ":" + JobType.BATCH.getText(),
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.clusterName(clusterName)
.dataSource(context.getDataSourceMetadata())
.query(dispatchQueryRequest.getQuery())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ public DispatchQueryResponse submit(
+ indexQueryDetails.openSearchIndexName();
StartJobRequest startJobRequest =
new StartJobRequest(
dispatchQueryRequest.getQuery(),
jobName,
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.query(dispatchQueryRequest.getQuery())
.structuredStreaming(true)
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class CreateSessionRequest {

public StartJobRequest getStartJobRequest(String sessionId) {
return new InteractiveSessionStartJobRequest(
"select 1",
clusterName + ":" + JobType.INTERACTIVE.getText() + ":" + sessionId,
applicationId,
executionRoleArn,
Expand All @@ -34,22 +33,13 @@ public StartJobRequest getStartJobRequest(String sessionId) {

static class InteractiveSessionStartJobRequest extends StartJobRequest {
public InteractiveSessionStartJobRequest(
String query,
String jobName,
String applicationId,
String executionRoleArn,
String sparkSubmitParams,
Map<String, String> tags,
String resultIndex) {
super(
query,
jobName,
applicationId,
executionRoleArn,
sparkSubmitParams,
tags,
false,
resultIndex);
super(jobName, applicationId, executionRoleArn, sparkSubmitParams, tags, false, resultIndex);
}

/** Interactive query keep running. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,11 @@ public void testBuildWithExtraParameters() {
// Assert the conf is included with a space
assertTrue(params.endsWith(" --conf A=1"));
}

/** Checks that the rendered parameter string contains the query text exactly as supplied. */
@Test
public void testBuildQueryString() {
  String rawQuery = "SHOW tables LIKE \"%\";";
  SparkSubmitParameters parameters =
      SparkSubmitParameters.Builder.builder().query(rawQuery).build();
  String rendered = parameters.toString();
  assertTrue(rendered.contains(rawQuery));
}
}
Loading

0 comments on commit ca3d66e

Please sign in to comment.