From 726c24f62595723ec2dc2699f000f088d5927cb1 Mon Sep 17 00:00:00 2001 From: Adi Suresh Date: Wed, 24 Jul 2024 19:55:12 +0000 Subject: [PATCH] Add flag for iceberg and correct flag for Lake Formation. Previously, Iceberg catalog was set as the default catalog. This poses problems as the behavior to fall back to default Spark catalog is only correct in some versions of Iceberg. Rather than always opt into Iceberg, Iceberg should be an option. Additionally, the Lake Formation flag enabled Lake Formation for the EMR job. This did not work as expected because EMR system space does not work with Flint. Instead Lake Formation can be enabled using the Iceberg catalog implementation. Signed-off-by: Adi Suresh --- .../spark/data/constants/SparkConstants.java | 21 +- .../SparkSubmitParametersBuilder.java | 14 +- .../dispatcher/SparkQueryDispatcherTest.java | 59 +---- ...3GlueDataSourceSparkParameterComposer.java | 80 ++++++- .../config/AsyncExecutorServiceModule.java | 3 +- .../AsyncQueryExecutorServiceSpec.java | 63 ++++- ...eDataSourceSparkParameterComposerTest.java | 225 +++++++++++++++++- .../glue/GlueDataSourceFactory.java | 17 ++ .../glue/GlueDataSourceFactoryTest.java | 63 +++++ .../ppl/admin/connectors/s3glue_connector.rst | 4 +- 10 files changed, 451 insertions(+), 98 deletions(-) diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 5b25bc175a..db6d667eee 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -92,18 +92,31 @@ public class SparkConstants { public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL"; public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog"; + public static final String SPARK_CATALOG_CATALOG_IMPL = SPARK_CATALOG + ".catalog-impl"; + public static final String SPARK_CATALOG_CLIENT_REGION = SPARK_CATALOG + ".client.region"; + public static final String SPARK_CATALOG_CLIENT_FACTORY = SPARK_CATALOG + ".client.factory"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN = + SPARK_CATALOG + ".client.assume-role.arn"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION = + SPARK_CATALOG + ".client.assume-role.region"; + public static final String SPARK_CATALOG_LF_SESSION_TAG_KEY = + SPARK_CATALOG + ".client.assume-role.tags.LakeFormationAuthorizedCaller"; + public static final String SPARK_CATALOG_GLUE_ACCOUNT_ID = SPARK_CATALOG + ".glue.account-id"; + public static final String SPARK_CATALOG_GLUE_LF_ENABLED = + SPARK_CATALOG + ".glue.lakeformation-enabled"; + public static final String ICEBERG_SESSION_CATALOG = "org.apache.iceberg.spark.SparkSessionCatalog"; public static final String ICEBERG_SPARK_EXTENSION = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"; public static final String ICEBERG_SPARK_RUNTIME_PACKAGE = "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"; - public static final String SPARK_CATALOG_CATALOG_IMPL = - "spark.sql.catalog.spark_catalog.catalog-impl"; public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog"; + public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY = + "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"; + public static final String ICEBERG_LF_CLIENT_FACTORY = + "org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory"; - public static final String EMR_LAKEFORMATION_OPTION = - "spark.emr-serverless.lakeformation.enabled"; public static final String FLINT_ACCELERATE_USING_COVERING_INDEX = "spark.flint.optimizer.covering.enabled"; } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java index 3fe7d99373..d9d5859f64 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java @@ -27,20 +27,13 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; import static org.opensearch.sql.spark.data.constants.SparkConstants.PPL_STANDALONE_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE; @@ -71,7 +64,6 @@ private void setDefaultConfigs() { setConfigItem( HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE); setConfigItem( SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE); @@ -85,12 +77,8 @@ private void setDefaultConfigs() { setConfigItem(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); setConfigItem(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); - setConfigItem( - SPARK_SQL_EXTENSIONS_KEY, - ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + setConfigItem(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); setConfigItem(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); - setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); - setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); } private void setConfigItem(String key, String value) { diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index a7a79c758e..294a508cf0 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -203,39 +203,6 @@ void testDispatchSelectQuery() { verifyNoInteractions(flintIndexMetadataService); } - @Test - void testDispatchSelectQueryWithLakeFormation() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); - } - @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); @@ -1080,7 +1047,6 @@ private String constructExpectedSparkSubmitParameterString(String query, String + getConfParam( "spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", "spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory", - "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", "spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT", "spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots", "spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/", @@ -1092,10 +1058,8 @@ private String constructExpectedSparkSubmitParameterString(String query, String "spark.datasource.flint.scheme=SCHEMA", "spark.datasource.flint.auth=basic", "spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", - "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", - "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", - "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", - "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog") + "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") + getConfParam("spark.flint.job.query=" + query) + (jobType != null ? getConfParam("spark.flint.job.type=" + jobType) : "") + getConfParam( @@ -1144,25 +1108,6 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() { .build(); } - private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() { - - Map properties = new HashMap<>(); - properties.put("glue.auth.type", "iam_role"); - properties.put( - "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); - properties.put( - "glue.indexstore.opensearch.uri", - "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); - properties.put("glue.indexstore.opensearch.auth", "awssigv4"); - properties.put("glue.indexstore.opensearch.region", "eu-west-1"); - properties.put("glue.lakeformation.enabled", "true"); - return new DataSourceMetadata.Builder() - .setName(MY_GLUE) - .setConnector(DataSourceType.S3GLUE) - .setProperties(properties) - .build(); - } - private DataSourceMetadata constructPrometheusDataSourceType() { return new DataSourceMetadata.Builder() .setName("my_prometheus") diff --git a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java index 26dbf3529a..6eb0ca5c25 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java @@ -5,15 +5,16 @@ package org.opensearch.sql.spark.parameter; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ICEBERG_ENABLED; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_LAKEFORMATION_OPTION; import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_ACCELERATE_USING_COVERING_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DATA_SOURCE_KEY; @@ -25,19 +26,49 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_PPL_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_ASSUME_ROLE_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_LF_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_ACCOUNT_ID; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_LF_ENABLED; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_LF_SESSION_TAG_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; +import com.amazonaws.arn.Arn; import java.net.URI; import java.net.URISyntaxException; +import java.util.Optional; import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSetting; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +@RequiredArgsConstructor public class S3GlueDataSourceSparkParameterComposer implements DataSourceSparkParameterComposer { public static final String FLINT_BASIC_AUTH = "basic"; + public static final String FALSE = "false"; + public static final String TRUE = "true"; + + private final SparkExecutionEngineConfigClusterSettingLoader settingLoader; @Override public void compose( @@ -45,7 +76,16 @@ public void compose( SparkSubmitParameters params, DispatchQueryRequest dispatchQueryRequest, AsyncQueryRequestContext context) { - String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final Optional maybeClusterSettings = + settingLoader.load(); + if (!maybeClusterSettings.isPresent()) { + throw new RuntimeException("No cluster settings present"); + } + final SparkExecutionEngineConfigClusterSetting clusterSetting = maybeClusterSettings.get(); + final String region = clusterSetting.getRegion(); + + final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final String accountId = Arn.fromString(roleArn).getAccountId(); params.setConfigItem(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn); params.setConfigItem(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn); @@ -53,11 +93,37 @@ public void compose( params.setConfigItem("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); params.setConfigItem(FLINT_DATA_SOURCE_KEY, metadata.getName()); - final boolean lakeFormationEnabled = - BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); - params.setConfigItem(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled)); - params.setConfigItem( - FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled)); + final boolean icebergEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_ICEBERG_ENABLED)); + if (icebergEnabled) { + params.setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE); + params.setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); + params.setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); + params.setConfigItem( + SPARK_SQL_EXTENSIONS_KEY, + ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + + params.setConfigItem(SPARK_CATALOG_CLIENT_REGION, region); + params.setConfigItem(SPARK_CATALOG_GLUE_ACCOUNT_ID, accountId); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN, roleArn); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION, region); + + final boolean lakeFormationEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); + if (lakeFormationEnabled) { + final String sessionTag = metadata.getProperties().get(GLUE_LAKEFORMATION_SESSION_TAG); + if (StringUtils.isBlank(sessionTag)) { + throw new IllegalArgumentException(GLUE_LAKEFORMATION_SESSION_TAG + " is required"); + } + + params.setConfigItem(FLINT_ACCELERATE_USING_COVERING_INDEX, FALSE); + params.setConfigItem(SPARK_CATALOG_GLUE_LF_ENABLED, TRUE); + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_LF_CLIENT_FACTORY); + params.setConfigItem(SPARK_CATALOG_LF_SESSION_TAG_KEY, sessionTag); + } else { + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_ASSUME_ROLE_CLIENT_FACTORY); + } + } setFlintIndexStoreHost( params, diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 05f7d1095c..7dfcad3db5 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -159,7 +159,8 @@ public FlintIndexStateModelService flintIndexStateModelService( public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider( Settings settings, SparkExecutionEngineConfigClusterSettingLoader clusterSettingLoader) { SparkParameterComposerCollection collection = new SparkParameterComposerCollection(); - collection.register(DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + collection.register( + DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader)); return new SparkSubmitParametersBuilderProvider(collection); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index d8e3b80175..1f22be7961 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -18,12 +18,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.net.URL; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -42,6 +47,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; @@ -58,6 +64,7 @@ import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.config.OpenSearchSparkSubmitParameterModifier; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.DatasourceEmbeddedQueryIdProvider; import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; @@ -100,6 +107,10 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { public static final String MYS3_DATASOURCE = "mys3"; public static final String MYGLUE_DATASOURCE = "my_glue"; + public static final String ACCOUNT_ID = "accountId"; + public static final String APPLICATION_ID = "appId"; + public static final String REGION = "us-west-2"; + public static final String ROLE_ARN = "roleArn"; protected ClusterService clusterService; protected org.opensearch.sql.common.setting.Settings pluginSettings; @@ -262,7 +273,9 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( SparkParameterComposerCollection sparkParameterComposerCollection = new SparkParameterComposerCollection(); sparkParameterComposerCollection.register( - DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + DataSourceType.S3GLUE, + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader())); SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider = new SparkSubmitParametersBuilderProvider(sparkParameterComposerCollection); QueryHandlerFactory queryHandlerFactory = @@ -372,14 +385,56 @@ public EMRServerlessClient getClient(String accountId) { public SparkExecutionEngineConfig sparkExecutionEngineConfig( AsyncQueryRequestContext asyncQueryRequestContext) { return SparkExecutionEngineConfig.builder() - .applicationId("appId") - .region("us-west-2") - .executionRoleARN("roleArn") + .applicationId(APPLICATION_ID) + .region(REGION) + .executionRoleARN(ROLE_ARN) .sparkSubmitParameterModifier(new OpenSearchSparkSubmitParameterModifier("")) .clusterName("myCluster") .build(); } + public static class TestSettings extends org.opensearch.sql.common.setting.Settings { + final Map values; + + public TestSettings() { + values = new HashMap<>(); + } + + /** Get Setting Value. */ + @Override + public T getSettingValue(Key key) { + return (T) values.get(key); + } + + @Override + public List getSettings() { + return values.keySet().stream().map(Key::getKeyValue).collect(Collectors.toList()); + } + + public void putSettingValue(Key key, T value) { + values.put(key, value); + } + } + + public SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APPLICATION_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final TestSettings settings = new TestSettings(); + settings.putSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG, jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + public void enableSession(boolean enabled) { // doNothing } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java index 55e62d52f0..24e94fd265 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java @@ -9,19 +9,25 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; @ExtendWith(MockitoExtension.class) @@ -33,18 +39,20 @@ class S3GlueDataSourceSparkParameterComposerTest { public static final String PASSWORD = "PASSWORD"; public static final String REGION = "REGION"; public static final String TRUE = "true"; - public static final String ROLE_ARN = "ROLE_ARN"; + public static final String ROLE_ARN = "arn:aws:iam::123456789012:role/ROLE_NAME"; + public static final String APP_ID = "APP_ID"; + public static final String CLUSTER_NAME = "CLUSTER_NAME"; + public static final String ACCOUNT_ID = "123456789012"; + public static final String SESSION_TAG = "SESSION_TAG"; private static final String COMMON_EXPECTED_PARAMS = " --class org.apache.spark.sql.FlintJob " + getConfList( - "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.hive.metastore.glue.role.arn=ROLE_ARN", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", "spark.flint.datasource.name=DATASOURCE_NAME", - "spark.emr-serverless.lakeformation.enabled=true", - "spark.flint.optimizer.covering.enabled=false", "spark.datasource.flint.host=test.host.com", "spark.datasource.flint.port=9200", "spark.datasource.flint.scheme=https"); @@ -57,7 +65,7 @@ public void testBasicAuth() { getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -79,7 +87,7 @@ public void testComposeWithSigV4Auth() { getDataSourceMetadata(AuthenticationType.AWSSIGV4AUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -99,7 +107,7 @@ public void testComposeWithNoAuth() { getDataSourceMetadata(AuthenticationType.NOAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -120,7 +128,8 @@ public void testComposeWithBadUri() { assertThrows( IllegalArgumentException.class, () -> - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -128,6 +137,170 @@ public void testComposeWithBadUri() { new NullAsyncQueryRequestContext())); } + @Test + public void testIcebergEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.AssumeRoleAwsClientFactory", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG, SESSION_TAG) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.flint.optimizer.covering.enabled=false", + "spark.sql.catalog.spark_catalog.glue.lakeformation-enabled=true", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory", + "spark.sql.catalog.spark_catalog.client.assume-role.tags.LakeFormationAuthorizedCaller=SESSION_TAG", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabledNoSessionTag() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()); + assertThrows( + IllegalArgumentException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + + @Test + public void testNoClusterConfigAvailable() { + DataSourceMetadata dataSourceMetadata = + getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)).thenReturn(null); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + new SparkExecutionEngineConfigClusterSettingLoader(settings)); + + assertThrows( + RuntimeException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + private DataSourceMetadata getDataSourceMetadata( AuthenticationType authenticationType, String uri) { return new DataSourceMetadata.Builder() @@ -140,10 +313,20 @@ private DataSourceMetadata getDataSourceMetadata( .build(); } + private DataSourceMetadata getDataSourceMetadata(Map properties) { + return new DataSourceMetadata.Builder() + .setConnector(DataSourceType.S3GLUE) + .setName("DATASOURCE_NAME") + .setDescription("DESCRIPTION") + .setResultIndex("RESULT_INDEX") + .setDataSourceStatus(DataSourceStatus.ACTIVE) + .setProperties(properties) + .build(); + } + private Map getProperties(AuthenticationType authType, String uri) { return ImmutableMap.builder() .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) - .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, uri) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, authType.getName()) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) @@ -152,6 +335,26 @@ private Map getProperties(AuthenticationType authType, String ur .build(); } + private SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APP_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)) + .thenReturn(jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + private static String getConfList(String... params) { return Arrays.stream(params) .map(param -> String.format(" --conf %s ", param)) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java index e0c13ff005..11a33a2969 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -29,7 +31,9 @@ public class GlueDataSourceFactory implements DataSourceFactory { "glue.indexstore.opensearch.auth.password"; public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION = "glue.indexstore.opensearch.region"; + public static final String GLUE_ICEBERG_ENABLED = "glue.iceberg.enabled"; public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled"; + public static final String GLUE_LAKEFORMATION_SESSION_TAG = "glue.lakeformation.session_tag"; @Override public DataSourceType getDataSourceType() { @@ -76,5 +80,18 @@ private void validateGlueDataSourceConfiguration(Map dataSourceM DatasourceValidationUtils.validateHost( dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_URI), pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + + // validate Lake Formation config + if (BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_ENABLED))) { + if (!BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_ICEBERG_ENABLED))) { + throw new IllegalArgumentException( + "Lake Formation can only be enabled when Iceberg is enabled."); + } + + if (StringUtils.isBlank(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_SESSION_TAG))) { + throw new IllegalArgumentException( + "Lake Formation session tag must be specified when enabling Lake Formation"); + } + } } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java index 52f8ec9cd1..2833717265 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java @@ -210,4 +210,67 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { Assertions.assertEquals( "Invalid flint host in properties.", illegalArgumentException.getMessage()); } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoIceberg() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "false"); + properties.put("glue.lakeformation.session_tag", "session_tag"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation can only be enabled when Iceberg is enabled.", + illegalArgumentException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoSessionTags() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "true"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation session tag must be specified when enabling Lake Formation", + illegalArgumentException.getMessage()); + } } diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index 5e91df70e5..81946aa489 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -42,7 +42,9 @@ Glue Connector Properties. * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] -* ``glue.lakeformation.enabled`` determines whether to enable lakeformation for queries. Default value is ``"false"`` if not specified +* ``glue.iceberg.enabled`` determines whether to enable Iceberg for the session. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.enabled`` determines whether to enable Lake Formation for queries when Iceberg is also enabled. If Iceberg is not enabled, then this property has no effect. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.session_tag`` what session tag to use when assuming the data source role. This property is required when both Iceberg and Lake Formation are enabled. Sample Glue dataSource configuration ========================================