diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 5b25bc175a..db6d667eee 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -92,18 +92,31 @@ public class SparkConstants { public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL"; public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog"; + public static final String SPARK_CATALOG_CATALOG_IMPL = SPARK_CATALOG + ".catalog-impl"; + public static final String SPARK_CATALOG_CLIENT_REGION = SPARK_CATALOG + ".client.region"; + public static final String SPARK_CATALOG_CLIENT_FACTORY = SPARK_CATALOG + ".client.factory"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN = + SPARK_CATALOG + ".client.assume-role.arn"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION = + SPARK_CATALOG + ".client.assume-role.region"; + public static final String SPARK_CATALOG_LF_SESSION_TAG_KEY = + SPARK_CATALOG + ".client.assume-role.tags.LakeFormationAuthorizedCaller"; + public static final String SPARK_CATALOG_GLUE_ACCOUNT_ID = SPARK_CATALOG + ".glue.account-id"; + public static final String SPARK_CATALOG_GLUE_LF_ENABLED = + SPARK_CATALOG + ".glue.lakeformation-enabled"; + public static final String ICEBERG_SESSION_CATALOG = "org.apache.iceberg.spark.SparkSessionCatalog"; public static final String ICEBERG_SPARK_EXTENSION = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"; public static final String ICEBERG_SPARK_RUNTIME_PACKAGE = "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"; - public static final String SPARK_CATALOG_CATALOG_IMPL = - "spark.sql.catalog.spark_catalog.catalog-impl"; public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog"; + public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY = + "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"; + public static final String ICEBERG_LF_CLIENT_FACTORY = + "org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory"; - public static final String EMR_LAKEFORMATION_OPTION = - "spark.emr-serverless.lakeformation.enabled"; public static final String FLINT_ACCELERATE_USING_COVERING_INDEX = "spark.flint.optimizer.covering.enabled"; } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java index 3fe7d99373..d9d5859f64 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java @@ -27,20 +27,13 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; import static org.opensearch.sql.spark.data.constants.SparkConstants.PPL_STANDALONE_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE; @@ -71,7 +64,6 @@ private void setDefaultConfigs() { setConfigItem( HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE); setConfigItem( SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE); @@ -85,12 +77,8 @@ private void setDefaultConfigs() { setConfigItem(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); setConfigItem(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); - setConfigItem( - SPARK_SQL_EXTENSIONS_KEY, - ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + setConfigItem(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); setConfigItem(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); - setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); - setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); } private void setConfigItem(String key, String value) { diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index a7a79c758e..294a508cf0 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -203,39 +203,6 @@ void testDispatchSelectQuery() { verifyNoInteractions(flintIndexMetadataService); } - @Test - void testDispatchSelectQueryWithLakeFormation() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); - } - @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); @@ -1080,7 +1047,6 @@ private String constructExpectedSparkSubmitParameterString(String query, String + getConfParam( "spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", "spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory", - "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", "spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT", "spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots", "spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/", @@ -1092,10 +1058,8 @@ private String constructExpectedSparkSubmitParameterString(String query, String "spark.datasource.flint.scheme=SCHEMA", "spark.datasource.flint.auth=basic", "spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", - "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", - "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", - "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", - "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog") + "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") + getConfParam("spark.flint.job.query=" + query) + (jobType != null ? getConfParam("spark.flint.job.type=" + jobType) : "") + getConfParam( @@ -1144,25 +1108,6 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() { .build(); } - private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() { - - Map properties = new HashMap<>(); - properties.put("glue.auth.type", "iam_role"); - properties.put( - "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); - properties.put( - "glue.indexstore.opensearch.uri", - "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); - properties.put("glue.indexstore.opensearch.auth", "awssigv4"); - properties.put("glue.indexstore.opensearch.region", "eu-west-1"); - properties.put("glue.lakeformation.enabled", "true"); - return new DataSourceMetadata.Builder() - .setName(MY_GLUE) - .setConnector(DataSourceType.S3GLUE) - .setProperties(properties) - .build(); - } - private DataSourceMetadata constructPrometheusDataSourceType() { return new DataSourceMetadata.Builder() .setName("my_prometheus") diff --git a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java index 26dbf3529a..6eb0ca5c25 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java @@ -5,15 +5,16 @@ package org.opensearch.sql.spark.parameter; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ICEBERG_ENABLED; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_LAKEFORMATION_OPTION; import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_ACCELERATE_USING_COVERING_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DATA_SOURCE_KEY; @@ -25,19 +26,49 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_PPL_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_ASSUME_ROLE_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_LF_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_ACCOUNT_ID; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_LF_ENABLED; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_LF_SESSION_TAG_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; +import com.amazonaws.arn.Arn; import java.net.URI; import java.net.URISyntaxException; +import java.util.Optional; import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSetting; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +@RequiredArgsConstructor public class S3GlueDataSourceSparkParameterComposer implements DataSourceSparkParameterComposer { public static final String FLINT_BASIC_AUTH = "basic"; + public static final String FALSE = "false"; + public static final String TRUE = "true"; + + private final SparkExecutionEngineConfigClusterSettingLoader settingLoader; @Override public void compose( @@ -45,7 +76,16 @@ public void compose( SparkSubmitParameters params, DispatchQueryRequest dispatchQueryRequest, AsyncQueryRequestContext context) { - String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final Optional maybeClusterSettings = + settingLoader.load(); + if (!maybeClusterSettings.isPresent()) { + throw new RuntimeException("No cluster settings present"); + } + final SparkExecutionEngineConfigClusterSetting clusterSetting = maybeClusterSettings.get(); + final String region = clusterSetting.getRegion(); + + final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final String accountId = Arn.fromString(roleArn).getAccountId(); params.setConfigItem(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn); params.setConfigItem(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn); @@ -53,11 +93,37 @@ public void compose( params.setConfigItem("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); params.setConfigItem(FLINT_DATA_SOURCE_KEY, metadata.getName()); - final boolean lakeFormationEnabled = - BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); - params.setConfigItem(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled)); - params.setConfigItem( - FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled)); + final boolean icebergEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_ICEBERG_ENABLED)); + if (icebergEnabled) { + params.setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE); + params.setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); + params.setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); + params.setConfigItem( + SPARK_SQL_EXTENSIONS_KEY, + ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + + params.setConfigItem(SPARK_CATALOG_CLIENT_REGION, region); + params.setConfigItem(SPARK_CATALOG_GLUE_ACCOUNT_ID, accountId); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN, roleArn); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION, region); + + final boolean lakeFormationEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); + if (lakeFormationEnabled) { + final String sessionTag = metadata.getProperties().get(GLUE_LAKEFORMATION_SESSION_TAG); + if (StringUtils.isBlank(sessionTag)) { + throw new IllegalArgumentException(GLUE_LAKEFORMATION_SESSION_TAG + " is required"); + } + + params.setConfigItem(FLINT_ACCELERATE_USING_COVERING_INDEX, FALSE); + params.setConfigItem(SPARK_CATALOG_GLUE_LF_ENABLED, TRUE); + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_LF_CLIENT_FACTORY); + params.setConfigItem(SPARK_CATALOG_LF_SESSION_TAG_KEY, sessionTag); + } else { + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_ASSUME_ROLE_CLIENT_FACTORY); + } + } setFlintIndexStoreHost( params, diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 05f7d1095c..7dfcad3db5 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -159,7 +159,8 @@ public FlintIndexStateModelService flintIndexStateModelService( public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider( Settings settings, SparkExecutionEngineConfigClusterSettingLoader clusterSettingLoader) { SparkParameterComposerCollection collection = new SparkParameterComposerCollection(); - collection.register(DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + collection.register( + DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader)); return new SparkSubmitParametersBuilderProvider(collection); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index d8e3b80175..1f22be7961 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -18,12 +18,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.net.URL; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -42,6 +47,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; @@ -58,6 +64,7 @@ import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.config.OpenSearchSparkSubmitParameterModifier; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.DatasourceEmbeddedQueryIdProvider; import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; @@ -100,6 +107,10 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { public static final String MYS3_DATASOURCE = "mys3"; public static final String MYGLUE_DATASOURCE = "my_glue"; + public static final String ACCOUNT_ID = "accountId"; + public static final String APPLICATION_ID = "appId"; + public static final String REGION = "us-west-2"; + public static final String ROLE_ARN = "roleArn"; protected ClusterService clusterService; protected org.opensearch.sql.common.setting.Settings pluginSettings; @@ -262,7 +273,9 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( SparkParameterComposerCollection sparkParameterComposerCollection = new SparkParameterComposerCollection(); sparkParameterComposerCollection.register( - DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + DataSourceType.S3GLUE, + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader())); SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider = new SparkSubmitParametersBuilderProvider(sparkParameterComposerCollection); QueryHandlerFactory queryHandlerFactory = @@ -372,14 +385,56 @@ public EMRServerlessClient getClient(String accountId) { public SparkExecutionEngineConfig sparkExecutionEngineConfig( AsyncQueryRequestContext asyncQueryRequestContext) { return SparkExecutionEngineConfig.builder() - .applicationId("appId") - .region("us-west-2") - .executionRoleARN("roleArn") + .applicationId(APPLICATION_ID) + .region(REGION) + .executionRoleARN(ROLE_ARN) .sparkSubmitParameterModifier(new OpenSearchSparkSubmitParameterModifier("")) .clusterName("myCluster") .build(); } + public static class TestSettings extends org.opensearch.sql.common.setting.Settings { + final Map values; + + public TestSettings() { + values = new HashMap<>(); + } + + /** Get Setting Value. */ + @Override + public T getSettingValue(Key key) { + return (T) values.get(key); + } + + @Override + public List getSettings() { + return values.keySet().stream().map(Key::getKeyValue).collect(Collectors.toList()); + } + + public void putSettingValue(Key key, T value) { + values.put(key, value); + } + } + + public SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APPLICATION_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final TestSettings settings = new TestSettings(); + settings.putSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG, jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + public void enableSession(boolean enabled) { // doNothing } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java index 55e62d52f0..24e94fd265 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java @@ -9,19 +9,25 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; @ExtendWith(MockitoExtension.class) @@ -33,18 +39,20 @@ class S3GlueDataSourceSparkParameterComposerTest { public static final String PASSWORD = "PASSWORD"; public static final String REGION = "REGION"; public static final String TRUE = "true"; - public static final String ROLE_ARN = "ROLE_ARN"; + public static final String ROLE_ARN = "arn:aws:iam::123456789012:role/ROLE_NAME"; + public static final String APP_ID = "APP_ID"; + public static final String CLUSTER_NAME = "CLUSTER_NAME"; + public static final String ACCOUNT_ID = "123456789012"; + public static final String SESSION_TAG = "SESSION_TAG"; private static final String COMMON_EXPECTED_PARAMS = " --class org.apache.spark.sql.FlintJob " + getConfList( - "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.hive.metastore.glue.role.arn=ROLE_ARN", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", "spark.flint.datasource.name=DATASOURCE_NAME", - "spark.emr-serverless.lakeformation.enabled=true", - "spark.flint.optimizer.covering.enabled=false", "spark.datasource.flint.host=test.host.com", "spark.datasource.flint.port=9200", "spark.datasource.flint.scheme=https"); @@ -57,7 +65,7 @@ public void testBasicAuth() { getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -79,7 +87,7 @@ public void testComposeWithSigV4Auth() { getDataSourceMetadata(AuthenticationType.AWSSIGV4AUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -99,7 +107,7 @@ public void testComposeWithNoAuth() { getDataSourceMetadata(AuthenticationType.NOAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -120,7 +128,8 @@ public void testComposeWithBadUri() { assertThrows( IllegalArgumentException.class, () -> - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -128,6 +137,170 @@ public void testComposeWithBadUri() { new NullAsyncQueryRequestContext())); } + @Test + public void testIcebergEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.AssumeRoleAwsClientFactory", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG, SESSION_TAG) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.flint.optimizer.covering.enabled=false", + "spark.sql.catalog.spark_catalog.glue.lakeformation-enabled=true", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory", + "spark.sql.catalog.spark_catalog.client.assume-role.tags.LakeFormationAuthorizedCaller=SESSION_TAG", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabledNoSessionTag() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()); + assertThrows( + IllegalArgumentException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + + @Test + public void testNoClusterConfigAvailable() { + DataSourceMetadata dataSourceMetadata = + getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)).thenReturn(null); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + new SparkExecutionEngineConfigClusterSettingLoader(settings)); + + assertThrows( + RuntimeException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + private DataSourceMetadata getDataSourceMetadata( AuthenticationType authenticationType, String uri) { return new DataSourceMetadata.Builder() @@ -140,10 +313,20 @@ private DataSourceMetadata getDataSourceMetadata( .build(); } + private DataSourceMetadata getDataSourceMetadata(Map properties) { + return new DataSourceMetadata.Builder() + .setConnector(DataSourceType.S3GLUE) + .setName("DATASOURCE_NAME") + .setDescription("DESCRIPTION") + .setResultIndex("RESULT_INDEX") + .setDataSourceStatus(DataSourceStatus.ACTIVE) + .setProperties(properties) + .build(); + } + private Map getProperties(AuthenticationType authType, String uri) { return ImmutableMap.builder() .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) - .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, uri) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, authType.getName()) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) @@ -152,6 +335,26 @@ private Map getProperties(AuthenticationType authType, String ur .build(); } + private SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APP_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)) + .thenReturn(jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + private static String getConfList(String... params) { return Arrays.stream(params) .map(param -> String.format(" --conf %s ", param)) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java index e0c13ff005..11a33a2969 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -29,7 +31,9 @@ public class GlueDataSourceFactory implements DataSourceFactory { "glue.indexstore.opensearch.auth.password"; public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION = "glue.indexstore.opensearch.region"; + public static final String GLUE_ICEBERG_ENABLED = "glue.iceberg.enabled"; public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled"; + public static final String GLUE_LAKEFORMATION_SESSION_TAG = "glue.lakeformation.session_tag"; @Override public DataSourceType getDataSourceType() { @@ -76,5 +80,18 @@ private void validateGlueDataSourceConfiguration(Map dataSourceM DatasourceValidationUtils.validateHost( dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_URI), pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + + // validate Lake Formation config + if (BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_ENABLED))) { + if (!BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_ICEBERG_ENABLED))) { + throw new IllegalArgumentException( + "Lake Formation can only be enabled when Iceberg is enabled."); + } + + if (StringUtils.isBlank(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_SESSION_TAG))) { + throw new IllegalArgumentException( + "Lake Formation session tag must be specified when enabling Lake Formation"); + } + } } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java index 52f8ec9cd1..2833717265 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java @@ -210,4 +210,67 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { Assertions.assertEquals( "Invalid flint host in properties.", illegalArgumentException.getMessage()); } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoIceberg() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "false"); + properties.put("glue.lakeformation.session_tag", "session_tag"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation can only be enabled when Iceberg is enabled.", + illegalArgumentException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoSessionTags() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "true"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation session tag must be specified when enabling Lake Formation", + illegalArgumentException.getMessage()); + } } diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index 5e91df70e5..81946aa489 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -42,7 +42,9 @@ Glue Connector Properties. * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] -* ``glue.lakeformation.enabled`` determines whether to enable lakeformation for queries. Default value is ``"false"`` if not specified +* ``glue.iceberg.enabled`` determines whether to enable Iceberg for the session. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.enabled`` determines whether to enable Lake Formation for queries when Iceberg is also enabled. If Iceberg is not enabled, then this property has no effect. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.session_tag`` what session tag to use when assuming the data source role. This property is required when both Iceberg and Lake Formation are enabled. Sample Glue dataSource configuration ========================================