From bd1f9d030caa3d085d8e629154711342f1614409 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 2 Oct 2023 18:35:20 -0700 Subject: [PATCH] Refactor Flint Auth Signed-off-by: Vamsi Manohar --- .../datasources/auth/AuthenticationType.java | 1 + .../glue/GlueDataSourceFactory.java | 33 ++- .../OpenSearchDataSourceMetadataStorage.java | 22 +- .../glue/GlueDataSourceFactoryTest.java | 94 +++++- .../ppl/admin/connectors/s3glue_connector.rst | 23 +- .../storage/PrometheusStorageFactory.java | 1 - .../model/S3GlueSparkSubmitParameters.java | 3 - .../spark/data/constants/SparkConstants.java | 6 +- .../dispatcher/SparkQueryDispatcher.java | 34 ++- .../dispatcher/SparkQueryDispatcherTest.java | 273 ++++++++++++++++-- 10 files changed, 424 insertions(+), 66 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/auth/AuthenticationType.java b/datasources/src/main/java/org/opensearch/sql/datasources/auth/AuthenticationType.java index b6581608bf..da789388f5 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/auth/AuthenticationType.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/auth/AuthenticationType.java @@ -12,6 +12,7 @@ import java.util.Map; public enum AuthenticationType { + NOAUTH("noauth"), BASICAUTH("basicauth"), AWSSIGV4AUTH("awssigv4"); diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java index 24f94376bf..0d2dc94bd4 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactory.java @@ -9,6 +9,7 @@ import org.opensearch.sql.datasource.model.DataSource; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.utils.DatasourceValidationUtils; import org.opensearch.sql.storage.DataSourceFactory; @@ -20,9 +21,14 @@ public class GlueDataSourceFactory implements DataSourceFactory { // Glue configuration properties public static final String GLUE_AUTH_TYPE = "glue.auth.type"; public static final String GLUE_ROLE_ARN = "glue.auth.role_arn"; - public static final String FLINT_URI = "glue.indexstore.opensearch.uri"; - public static final String FLINT_AUTH = "glue.indexstore.opensearch.auth"; - public static final String FLINT_REGION = "glue.indexstore.opensearch.region"; + public static final String GLUE_INDEX_STORE_OPENSEARCH_URI = "glue.indexstore.opensearch.uri"; + public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH = "glue.indexstore.opensearch.auth"; + public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME = + "glue.indexstore.opensearch.auth.username"; + public static final String GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD = + "glue.indexstore.opensearch.auth.password"; + public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION = + "glue.indexstore.opensearch.region"; @Override public DataSourceType getDataSourceType() { @@ -46,11 +52,28 @@ public DataSource createDataSource(DataSourceMetadata metadata) { private void validateGlueDataSourceConfiguration(Map dataSourceMetadataConfig) throws URISyntaxException, UnknownHostException { + DatasourceValidationUtils.validateLengthAndRequiredFields( dataSourceMetadataConfig, - Set.of(GLUE_AUTH_TYPE, GLUE_ROLE_ARN, FLINT_URI, FLINT_REGION, FLINT_AUTH)); + Set.of( + GLUE_AUTH_TYPE, + GLUE_ROLE_ARN, + GLUE_INDEX_STORE_OPENSEARCH_URI, + GLUE_INDEX_STORE_OPENSEARCH_AUTH)); + AuthenticationType authenticationType = + AuthenticationType.get(dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_AUTH)); + if (AuthenticationType.BASICAUTH.equals(authenticationType)) { + DatasourceValidationUtils.validateLengthAndRequiredFields( + dataSourceMetadataConfig, + Set.of( + GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME, + GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD)); + } else if (AuthenticationType.AWSSIGV4AUTH.equals(authenticationType)) { + DatasourceValidationUtils.validateLengthAndRequiredFields( + dataSourceMetadataConfig, Set.of(GLUE_INDEX_STORE_OPENSEARCH_REGION)); + } DatasourceValidationUtils.validateHost( - dataSourceMetadataConfig.get(FLINT_URI), + dataSourceMetadataConfig.get(GLUE_INDEX_STORE_OPENSEARCH_URI), pluginSettings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)); } } diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java index 5f5e087ce0..b9c0325a94 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/storage/OpenSearchDataSourceMetadataStorage.java @@ -43,7 +43,6 @@ import org.opensearch.search.SearchHit; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.encryptor.Encryptor; import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; import org.opensearch.sql.datasources.service.DataSourceMetadataStorage; @@ -252,26 +251,13 @@ private List searchInDataSourcesIndex(QueryBuilder query) { } } - @SuppressWarnings("missingswitchdefault") + // Encrypt and Decrypt irrespective of auth type.If properties name ends in username, password, + // secret_key and access_key. private DataSourceMetadata encryptDecryptAuthenticationData( DataSourceMetadata dataSourceMetadata, Boolean isEncryption) { Map propertiesMap = dataSourceMetadata.getProperties(); - Optional authTypeOptional = - propertiesMap.keySet().stream() - .filter(s -> s.endsWith("auth.type")) - .findFirst() - .map(propertiesMap::get) - .map(AuthenticationType::get); - if (authTypeOptional.isPresent()) { - switch (authTypeOptional.get()) { - case BASICAUTH: - handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption); - break; - case AWSSIGV4AUTH: - handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption); - break; - } - } + handleBasicAuthPropertiesEncryptionDecryption(propertiesMap, isEncryption); + handleSigV4PropertiesEncryptionDecryption(propertiesMap, isEncryption); return dataSourceMetadata; } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java index b018e5f9dc..4dd054de70 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/glue/GlueDataSourceFactoryTest.java @@ -40,7 +40,7 @@ void testCreateGLueDatSource() { properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); - properties.put("glue.indexstore.opensearch.auth", "false"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); metadata.setName("my_glue"); @@ -59,6 +59,94 @@ void testCreateGLueDatSource() { "Glue storage engine is not supported.", unsupportedOperationException.getMessage()); } + @Test + @SneakyThrows + void testCreateGLueDatSourceWithBasicAuthForIndexStore() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + DataSourceMetadata metadata = new DataSourceMetadata(); + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "basicauth"); + properties.put("glue.indexstore.opensearch.auth.username", "username"); + properties.put("glue.indexstore.opensearch.auth.password", "password"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + + metadata.setName("my_glue"); + metadata.setConnector(DataSourceType.S3GLUE); + metadata.setProperties(properties); + DataSource dataSource = glueDatasourceFactory.createDataSource(metadata); + Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType()); + UnsupportedOperationException unsupportedOperationException = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + dataSource + .getStorageEngine() + .getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs")); + Assertions.assertEquals( + "Glue storage engine is not supported.", unsupportedOperationException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGLueDatSourceWithAwsSigV4AuthForIndexStore() { + when(settings.getSettingValue(Settings.Key.DATASOURCES_URI_HOSTS_DENY_LIST)) + .thenReturn(Collections.emptyList()); + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + DataSourceMetadata metadata = new DataSourceMetadata(); + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "awssigv4"); + properties.put("glue.indexstore.opensearch.region", "us-west-2"); + + metadata.setName("my_glue"); + metadata.setConnector(DataSourceType.S3GLUE); + metadata.setProperties(properties); + DataSource dataSource = glueDatasourceFactory.createDataSource(metadata); + Assertions.assertEquals(DataSourceType.S3GLUE, dataSource.getConnectorType()); + UnsupportedOperationException unsupportedOperationException = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + dataSource + .getStorageEngine() + .getTable(new DataSourceSchemaName("my_glue", "default"), "alb_logs")); + Assertions.assertEquals( + "Glue storage engine is not supported.", unsupportedOperationException.getMessage()); + } + + @Test + @SneakyThrows + void testCreateGLueDatSourceWithBasicAuthForIndexStoreAndMissingFields() { + GlueDataSourceFactory glueDatasourceFactory = new GlueDataSourceFactory(settings); + + DataSourceMetadata metadata = new DataSourceMetadata(); + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "basicauth"); + + metadata.setName("my_glue"); + metadata.setConnector(DataSourceType.S3GLUE); + metadata.setProperties(properties); + IllegalArgumentException illegalArgumentException = + Assertions.assertThrows( + IllegalArgumentException.class, () -> glueDatasourceFactory.createDataSource(metadata)); + Assertions.assertEquals( + "Missing [glue.indexstore.opensearch.auth.password," + + " glue.indexstore.opensearch.auth.username] fields in the connector properties.", + illegalArgumentException.getMessage()); + } + @Test @SneakyThrows void testCreateGLueDatSourceWithInvalidFlintHost() { @@ -71,7 +159,7 @@ void testCreateGLueDatSourceWithInvalidFlintHost() { properties.put("glue.auth.type", "iam_role"); properties.put("glue.auth.role_arn", "role_arn"); properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); - properties.put("glue.indexstore.opensearch.auth", "false"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); metadata.setName("my_glue"); @@ -100,7 +188,7 @@ void testCreateGLueDatSourceWithInvalidFlintHostSyntax() { properties.put( "glue.indexstore.opensearch.uri", "http://dummyprometheus.com:9090? paramt::localhost:9200"); - properties.put("glue.indexstore.opensearch.auth", "false"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); properties.put("glue.indexstore.opensearch.region", "us-west-2"); metadata.setName("my_glue"); diff --git a/docs/user/ppl/admin/connectors/s3glue_connector.rst b/docs/user/ppl/admin/connectors/s3glue_connector.rst index 640eb90283..ef27cf572a 100644 --- a/docs/user/ppl/admin/connectors/s3glue_connector.rst +++ b/docs/user/ppl/admin/connectors/s3glue_connector.rst @@ -39,9 +39,10 @@ Glue Connector Properties. * This parameters provides the Opensearch domain host information for glue connector. This opensearch instance is used for writing index data back and also * ``glue.indexstore.opensearch.uri`` [Required] * ``glue.indexstore.opensearch.auth`` [Required] - * Default value for auth is ``false``. - * ``glue.indexstore.opensearch.region`` [Required] - * Default value for auth is ``us-west-2``. + * Accepted values include ["noauth", "basicauth", "awssigv4"] + * Basic Auth required ``glue.indexstore.opensearch.auth.username`` and ``glue.indexstore.opensearch.auth.password`` + * AWSSigV4 Auth requires ``glue.indexstore.opensearch.auth.region`` and ``glue.auth.role_arn`` + * ``glue.indexstore.opensearch.region`` [Required for awssigv4 auth] Sample Glue dataSource configuration ======================================== @@ -55,11 +56,23 @@ Glue datasource configuration:: "glue.auth.type": "iam_role", "glue.auth.role_arn": "role_arn", "glue.indexstore.opensearch.uri": "http://localhost:9200", - "glue.indexstore.opensearch.auth" :"false", - "glue.indexstore.opensearch.region": "us-west-2" + "glue.indexstore.opensearch.auth" :"basicauth", + "glue.indexstore.opensearch.auth.username" :"username" + "glue.indexstore.opensearch.auth.password" :"password" } }] + [{ + "name" : "my_glue", + "connector": "s3glue", + "properties" : { + "glue.auth.type": "iam_role", + "glue.auth.role_arn": "role_arn", + "glue.indexstore.opensearch.uri": "http://adsasdf.amazonopensearch.com:9200", + "glue.indexstore.opensearch.auth" :"awssigv4", + "glue.indexstore.opensearch.auth.region" :"awssigv4", + } + }] Sample s3Glue datasource queries ================================ diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java index dd74f9d550..b9c1b09d60 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageFactory.java @@ -43,7 +43,6 @@ public class PrometheusStorageFactory implements DataSourceFactory { public static final String REGION = "prometheus.auth.region"; public static final String ACCESS_KEY = "prometheus.auth.access_key"; public static final String SECRET_KEY = "prometheus.auth.secret_key"; - private static final Integer MAX_LENGTH_FOR_CONFIG_PROPERTY = 1000; private final Settings settings; diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java index fadb8a67a9..13e4947eae 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java @@ -17,10 +17,8 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; @@ -69,7 +67,6 @@ public S3GlueSparkSubmitParameters() { this.config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT); this.config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); this.config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); - this.config.put(FLINT_INDEX_STORE_AWSREGION_KEY, FLINT_DEFAULT_REGION); this.config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); this.config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION); this.config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 21db8b9478..a0318a9478 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -24,7 +24,7 @@ public class SparkConstants { public static final String FLINT_DEFAULT_HOST = "localhost"; public static final String FLINT_DEFAULT_PORT = "9200"; public static final String FLINT_DEFAULT_SCHEME = "http"; - public static final String FLINT_DEFAULT_AUTH = "-1"; + public static final String FLINT_DEFAULT_AUTH = "noauth"; public static final String FLINT_DEFAULT_REGION = "us-west-2"; public static final String DEFAULT_CLASS_NAME = "org.opensearch.sql.FlintJob"; public static final String S3_AWS_CREDENTIALS_PROVIDER_KEY = @@ -46,6 +46,10 @@ public class SparkConstants { public static final String FLINT_INDEX_STORE_PORT_KEY = "spark.datasource.flint.port"; public static final String FLINT_INDEX_STORE_SCHEME_KEY = "spark.datasource.flint.scheme"; public static final String FLINT_INDEX_STORE_AUTH_KEY = "spark.datasource.flint.auth"; + public static final String FLINT_INDEX_STORE_AUTH_USERNAME = + "spark.datasource.flint.auth.username"; + public static final String FLINT_INDEX_STORE_AUTH_PASSWORD = + "spark.datasource.flint.auth.password"; public static final String FLINT_INDEX_STORE_AWSREGION_KEY = "spark.datasource.flint.region"; public static final String FLINT_CREDENTIALS_PROVIDER_KEY = "spark.datasource.flint.customAWSCredentialsProvider"; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 1d2064f3ca..ab9112460e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -5,10 +5,16 @@ package org.opensearch.sql.spark.dispatcher; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_PASSWORD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; @@ -27,6 +33,7 @@ import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; +import org.opensearch.sql.datasources.auth.AuthenticationType; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.spark.asyncquery.model.S3GlueSparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; @@ -120,19 +127,38 @@ private String constructSparkParameters(String datasourceName) { String.format( "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName)); } - String auth = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.auth"); - String region = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.region"); s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_HOST_KEY, uri.getHost()); s3GlueSparkSubmitParameters.addParameter( FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort())); s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme()); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, auth); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AWSREGION_KEY, region); s3GlueSparkSubmitParameters.addParameter( "spark.sql.catalog." + datasourceName, FLINT_DELEGATE_CATALOG); + String auth = dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH); + setFlintIndexStoreAuthProperties(dataSourceMetadata, s3GlueSparkSubmitParameters, auth); return s3GlueSparkSubmitParameters.toString(); } + private static void setFlintIndexStoreAuthProperties( + DataSourceMetadata dataSourceMetadata, + S3GlueSparkSubmitParameters s3GlueSparkSubmitParameters, + String authType) { + if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) { + s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, authType); + String username = + dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME); + String password = + dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD); + s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_USERNAME, username); + s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_PASSWORD, password); + } else if (AuthenticationType.get(authType).equals(AuthenticationType.AWSSIGV4AUTH)) { + String region = dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION); + s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, "sigv4"); + s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AWSREGION_KEY, region); + } else { + s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, authType); + } + } + private StartJobRequest getStartJobRequestForNonIndexQueries( DispatchQueryRequest dispatchQueryRequest) { StartJobRequest startJobRequest; diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 6b8d75ab22..165c87c7aa 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -14,6 +14,9 @@ import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE; import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; import static org.opensearch.sql.spark.constants.TestConstants.TEST_CLUSTER_NAME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_PASSWORD; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_USERNAME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; @@ -65,7 +68,13 @@ void testDispatchSelectQuery() { "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); @@ -87,7 +96,127 @@ void testDispatchSelectQuery() { "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), + tags)); + Assertions.assertEquals(EMR_JOB_ID, jobId); + } + + @Test + void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { + SparkQueryDispatcher sparkQueryDispatcher = + new SparkQueryDispatcher( + emrServerlessClient, + dataSourceService, + dataSourceUserAuthorizationHelper, + jobExecutionResponseReader); + HashMap tags = new HashMap<>(); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + String query = "select * from my_glue.default.http_logs"; + when(emrServerlessClient.startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "basicauth", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AUTH_USERNAME, "username"); + put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); + } + }), + tags))) + .thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithBasicAuth(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + String jobId = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "basicauth", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AUTH_USERNAME, "username"); + put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); + } + }), + tags)); + Assertions.assertEquals(EMR_JOB_ID, jobId); + } + + @Test + void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { + SparkQueryDispatcher sparkQueryDispatcher = + new SparkQueryDispatcher( + emrServerlessClient, + dataSourceService, + dataSourceUserAuthorizationHelper, + jobExecutionResponseReader); + HashMap tags = new HashMap<>(); + tags.put("datasource", "my_glue"); + tags.put("cluster", TEST_CLUSTER_NAME); + String query = "select * from my_glue.default.http_logs"; + when(emrServerlessClient.startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "noauth", + new HashMap<>() { + { + } + }), + tags))) + .thenReturn(EMR_JOB_ID); + DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithNoAuth(); + when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); + doNothing().when(dataSourceUserAuthorizationHelper).authorizeDataSource(dataSourceMetadata); + String jobId = + sparkQueryDispatcher.dispatch( + new DispatchQueryRequest( + EMRS_APPLICATION_ID, + query, + "my_glue", + LangType.SQL, + EMRS_EXECUTION_ROLE, + TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)) + .startJobRun( + new StartJobRequest( + query, + "TEST_CLUSTER:non-index-query", + EMRS_APPLICATION_ID, + EMRS_EXECUTION_ROLE, + constructExpectedSparkSubmitParameterString( + "noauth", + new HashMap<>() { + { + } + }), tags)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -115,7 +244,13 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); @@ -137,7 +272,13 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -160,7 +301,13 @@ void testDispatchWithPPLQuery() { "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); @@ -182,7 +329,13 @@ void testDispatchWithPPLQuery() { "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -205,7 +358,13 @@ void testDispatchQueryWithoutATableAndDataSourceName() { "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); @@ -227,7 +386,13 @@ void testDispatchQueryWithoutATableAndDataSourceName() { "TEST_CLUSTER:non-index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -255,7 +420,13 @@ void testDispatchIndexQueryWithoutADatasourceName() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); @@ -277,7 +448,13 @@ void testDispatchIndexQueryWithoutADatasourceName() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + }), tags)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -392,7 +569,16 @@ void testGetQueryResponseWithSuccess() { Assertions.assertEquals("SUCCESS", result.get("status")); } - private String constructExpectedSparkSubmitParameterString() { + private String constructExpectedSparkSubmitParameterString( + String auth, Map authParams) { + StringBuilder authParamConfigBuilder = new StringBuilder(); + for (String key : authParams.keySet()) { + authParamConfigBuilder.append(" --conf "); + authParamConfigBuilder.append(key); + authParamConfigBuilder.append("="); + authParamConfigBuilder.append(authParams.get(key)); + authParamConfigBuilder.append(" "); + } return " --class org.opensearch.sql.FlintJob --conf" + " spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider" + " --conf" @@ -409,19 +595,20 @@ private String constructExpectedSparkSubmitParameterString() { + " --conf" + " spark.datasource.flint.host=search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com" + " --conf spark.datasource.flint.port=-1 --conf" - + " spark.datasource.flint.scheme=https --conf spark.datasource.flint.auth=sigv4 " - + " --conf spark.datasource.flint.region=eu-west-1 --conf" - + " spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider" - + " --conf spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions " - + " --conf" - + " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" - + " --conf" - + " spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" - + " --conf" - + " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" - + " --conf" - + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" - + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog "; + + " spark.datasource.flint.scheme=https --conf spark.datasource.flint.auth=" + + auth + + " --conf" + + " spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider" + + " --conf spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions --conf" + + " spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" + + " --conf" + + " spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + + " --conf" + + " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + + " --conf" + + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog " + + authParamConfigBuilder; } private DataSourceMetadata constructMyGlueDataSourceMetadata() { @@ -435,12 +622,46 @@ private DataSourceMetadata constructMyGlueDataSourceMetadata() { properties.put( "glue.indexstore.opensearch.uri", "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); - properties.put("glue.indexstore.opensearch.auth", "sigv4"); + properties.put("glue.indexstore.opensearch.auth", "awssigv4"); properties.put("glue.indexstore.opensearch.region", "eu-west-1"); dataSourceMetadata.setProperties(properties); return dataSourceMetadata; } + private DataSourceMetadata constructMyGlueDataSourceMetadataWithBasicAuth() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("my_glue"); + dataSourceMetadata.setConnector(DataSourceType.S3GLUE); + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put( + "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); + properties.put( + "glue.indexstore.opensearch.uri", + "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); + properties.put("glue.indexstore.opensearch.auth", "basicauth"); + properties.put("glue.indexstore.opensearch.auth.username", "username"); + properties.put("glue.indexstore.opensearch.auth.password", "password"); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } + + private DataSourceMetadata constructMyGlueDataSourceMetadataWithNoAuth() { + DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); + dataSourceMetadata.setName("my_glue"); + dataSourceMetadata.setConnector(DataSourceType.S3GLUE); + Map properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put( + "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); + properties.put( + "glue.indexstore.opensearch.uri", + "https://search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"); + properties.put("glue.indexstore.opensearch.auth", "noauth"); + dataSourceMetadata.setProperties(properties); + return dataSourceMetadata; + } + private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); dataSourceMetadata.setName("my_glue"); @@ -450,7 +671,7 @@ private DataSourceMetadata constructMyGlueDataSourceMetadataWithBadURISyntax() { properties.put( "glue.auth.role_arn", "arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"); properties.put("glue.indexstore.opensearch.uri", "http://localhost:9090? param"); - properties.put("glue.indexstore.opensearch.auth", "sigv4"); + properties.put("glue.indexstore.opensearch.auth", "awssigv4"); properties.put("glue.indexstore.opensearch.region", "eu-west-1"); dataSourceMetadata.setProperties(properties); return dataSourceMetadata;