Skip to content

Commit

Permalink
Add flag for Iceberg and correct flag for Lake Formation.
Browse files Browse the repository at this point in the history
Previously, the Iceberg catalog was set as the default catalog. This poses
problems because the behavior of falling back to the default Spark catalog
is only correct in some versions of Iceberg. Rather than always opting into
Iceberg, Iceberg should be an option.

Additionally, the Lake Formation flag enabled Lake Formation for the EMR
job. This did not work as expected because EMR system space does not
work with Flint. Instead Lake Formation can be enabled using the Iceberg
catalog implementation.

Signed-off-by: Adi Suresh <[email protected]>
  • Loading branch information
asuresh8 committed Jul 30, 2024
1 parent ba82e12 commit 726c24f
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,31 @@ public class SparkConstants {
public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL";

// Base Spark-conf key for the default catalog. The SPARK_CATALOG_* keys below
// configure the catalog's AWS client when the Iceberg implementation is enabled.
public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog";
public static final String SPARK_CATALOG_CATALOG_IMPL = SPARK_CATALOG + ".catalog-impl";
public static final String SPARK_CATALOG_CLIENT_REGION = SPARK_CATALOG + ".client.region";
public static final String SPARK_CATALOG_CLIENT_FACTORY = SPARK_CATALOG + ".client.factory";
public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN =
    SPARK_CATALOG + ".client.assume-role.arn";
public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION =
    SPARK_CATALOG + ".client.assume-role.region";
// Session tag required by Lake Formation to identify the authorized caller.
public static final String SPARK_CATALOG_LF_SESSION_TAG_KEY =
    SPARK_CATALOG + ".client.assume-role.tags.LakeFormationAuthorizedCaller";
public static final String SPARK_CATALOG_GLUE_ACCOUNT_ID = SPARK_CATALOG + ".glue.account-id";
public static final String SPARK_CATALOG_GLUE_LF_ENABLED =
    SPARK_CATALOG + ".glue.lakeformation-enabled";

// Iceberg implementation classes and the EMR-bundled runtime jar.
public static final String ICEBERG_SESSION_CATALOG =
    "org.apache.iceberg.spark.SparkSessionCatalog";
public static final String ICEBERG_SPARK_EXTENSION =
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
public static final String ICEBERG_SPARK_RUNTIME_PACKAGE =
    "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar";
public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog";
public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY =
    "org.apache.iceberg.aws.AssumeRoleAwsClientFactory";
public static final String ICEBERG_LF_CLIENT_FACTORY =
    "org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory";

public static final String EMR_LAKEFORMATION_OPTION =
    "spark.emr-serverless.lakeformation.enabled";
public static final String FLINT_ACCELERATE_USING_COVERING_INDEX =
    "spark.flint.optimizer.covering.enabled";
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,13 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.PPL_STANDALONE_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE;
Expand Down Expand Up @@ -71,7 +64,6 @@ private void setDefaultConfigs() {
setConfigItem(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE);
setConfigItem(
SPARK_JAR_PACKAGES_KEY,
SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE);
Expand All @@ -85,12 +77,8 @@ private void setDefaultConfigs() {
setConfigItem(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
setConfigItem(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
setConfigItem(
SPARK_SQL_EXTENSIONS_KEY,
ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
setConfigItem(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
setConfigItem(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG);
setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG);
}

private void setConfigItem(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,39 +203,6 @@ void testDispatchSelectQuery() {
verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithLakeFormation() {
  // Arrange: stub the EMR serverless client factory and describe the job run we
  // expect the dispatcher to start for a plain SELECT against a Lake Formation
  // enabled Glue data source.
  when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);

  Map<String, String> expectedTags = new HashMap<>();
  expectedTags.put(DATASOURCE_TAG_KEY, MY_GLUE);
  expectedTags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
  expectedTags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());

  String sqlQuery = "select * from my_glue.default.http_logs";
  String submitParams = constructExpectedSparkSubmitParameterString(sqlQuery);
  StartJobRequest expectedJobRequest =
      new StartJobRequest(
          "TEST_CLUSTER:batch",
          null,
          EMRS_APPLICATION_ID,
          EMRS_EXECUTION_ROLE,
          submitParams,
          expectedTags,
          false,
          "query_execution_result_my_glue");
  when(emrServerlessClient.startJobRun(expectedJobRequest)).thenReturn(EMR_JOB_ID);

  DataSourceMetadata lakeFormationMetadata = constructMyGlueDataSourceMetadataWithLakeFormation();
  when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata(
          MY_GLUE, asyncQueryRequestContext))
      .thenReturn(lakeFormationMetadata);

  // Act: dispatch the query.
  DispatchQueryResponse response =
      sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(sqlQuery), asyncQueryRequestContext);

  // Assert: exactly one job run was started with the expected request, the EMR
  // job id is surfaced, and no Flint index metadata lookups happened.
  verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
  Assertions.assertEquals(expectedJobRequest, startJobRequestArgumentCaptor.getValue());
  Assertions.assertEquals(EMR_JOB_ID, response.getJobId());
  verifyNoInteractions(flintIndexMetadataService);
}

@Test
void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
Expand Down Expand Up @@ -1080,7 +1047,6 @@ private String constructExpectedSparkSubmitParameterString(String query, String
+ getConfParam(
"spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider",
"spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory",
"spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar",
"spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT",
"spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots",
"spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/",
Expand All @@ -1092,10 +1058,8 @@ private String constructExpectedSparkSubmitParameterString(String query, String
"spark.datasource.flint.scheme=SCHEMA",
"spark.datasource.flint.auth=basic",
"spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider",
"spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions",
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog",
"spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog")
"spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions",
"spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
+ getConfParam("spark.flint.job.query=" + query)
+ (jobType != null ? getConfParam("spark.flint.job.type=" + jobType) : "")
+ getConfParam(
Expand Down Expand Up @@ -1144,25 +1108,6 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() {
.build();
}

/** Builds metadata for the my_glue S3/Glue data source with Lake Formation enabled. */
private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() {
  // Seed a mutable map from an immutable literal so individual puts are not needed.
  Map<String, String> props =
      new HashMap<>(
          Map.of(
              "glue.auth.type", "iam_role",
              "glue.auth.role_arn",
                  "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole",
              "glue.indexstore.opensearch.uri",
                  "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com",
              "glue.indexstore.opensearch.auth", "awssigv4",
              "glue.indexstore.opensearch.region", "eu-west-1",
              "glue.lakeformation.enabled", "true"));
  return new DataSourceMetadata.Builder()
      .setName(MY_GLUE)
      .setConnector(DataSourceType.S3GLUE)
      .setProperties(props)
      .build();
}

private DataSourceMetadata constructPrometheusDataSourceType() {
return new DataSourceMetadata.Builder()
.setName("my_prometheus")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@

package org.opensearch.sql.spark.parameter;

import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ICEBERG_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG;
import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_LAKEFORMATION_OPTION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_ACCELERATE_USING_COVERING_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DATA_SOURCE_KEY;
Expand All @@ -25,39 +26,104 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_PPL_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_ASSUME_ROLE_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_LF_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_FACTORY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_REGION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_ACCOUNT_ID;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_LF_ENABLED;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_LF_SESSION_TAG_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY;

import com.amazonaws.arn.Arn;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.auth.AuthenticationType;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSetting;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;

@RequiredArgsConstructor
public class S3GlueDataSourceSparkParameterComposer implements DataSourceSparkParameterComposer {
public static final String FLINT_BASIC_AUTH = "basic";
public static final String FALSE = "false";
public static final String TRUE = "true";

private final SparkExecutionEngineConfigClusterSettingLoader settingLoader;

@Override
public void compose(
DataSourceMetadata metadata,
SparkSubmitParameters params,
DispatchQueryRequest dispatchQueryRequest,
AsyncQueryRequestContext context) {
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
final Optional<SparkExecutionEngineConfigClusterSetting> maybeClusterSettings =
settingLoader.load();
if (!maybeClusterSettings.isPresent()) {
throw new RuntimeException("No cluster settings present");
}
final SparkExecutionEngineConfigClusterSetting clusterSetting = maybeClusterSettings.get();
final String region = clusterSetting.getRegion();

final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
final String accountId = Arn.fromString(roleArn).getAccountId();

params.setConfigItem(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
params.setConfigItem(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
params.setConfigItem(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
params.setConfigItem("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
params.setConfigItem(FLINT_DATA_SOURCE_KEY, metadata.getName());

final boolean lakeFormationEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
params.setConfigItem(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled));
params.setConfigItem(
FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled));
final boolean icebergEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_ICEBERG_ENABLED));
if (icebergEnabled) {
params.setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE);
params.setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG);
params.setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG);
params.setConfigItem(
SPARK_SQL_EXTENSIONS_KEY,
ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);

params.setConfigItem(SPARK_CATALOG_CLIENT_REGION, region);
params.setConfigItem(SPARK_CATALOG_GLUE_ACCOUNT_ID, accountId);
params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN, roleArn);
params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION, region);

final boolean lakeFormationEnabled =
BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED));
if (lakeFormationEnabled) {
final String sessionTag = metadata.getProperties().get(GLUE_LAKEFORMATION_SESSION_TAG);
if (StringUtils.isBlank(sessionTag)) {
throw new IllegalArgumentException(GLUE_LAKEFORMATION_SESSION_TAG + " is required");
}

params.setConfigItem(FLINT_ACCELERATE_USING_COVERING_INDEX, FALSE);
params.setConfigItem(SPARK_CATALOG_GLUE_LF_ENABLED, TRUE);
params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_LF_CLIENT_FACTORY);
params.setConfigItem(SPARK_CATALOG_LF_SESSION_TAG_KEY, sessionTag);
} else {
params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_ASSUME_ROLE_CLIENT_FACTORY);
}
}

setFlintIndexStoreHost(
params,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ public FlintIndexStateModelService flintIndexStateModelService(
/**
 * Wires up the spark-submit parameter composers. Diff residue had left both the old
 * no-arg S3Glue composer registration and the new constructor-injected one in place;
 * only the composer that receives the cluster setting loader is registered here, since
 * it needs cluster settings (region) to configure the Iceberg/Glue catalog client.
 */
public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider(
    Settings settings, SparkExecutionEngineConfigClusterSettingLoader clusterSettingLoader) {
  SparkParameterComposerCollection collection = new SparkParameterComposerCollection();
  collection.register(
      DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader));
  collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader));
  return new SparkSubmitParametersBuilderProvider(collection);
}
Expand Down
Loading

0 comments on commit 726c24f

Please sign in to comment.