diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 5b25bc175a..e87dbba03e 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -92,18 +92,35 @@ public class SparkConstants { public static final String FLINT_SESSION_CLASS_NAME = "org.apache.spark.sql.FlintREPL"; public static final String SPARK_CATALOG = "spark.sql.catalog.spark_catalog"; + public static final String SPARK_CATALOG_CATALOG_IMPL = SPARK_CATALOG + ".catalog-impl"; + public static final String SPARK_CATALOG_CLIENT_REGION = SPARK_CATALOG + ".client.region"; + public static final String SPARK_CATALOG_CLIENT_FACTORY = SPARK_CATALOG + ".client.factory"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN = + SPARK_CATALOG + ".client.assume-role.arn"; + public static final String SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION = + SPARK_CATALOG + ".client.assume-role.region"; + public static final String SPARK_CATALOG_LF_SESSION_TAG_KEY = + SPARK_CATALOG + ".client.assume-role.tags.LakeFormationAuthorizedCaller"; + public static final String SPARK_CATALOG_GLUE_ACCOUNT_ID = SPARK_CATALOG + ".glue.account-id"; + public static final String SPARK_CATALOG_GLUE_LF_ENABLED = + SPARK_CATALOG + ".glue.lakeformation-enabled"; + public static final String ICEBERG_SESSION_CATALOG = "org.apache.iceberg.spark.SparkSessionCatalog"; public static final String ICEBERG_SPARK_EXTENSION = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"; - public static final String ICEBERG_SPARK_RUNTIME_PACKAGE = - "/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"; - public static final String SPARK_CATALOG_CATALOG_IMPL = - "spark.sql.catalog.spark_catalog.catalog-impl"; + public static final String ICEBERG_SPARK_JARS = + "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30"; public static final String ICEBERG_GLUE_CATALOG = "org.apache.iceberg.aws.glue.GlueCatalog"; + public static final String ICEBERG_ASSUME_ROLE_CLIENT_FACTORY = + "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"; + public static final String ICEBERG_LF_CLIENT_FACTORY = + "org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory"; + // The following option is needed in Iceberg 1.5 when reading timestamp types that do not + // contain timezone in parquet files. The timezone is assumed to be GMT. + public static final String ICEBERG_TS_WO_TZ = + "spark.sql.iceberg.handle-timestamp-without-timezone"; - public static final String EMR_LAKEFORMATION_OPTION = - "spark.emr-serverless.lakeformation.enabled"; public static final String FLINT_ACCELERATE_USING_COVERING_INDEX = "spark.flint.optimizer.covering.enabled"; } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java index 2e142ed117..84fd49b712 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParameters.java @@ -30,6 +30,10 @@ public void deleteConfigItem(String key) { config.remove(key); } + public String getConfigItem(String key) { + return config.get(key); + } + @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java index 3fe7d99373..d9d5859f64 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilder.java @@ -27,20 +27,13 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_RUNTIME_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; import static org.opensearch.sql.spark.data.constants.SparkConstants.PPL_STANDALONE_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE; @@ -71,7 +64,6 @@ private void setDefaultConfigs() { setConfigItem( HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - setConfigItem(SPARK_JARS_KEY, ICEBERG_SPARK_RUNTIME_PACKAGE); setConfigItem( SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE); @@ -85,12 +77,8 @@ private void setDefaultConfigs() { setConfigItem(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); setConfigItem(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); - setConfigItem( - SPARK_SQL_EXTENSIONS_KEY, - ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + setConfigItem(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); setConfigItem(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); - setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); - setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); } private void setConfigItem(String key, String value) { diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index ef7f117455..b6369292a6 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -203,39 +203,6 @@ void testDispatchSelectQuery() { verifyNoInteractions(flintIndexMetadataService); } - @Test - void testDispatchSelectQueryWithLakeFormation() { - when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); - HashMap tags = new HashMap<>(); - tags.put(DATASOURCE_TAG_KEY, MY_GLUE); - tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME); - tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText()); - String query = "select * from my_glue.default.http_logs"; - String sparkSubmitParameters = constructExpectedSparkSubmitParameterString(query); - StartJobRequest expected = - new StartJobRequest( - "TEST_CLUSTER:batch", - null, - EMRS_APPLICATION_ID, - EMRS_EXECUTION_ROLE, - sparkSubmitParameters, - tags, - false, - "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); - DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithLakeFormation(); - when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( - MY_GLUE, asyncQueryRequestContext)) - .thenReturn(dataSourceMetadata); - - DispatchQueryResponse dispatchQueryResponse = - sparkQueryDispatcher.dispatch(getBaseDispatchQueryRequest(query), asyncQueryRequestContext); - verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); - Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); - Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); - verifyNoInteractions(flintIndexMetadataService); - } - @Test void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient); @@ -1090,7 +1057,6 @@ private String constructExpectedSparkSubmitParameterString(String query, String + getConfParam( "spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", "spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory", - "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar", "spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.3.0-SNAPSHOT,org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT", "spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots", "spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/", @@ -1102,10 +1068,8 @@ private String constructExpectedSparkSubmitParameterString(String query, String "spark.datasource.flint.scheme=SCHEMA", "spark.datasource.flint.auth=basic", "spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider", - "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", - "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory", - "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", - "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog") + "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") + getConfParam("spark.flint.job.query=" + query) + (jobType != null ? getConfParam("spark.flint.job.type=" + jobType) : "") + getConfParam( @@ -1154,25 +1118,6 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() { .build(); } - private DataSourceMetadata constructMyGlueDataSourceMetadataWithLakeFormation() { - - Map properties = new HashMap<>(); - properties.put("glue.auth.type", "iam_role"); - properties.put( - "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); - properties.put( - "glue.indexstore.opensearch.uri", - "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); - properties.put("glue.indexstore.opensearch.auth", "awssigv4"); - properties.put("glue.indexstore.opensearch.region", "eu-west-1"); - properties.put("glue.lakeformation.enabled", "true"); - return new DataSourceMetadata.Builder() - .setName(MY_GLUE) - .setConnector(DataSourceType.S3GLUE) - .setProperties(properties) - .build(); - } - private DataSourceMetadata constructPrometheusDataSourceType() { return new DataSourceMetadata.Builder() .setName("my_prometheus") diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java index 8947cb61f7..8fb975d187 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/parameter/SparkSubmitParametersBuilderTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.parameter; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -93,6 +94,7 @@ public void testOverrideConfigItem() { params.setConfigItem(SPARK_JARS_KEY, "Overridden"); String result = params.toString(); + assertEquals("Overridden", params.getConfigItem(SPARK_JARS_KEY)); assertTrue(result.contains(String.format("%s=Overridden", SPARK_JARS_KEY))); } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java index 26dbf3529a..189e140416 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposer.java @@ -5,15 +5,16 @@ package org.opensearch.sql.spark.parameter; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ICEBERG_ENABLED; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_LAKEFORMATION_OPTION; import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_ACCELERATE_USING_COVERING_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DATA_SOURCE_KEY; @@ -25,19 +26,50 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_PPL_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_ASSUME_ROLE_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_GLUE_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_LF_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SESSION_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_SPARK_JARS; +import static org.opensearch.sql.spark.data.constants.SparkConstants.ICEBERG_TS_WO_TZ; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CATALOG_IMPL; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_FACTORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_CLIENT_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_ACCOUNT_ID; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_GLUE_LF_ENABLED; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_CATALOG_LF_SESSION_TAG_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; +import com.amazonaws.arn.Arn; import java.net.URI; import java.net.URISyntaxException; +import java.util.Optional; import java.util.function.Supplier; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSetting; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; +@RequiredArgsConstructor public class S3GlueDataSourceSparkParameterComposer implements DataSourceSparkParameterComposer { public static final String FLINT_BASIC_AUTH = "basic"; + public static final String FALSE = "false"; + public static final String TRUE = "true"; + + private final SparkExecutionEngineConfigClusterSettingLoader settingLoader; @Override public void compose( @@ -45,7 +77,16 @@ public void compose( SparkSubmitParameters params, DispatchQueryRequest dispatchQueryRequest, AsyncQueryRequestContext context) { - String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final Optional maybeClusterSettings = + settingLoader.load(); + if (!maybeClusterSettings.isPresent()) { + throw new RuntimeException("No cluster settings present"); + } + final SparkExecutionEngineConfigClusterSetting clusterSetting = maybeClusterSettings.get(); + final String region = clusterSetting.getRegion(); + + final String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); + final String accountId = Arn.fromString(roleArn).getAccountId(); params.setConfigItem(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn); params.setConfigItem(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn); @@ -53,11 +94,40 @@ public void compose( params.setConfigItem("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); params.setConfigItem(FLINT_DATA_SOURCE_KEY, metadata.getName()); - final boolean lakeFormationEnabled = - BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); - params.setConfigItem(EMR_LAKEFORMATION_OPTION, Boolean.toString(lakeFormationEnabled)); - params.setConfigItem( - FLINT_ACCELERATE_USING_COVERING_INDEX, Boolean.toString(!lakeFormationEnabled)); + final boolean icebergEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_ICEBERG_ENABLED)); + if (icebergEnabled) { + params.setConfigItem( + SPARK_JAR_PACKAGES_KEY, + params.getConfigItem(SPARK_JAR_PACKAGES_KEY) + "," + ICEBERG_SPARK_JARS); + params.setConfigItem(SPARK_CATALOG, ICEBERG_SESSION_CATALOG); + params.setConfigItem(SPARK_CATALOG_CATALOG_IMPL, ICEBERG_GLUE_CATALOG); + params.setConfigItem( + SPARK_SQL_EXTENSIONS_KEY, + ICEBERG_SPARK_EXTENSION + "," + FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION); + + params.setConfigItem(SPARK_CATALOG_CLIENT_REGION, region); + params.setConfigItem(SPARK_CATALOG_GLUE_ACCOUNT_ID, accountId); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_ARN, roleArn); + params.setConfigItem(SPARK_CATALOG_CLIENT_ASSUME_ROLE_REGION, region); + params.setConfigItem(ICEBERG_TS_WO_TZ, TRUE); + + final boolean lakeFormationEnabled = + BooleanUtils.toBoolean(metadata.getProperties().get(GLUE_LAKEFORMATION_ENABLED)); + if (lakeFormationEnabled) { + final String sessionTag = metadata.getProperties().get(GLUE_LAKEFORMATION_SESSION_TAG); + if (StringUtils.isBlank(sessionTag)) { + throw new IllegalArgumentException(GLUE_LAKEFORMATION_SESSION_TAG + " is required"); + } + + params.setConfigItem(FLINT_ACCELERATE_USING_COVERING_INDEX, FALSE); + params.setConfigItem(SPARK_CATALOG_GLUE_LF_ENABLED, TRUE); + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_LF_CLIENT_FACTORY); + params.setConfigItem(SPARK_CATALOG_LF_SESSION_TAG_KEY, sessionTag); + } else { + params.setConfigItem(SPARK_CATALOG_CLIENT_FACTORY, ICEBERG_ASSUME_ROLE_CLIENT_FACTORY); + } + } setFlintIndexStoreHost( params, diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 05f7d1095c..9cc69b2fb7 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -159,7 +159,11 @@ public FlintIndexStateModelService flintIndexStateModelService( public SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider( Settings settings, SparkExecutionEngineConfigClusterSettingLoader clusterSettingLoader) { SparkParameterComposerCollection collection = new SparkParameterComposerCollection(); - collection.register(DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + collection.register( + DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); + collection.register( + DataSourceType.SECURITY_LAKE, + new S3GlueDataSourceSparkParameterComposer(clusterSettingLoader)); collection.register(new OpenSearchExtraParameterComposer(clusterSettingLoader)); return new SparkSubmitParametersBuilderProvider(collection); } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index d8e3b80175..641b083d53 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -18,12 +18,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.net.URL; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -42,6 +47,7 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.plugins.Plugin; import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; @@ -58,6 +64,7 @@ import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.config.OpenSearchSparkSubmitParameterModifier; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.DatasourceEmbeddedQueryIdProvider; import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; @@ -100,6 +107,10 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { public static final String MYS3_DATASOURCE = "mys3"; public static final String MYGLUE_DATASOURCE = "my_glue"; + public static final String ACCOUNT_ID = "accountId"; + public static final String APPLICATION_ID = "appId"; + public static final String REGION = "us-west-2"; + public static final String ROLE_ARN = "roleArn"; protected ClusterService clusterService; protected org.opensearch.sql.common.setting.Settings pluginSettings; @@ -262,7 +273,13 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( SparkParameterComposerCollection sparkParameterComposerCollection = new SparkParameterComposerCollection(); sparkParameterComposerCollection.register( - DataSourceType.S3GLUE, new S3GlueDataSourceSparkParameterComposer()); + DataSourceType.S3GLUE, + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader())); + sparkParameterComposerCollection.register( + DataSourceType.SECURITY_LAKE, + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader())); SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider = new SparkSubmitParametersBuilderProvider(sparkParameterComposerCollection); QueryHandlerFactory queryHandlerFactory = @@ -372,14 +389,56 @@ public EMRServerlessClient getClient(String accountId) { public SparkExecutionEngineConfig sparkExecutionEngineConfig( AsyncQueryRequestContext asyncQueryRequestContext) { return SparkExecutionEngineConfig.builder() - .applicationId("appId") - .region("us-west-2") - .executionRoleARN("roleArn") + .applicationId(APPLICATION_ID) + .region(REGION) + .executionRoleARN(ROLE_ARN) .sparkSubmitParameterModifier(new OpenSearchSparkSubmitParameterModifier("")) .clusterName("myCluster") .build(); } + public static class TestSettings extends org.opensearch.sql.common.setting.Settings { + final Map values; + + public TestSettings() { + values = new HashMap<>(); + } + + /** Get Setting Value. */ + @Override + public T getSettingValue(Key key) { + return (T) values.get(key); + } + + @Override + public List getSettings() { + return values.keySet().stream().map(Key::getKeyValue).collect(Collectors.toList()); + } + + public void putSettingValue(Key key, T value) { + values.put(key, value); + } + } + + public SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APPLICATION_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final TestSettings settings = new TestSettings(); + settings.putSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG, jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + public void enableSession(boolean enabled) { // doNothing } diff --git a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java index 55e62d52f0..3e12aa78d0 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/parameter/S3GlueDataSourceSparkParameterComposerTest.java @@ -7,21 +7,28 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import java.util.Arrays; import java.util.Map; import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings.Key; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceStatus; import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigClusterSettingLoader; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; @ExtendWith(MockitoExtension.class) @@ -33,18 +40,20 @@ class S3GlueDataSourceSparkParameterComposerTest { public static final String PASSWORD = "PASSWORD"; public static final String REGION = "REGION"; public static final String TRUE = "true"; - public static final String ROLE_ARN = "ROLE_ARN"; + public static final String ROLE_ARN = "arn:aws:iam::123456789012:role/ROLE_NAME"; + public static final String APP_ID = "APP_ID"; + public static final String CLUSTER_NAME = "CLUSTER_NAME"; + public static final String ACCOUNT_ID = "123456789012"; + public static final String SESSION_TAG = "SESSION_TAG"; private static final String COMMON_EXPECTED_PARAMS = " --class org.apache.spark.sql.FlintJob " + getConfList( - "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=ROLE_ARN", - "spark.hive.metastore.glue.role.arn=ROLE_ARN", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", "spark.flint.datasource.name=DATASOURCE_NAME", - "spark.emr-serverless.lakeformation.enabled=true", - "spark.flint.optimizer.covering.enabled=false", "spark.datasource.flint.host=test.host.com", "spark.datasource.flint.port=9200", "spark.datasource.flint.scheme=https"); @@ -57,7 +66,7 @@ public void testBasicAuth() { getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -79,7 +88,7 @@ public void testComposeWithSigV4Auth() { getDataSourceMetadata(AuthenticationType.AWSSIGV4AUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -99,7 +108,7 @@ public void testComposeWithNoAuth() { getDataSourceMetadata(AuthenticationType.NOAUTH, VALID_URI); SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -120,7 +129,8 @@ public void testComposeWithBadUri() { assertThrows( IllegalArgumentException.class, () -> - new S3GlueDataSourceSparkParameterComposer() + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()) .compose( dataSourceMetadata, sparkSubmitParameters, @@ -128,6 +138,174 @@ public void testComposeWithBadUri() { new NullAsyncQueryRequestContext())); } + @Test + public void testIcebergEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.jars.packages=package,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.sql.iceberg.handle-timestamp-without-timezone=true", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.AssumeRoleAwsClientFactory", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + sparkSubmitParameters.setConfigItem(SPARK_JAR_PACKAGES_KEY, "package"); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabled() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG, SESSION_TAG) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + final String expectedParams = + " --class org.apache.spark.sql.FlintJob " + + getConfList( + "spark.jars.packages=package,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0,software.amazon.awssdk:bundle:2.26.30", + "spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.hive.metastore.glue.role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.DATASOURCE_NAME=org.opensearch.sql.FlintDelegatingSessionCatalog", + "spark.flint.datasource.name=DATASOURCE_NAME", + "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog", + "spark.sql.catalog.spark_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog", + "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.opensearch.flint.spark.FlintSparkExtensions,org.opensearch.flint.spark.FlintPPLSparkExtensions", + "spark.sql.catalog.spark_catalog.client.region=REGION", + "spark.sql.catalog.spark_catalog.glue.account-id=123456789012", + "spark.sql.catalog.spark_catalog.client.assume-role.arn=arn:aws:iam::123456789012:role/ROLE_NAME", + "spark.sql.catalog.spark_catalog.client.assume-role.region=REGION", + "spark.sql.iceberg.handle-timestamp-without-timezone=true", + "spark.flint.optimizer.covering.enabled=false", + "spark.sql.catalog.spark_catalog.glue.lakeformation-enabled=true", + "spark.sql.catalog.spark_catalog.client.factory=org.apache.iceberg.aws.lakeformation.LakeFormationAwsClientFactory", + "spark.sql.catalog.spark_catalog.client.assume-role.tags.LakeFormationAuthorizedCaller=SESSION_TAG", + "spark.datasource.flint.host=test.host.com", + "spark.datasource.flint.port=9200", + "spark.datasource.flint.scheme=https", + "spark.datasource.flint.auth=basic", + "spark.datasource.flint.auth.username=USERNAME", + "spark.datasource.flint.auth.password=PASSWORD"); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + sparkSubmitParameters.setConfigItem(SPARK_JAR_PACKAGES_KEY, "package"); + + new S3GlueDataSourceSparkParameterComposer(getSparkExecutionEngineConfigClusterSettingLoader()) + .compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext()); + + assertEquals(expectedParams, sparkSubmitParameters.toString()); + } + + @Test + public void testIcebergWithLakeFormationEnabledNoSessionTag() { + final Map properties = + ImmutableMap.builder() + .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) + .put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, VALID_URI) + .put( + GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, + AuthenticationType.BASICAUTH.getName()) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD, PASSWORD) + .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION, REGION) + .build(); + + DataSourceMetadata dataSourceMetadata = getDataSourceMetadata(properties); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + getSparkExecutionEngineConfigClusterSettingLoader()); + assertThrows( + IllegalArgumentException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + + @Test + public void testNoClusterConfigAvailable() { + DataSourceMetadata dataSourceMetadata = + getDataSourceMetadata(AuthenticationType.BASICAUTH, VALID_URI); + SparkSubmitParameters sparkSubmitParameters = new SparkSubmitParameters(); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)).thenReturn(null); + + final S3GlueDataSourceSparkParameterComposer composer = + new S3GlueDataSourceSparkParameterComposer( + new SparkExecutionEngineConfigClusterSettingLoader(settings)); + + assertThrows( + RuntimeException.class, + () -> + composer.compose( + dataSourceMetadata, + sparkSubmitParameters, + dispatchQueryRequest, + new NullAsyncQueryRequestContext())); + } + private DataSourceMetadata getDataSourceMetadata( AuthenticationType authenticationType, String uri) { return new DataSourceMetadata.Builder() @@ -140,10 +318,20 @@ private DataSourceMetadata getDataSourceMetadata( .build(); } + private DataSourceMetadata getDataSourceMetadata(Map properties) { + return new DataSourceMetadata.Builder() + .setConnector(DataSourceType.S3GLUE) + .setName("DATASOURCE_NAME") + .setDescription("DESCRIPTION") + .setResultIndex("RESULT_INDEX") + .setDataSourceStatus(DataSourceStatus.ACTIVE) + .setProperties(properties) + .build(); + } + private Map getProperties(AuthenticationType authType, String uri) { return ImmutableMap.builder() .put(GlueDataSourceFactory.GLUE_ROLE_ARN, ROLE_ARN) - .put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI, uri) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH, authType.getName()) .put(GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, USERNAME) @@ -152,6 +340,26 @@ private Map getProperties(AuthenticationType authType, String ur .build(); } + private SparkExecutionEngineConfigClusterSettingLoader + getSparkExecutionEngineConfigClusterSettingLoader() { + Gson gson = new Gson(); + JsonObject jsonObject = new JsonObject(); + jsonObject.addProperty("accountId", ACCOUNT_ID); + jsonObject.addProperty("applicationId", APP_ID); + jsonObject.addProperty("region", REGION); + jsonObject.addProperty("executionRoleARN", ROLE_ARN); + jsonObject.addProperty("sparkSubmitParameters", ""); + + // Convert JsonObject to JSON string + final String jsonString = gson.toJson(jsonObject); + + final OpenSearchSettings settings = Mockito.mock(OpenSearchSettings.class); + Mockito.when(settings.getSettingValue(Key.SPARK_EXECUTION_ENGINE_CONFIG)) + .thenReturn(jsonString); + + return new SparkExecutionEngineConfigClusterSettingLoader(settings); + } + private static String getConfList(String... params) { return Arrays.stream(params) .map(param -> String.format(" --conf %s ", param)) diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java index c727c3c531..c74964fc00 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java @@ -15,12 +15,13 @@ public class DataSourceType { public static DataSourceType OPENSEARCH = new DataSourceType("OPENSEARCH"); public static DataSourceType SPARK = new DataSourceType("SPARK"); public static DataSourceType S3GLUE = new DataSourceType("S3GLUE"); + public static DataSourceType SECURITY_LAKE = new DataSourceType("SECURITY_LAKE"); // Map from uppercase DataSourceType name to DataSourceType object private static Map knownValues = new HashMap<>(); static { - register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE); + register(PROMETHEUS, OPENSEARCH, SPARK, S3GLUE, SECURITY_LAKE); } private final String name; diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java index e0c13ff005..11a33a2969 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.Set; import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; @@ -29,7 +31,9 @@ public class GlueDataSourceFactory implements DataSourceFactory { "glue.indexstore.opensearch.auth.password"; public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION = "glue.indexstore.opensearch.region"; + public static final String GLUE_ICEBERG_ENABLED = "glue.iceberg.enabled"; public static final String GLUE_LAKEFORMATION_ENABLED = "glue.lakeformation.enabled"; + public static final String GLUE_LAKEFORMATION_SESSION_TAG = "glue.lakeformation.session_tag"; @Override public DataSourceType getDataSourceType() { @@ -76,5 +80,18 @@ private void validateGlueDataSourceConfiguration(Map dataSourceM DatasourceValidationUtils.validateHost( dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_URI), pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)); + + // validate Lake Formation config + if (BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_ENABLED))) { + if (!BooleanUtils.toBoolean(dataSourceMetadataConfig.get(GLUE_ICEBERG_ENABLED))) { + throw new IllegalArgumentException( + "Lake Formation can only be enabled when Iceberg is enabled."); + } + + if (StringUtils.isBlank(dataSourceMetadataConfig.get(GLUE_LAKEFORMATION_SESSION_TAG))) { + throw new IllegalArgumentException( + "Lake Formation session tag must be specified when enabling Lake Formation"); + } + } } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/SecurityLakeDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/SecurityLakeDataSourceFactory.java new file mode 100644 index 0000000000..0f336a08d1 --- /dev/null +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/SecurityLakeDataSourceFactory.java @@ -0,0 +1,57 @@ +package org.opensearch.sql.datasources.glue; + +import java.util.Map; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +public class SecurityLakeDataSourceFactory extends GlueDataSourceFactory { + + private final Settings pluginSettings; + + public static final String TRUE = "true"; + + public SecurityLakeDataSourceFactory(final Settings pluginSettings) { + super(pluginSettings); + this.pluginSettings = pluginSettings; + } + + @Override + public DataSourceType getDataSourceType() { + return DataSourceType.SECURITY_LAKE; + } + + @Override + public DataSource createDataSource(DataSourceMetadata metadata) { + validateProperties(metadata.getProperties()); + metadata.getProperties().put(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED, TRUE); + metadata.getProperties().put(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED, TRUE); + return super.createDataSource(metadata); + } + + private void validateProperties(Map properties) { + // validate Lake Formation config + if (properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED) != null + && !BooleanUtils.toBoolean(properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED))) { + throw new IllegalArgumentException( + GlueDataSourceFactory.GLUE_ICEBERG_ENABLED + + " cannot be false when using Security Lake data source."); + } + + if (properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED) != null + && !BooleanUtils.toBoolean( + properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED))) { + throw new IllegalArgumentException( + GLUE_LAKEFORMATION_ENABLED + " cannot be false when using Security Lake data source."); + } + + if (StringUtils.isBlank(properties.get(GLUE_LAKEFORMATION_SESSION_TAG))) { + throw new IllegalArgumentException( + GlueDataSourceFactory.GLUE_LAKEFORMATION_SESSION_TAG + + " must be specified when using Security Lake data source"); + } + } +} diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java index 52f8ec9cd1..2833717265 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java @@ -210,4 +210,67 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { Assertions.assertEquals( "Invalid flint host in properties.", illegalArgumentException.getMessage()); } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoIceberg() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "false"); + properties.put("glue.lakeformation.session_tag", "session_tag"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation can only be enabled when Iceberg is enabled.", + illegalArgumentException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoSessionTags() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.enabled", "true"); + properties.put("glue.iceberg.enabled", "true"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_glue") + .setConnector(DataSourceType.S3GLUE) + .setProperties(properties) + .build(); + + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Lake Formation session tag must be specified when enabling Lake Formation", + illegalArgumentException.getMessage()); + } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/SecurityLakeSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/SecurityLakeSourceFactoryTest.java new file mode 100644 index 0000000000..561d549826 --- /dev/null +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/SecurityLakeSourceFactoryTest.java @@ -0,0 +1,141 @@ +package org.opensearch.sql.datasources.glue; + +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.datasource.model.DataSource; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +@ExtendWith(MockitoExtension.class) +public class SecurityLakeSourceFactoryTest { + + @Mock private Settings settings; + + @Test + void testGetConnectorType() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + Assertions.assertEquals( + DataSourceType.SECURITY_LAKE, securityLakeDataSourceFactory.getDataSourceType()); + } + + @Test + @SneakyThrows + void testCreateSecurityLakeDataSource() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.lakeformation.session_tag", "session_tag"); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + DataSource dataSource = securityLakeDataSourceFactory.createDataSource(metadata); + Assertions.assertEquals(DataSourceType.SECURITY_LAKE, dataSource.getConnectorType()); + + Assertions.assertEquals( + properties.get(GlueDataSourceFactory.GLUE_ICEBERG_ENABLED), + SecurityLakeDataSourceFactory.TRUE); + Assertions.assertEquals( + properties.get(GlueDataSourceFactory.GLUE_LAKEFORMATION_ENABLED), + SecurityLakeDataSourceFactory.TRUE); + } + + @Test + @SneakyThrows + void testCreateSecurityLakeDataSourceIcebergCannotBeDisabled() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.iceberg.enabled", "false"); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> securityLakeDataSourceFactory.createDataSource(metadata)); + } + + @Test + @SneakyThrows + void testCreateSecurityLakeDataSourceLakeFormationCannotBeDisabled() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.iceberg.enabled", "true"); + properties.put("glue.lakeformation.enabled", "false"); + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> securityLakeDataSourceFactory.createDataSource(metadata)); + } + + @Test + @SneakyThrows + void testCreateGlueDataSourceWithLakeFormationNoSessionTags() { + SecurityLakeDataSourceFactory securityLakeDataSourceFactory = + new SecurityLakeDataSourceFactory(settings); + + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + properties.put("glue.iceberg.enabled", "true"); + properties.put("glue.lakeformation.enabled", "true"); + + DataSourceMetadata metadata = + new DataSourceMetadata.Builder() + .setName("my_sl") + .setConnector(DataSourceType.SECURITY_LAKE) + .setProperties(properties) + .build(); + + Assertions.assertThrows( + IllegalArgumentException.class, + () -> securityLakeDataSourceFactory.createDataSource(metadata)); + } +} diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index 5e91df70e5..48f19a9d1e 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -42,7 +42,9 @@ Glue Connector Properties. * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] -* ``glue.lakeformation.enabled`` determines whether to enable lakeformation for queries. Default value is ``"false"`` if not specified +* ``glue.iceberg.enabled`` determines whether to enable Iceberg for the session. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.enabled`` determines whether to enable Lake Formation for queries when Iceberg is also enabled. If Iceberg is not enabled, then this property has no effect. Default value is ``"false"`` if not specified. +* ``glue.lakeformation.session_tag`` what session tag to use when assuming the data source role. This property is required when both Iceberg and Lake Formation are enabled. Sample Glue dataSource configuration ======================================== @@ -71,8 +73,7 @@ Glue datasource configuration:: "glue.auth.role_arn": "role_arn", "glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200", "glue.indexstore.opensearch.auth" :"awssigv4", - "glue.indexstore.opensearch.auth.region" :"awssigv4", - "glue.lakeformation.enabled": "true" + "glue.indexstore.opensearch.auth.region" :"us-east-1" }, "resultIndex": "query_execution_result" }] diff --git a/docs/user/ppl/admin/connectors/security_lake_connector.rst b/docs/user/ppl/admin/connectors/security_lake_connector.rst new file mode 100644 index 0000000000..6afddca131 --- /dev/null +++ b/docs/user/ppl/admin/connectors/security_lake_connector.rst @@ -0,0 +1,78 @@ +.. highlight:: sh + +==================== +Security Lake Connector +==================== + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 1 + + +Introduction +============ + +Security Lake connector provides a way to query Security Lake tables. + +Required resources for Security Lake Connector +======================================== +* ``EMRServerless Spark Execution Engine Config Setting``: Since we execute s3Glue queries on top of spark execution engine, we require this configuration. + More details: `ExecutionEngine Config <../../../interfaces/asyncqueryinterface.rst#id2>`_ +* ``S3``: This is where the data lies. +* ``Glue``: Metadata store: Glue takes care of table metadata. +* ``Lake Formation``: AWS service that performs authorization on Security Lake tables +* ``Security Lake``: AWS service that orchestrates creation of S3 files, Glue tables, and Lake Formation permissions. +* ``Opensearch IndexStore``: Index for s3 data lies in opensearch and also acts as temporary buffer for query results. + +We currently only support emr-serverless as spark execution engine and Glue as metadata store. we will add more support in future. + +Glue Connector Properties. + +* ``resultIndex`` is a new parameter specific to glue connector. Stores the results of queries executed on the data source. If unavailable, it defaults to .query_execution_result. +* ``glue.auth.type`` [Required] + * This parameters provides the authentication type information required for execution engine to connect to glue. + * S3 Glue connector currently only supports ``iam_role`` authentication and the below parameters is required. + * ``glue.auth.role_arn`` +* ``glue.indexstore.opensearch.*`` [Required] + * This parameters provides the Opensearch domain host information for glue connector. This opensearch instance is used for writing index data back and also + * ``glue.indexstore.opensearch.uri`` [Required] + * ``glue.indexstore.opensearch.auth`` [Required] + * Accepted values include ["noauth", "basicauth", "awssigv4"] + * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` + * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` + * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] +* ``glue.lakeformation.session_tag`` [Required] + * What session tag to use when assuming the data source role. + +Sample Glue dataSource configuration +======================================== + +Glue datasource configuration:: + + [{ + "name" : "my_sl", + "connector": "security_lake", + "properties" : { + "glue.auth.type": "iam_role", + "glue.auth.role_arn": "role_arn", + "glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200", + "glue.indexstore.opensearch.auth" :"awssigv4", + "glue.indexstore.opensearch.auth.region" :"us-east-1", + "glue.lakeformation.session_tag": "sesson_tag" + }, + "resultIndex": "query_execution_result" + }] + +Sample Security Lake datasource queries APIS +===================================== + +Sample Queries + +* Select Query : ``select * from mysl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 limit 1`` +* Create Covering Index Query: ``create index srcip_time on mysl.amazon_security_lake_glue_db_eu_west_1.amazon_security_lake_table_eu_west_1_vpc_flow_2_0 (src_endpoint.ip, time) WITH (auto_refresh=true)`` + +These queries would work only top of async queries. Documentation: `Async Query APIs <../../../interfaces/asyncqueryinterface.rst>`_ + +Documentation for Index Queries: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index a1b1e32955..971ef5e928 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -60,6 +60,7 @@ import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.datasources.encryptor.EncryptorImpl; import org.opensearch.sql.datasources.glue.GlueDataSourceFactory; +import org.opensearch.sql.datasources.glue.SecurityLakeDataSourceFactory; import org.opensearch.sql.datasources.model.transport.*; import org.opensearch.sql.datasources.rest.RestDataSourceQueryAction; import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; @@ -326,6 +327,7 @@ private DataSourceServiceImpl createDataSourceService() { .add(new PrometheusStorageFactory(pluginSettings)) .add(new SparkStorageFactory(this.client, pluginSettings)) .add(new GlueDataSourceFactory(pluginSettings)) + .add(new SecurityLakeDataSourceFactory(pluginSettings)) .build(), dataSourceMetadataStorage, dataSourceUserAuthorizationHelper);