From 468eb067769c2768d8417c8faaecbf0b5de568b3 Mon Sep 17 00:00:00 2001 From: Qi Yu Date: Thu, 6 Jun 2024 22:16:05 +0800 Subject: [PATCH] [#3463] improvement(lakehouse-iceberg): Support user authentication for Iceberg Hive backend. (#3724) ### What changes were proposed in this pull request? - Add user authentication for Iceberg with Hive backend using kerberos - Add e2e test. ### Why are the changes needed? It's a must-have feature for iceberg. Fix: #3463 ### Does this PR introduce _any_ user-facing change? N/A. ### How was this patch tested? ITs. --- LICENSE | 1 + .../catalog/hadoop/HadoopCatalog.java | 2 +- .../hadoop/HadoopCatalogOperations.java | 4 +- .../HadoopCatalogPropertiesMetadata.java | 4 +- .../AuthenticationConfig.java | 14 +- .../kerberos/FetchFileUtils.java | 2 +- .../kerberos/KerberosClient.java | 2 +- .../kerberos/KerberosConfig.java | 3 +- .../test/HadoopUserAuthenticationIT.java | 8 +- .../test/HadoopUserImpersonationIT.java | 8 +- .../hive/integration/test/CatalogHiveIT.java | 23 +- .../test/HiveUserAuthenticationIT.java | 1 - .../integration/test/ProxyCatalogHiveIT.java | 1 - .../build.gradle.kts | 3 + .../iceberg/IcebergCatalogOperations.java | 1 + .../IcebergCatalogPropertiesMetadata.java | 26 +- .../iceberg/IcebergHiveCachedClientPool.java | 229 ++++++++++++ .../authentication/AuthenticationConfig.java | 90 +++++ .../kerberos/FetchFileUtils.java | 51 +++ .../kerberos/HiveBackendProxy.java | 116 ++++++ .../kerberos/KerberosClient.java | 107 ++++++ .../kerberos/KerberosConfig.java | 125 +++++++ .../iceberg/utils/IcebergCatalogUtil.java | 49 ++- .../test/CatalogIcebergBaseIT.java | 15 +- .../test/CatalogIcebergKerberosHiveIT.java | 339 ++++++++++++++++++ .../gravitino/config/ConfigConstants.java | 2 + gradle/libs.versions.toml | 3 + .../integration/test/util/AbstractIT.java | 24 +- 28 files changed, 1196 insertions(+), 57 deletions(-) rename catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/{kerberos => authentication}/AuthenticationConfig.java (83%) rename catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/{ => authentication}/kerberos/FetchFileUtils.java (95%) rename catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/{ => authentication}/kerberos/KerberosClient.java (98%) rename catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/{ => authentication}/kerberos/KerberosConfig.java (96%) create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergHiveCachedClientPool.java create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/AuthenticationConfig.java create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/FetchFileUtils.java create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/HiveBackendProxy.java create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosClient.java create mode 100644 catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosConfig.java create mode 100644 
catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java diff --git a/LICENSE b/LICENSE index 5d958349f2f..d80ec880eca 100644 --- a/LICENSE +++ b/LICENSE @@ -235,6 +235,7 @@ ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/web/IcebergExceptionMapper.java ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/converter/DescribeIcebergSortOrderVisitor.java ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergTablePropertiesUtil.java + ./catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergHiveCachedClientPool.java ./clients/client-java/src/main/java/com/datastrato/gravitino/client/HTTPClient.java ./clients/client-java/src/main/java/com/datastrato/gravitino/client/RESTClient.java ./clients/client-java/src/test/java/com/datastrato/gravitino/client/TestHTTPClient.java diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java index d548b4e548c..f6ae1142032 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java @@ -4,7 +4,7 @@ */ package com.datastrato.gravitino.catalog.hadoop; -import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.BaseCatalog; import com.datastrato.gravitino.connector.CatalogOperations; import com.datastrato.gravitino.connector.PropertiesMetadata; diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index 283b03c3b33..bf136d34edc 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -15,8 +15,8 @@ import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SchemaChange; import com.datastrato.gravitino.StringIdentifier; -import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig; -import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosClient; +import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosClient; import com.datastrato.gravitino.connector.CatalogInfo; import com.datastrato.gravitino.connector.CatalogOperations; import com.datastrato.gravitino.connector.HasPropertyMetadata; diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java index 37a5700e57c..5f4e475dba3 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java +++ 
b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java @@ -4,8 +4,8 @@ */ package com.datastrato.gravitino.catalog.hadoop; -import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig; -import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig; +import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata; import com.datastrato.gravitino.connector.PropertyEntry; import com.google.common.collect.ImmutableMap; diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/AuthenticationConfig.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java similarity index 83% rename from catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/AuthenticationConfig.java rename to catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java index 8d9549c57b6..7655623462d 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/AuthenticationConfig.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/AuthenticationConfig.java @@ -3,11 +3,10 @@ * This software is licensed under the Apache License version 2. */ -package com.datastrato.gravitino.catalog.hadoop.kerberos; - -import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.DEFAULT_IMPERSONATION_ENABLE; +package com.datastrato.gravitino.catalog.hadoop.authentication; import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.config.ConfigBuilder; import com.datastrato.gravitino.config.ConfigConstants; import com.datastrato.gravitino.config.ConfigEntry; @@ -29,17 +28,18 @@ public AuthenticationConfig(Map properties) { public static final ConfigEntry AUTH_TYPE_ENTRY = new ConfigBuilder(AUTH_TYPE_KEY) - .doc("The type of authentication for Hadoop catalog, currently we only support Kerberos") + .doc( + "The type of authentication for Hadoop catalog, currently we only support simple and Kerberos") .version(ConfigConstants.VERSION_0_5_1) .stringConf() - .create(); + .createWithDefault("simple"); public static final ConfigEntry ENABLE_IMPERSONATION_ENTRY = new ConfigBuilder(IMPERSONATION_ENABLE_KEY) .doc("Whether to enable impersonation for the Hadoop catalog") .version(ConfigConstants.VERSION_0_5_1) .booleanConf() - .createWithDefault(DEFAULT_IMPERSONATION_ENABLE); + .createWithDefault(KerberosConfig.DEFAULT_IMPERSONATION_ENABLE); public String getAuthType() { return get(AUTH_TYPE_ENTRY); @@ -58,7 +58,7 @@ public boolean isImpersonationEnabled() { "Whether to enable impersonation for the Hadoop catalog", false, true, - DEFAULT_IMPERSONATION_ENABLE, + KerberosConfig.DEFAULT_IMPERSONATION_ENABLE, false, false)) .put( diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/FetchFileUtils.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/FetchFileUtils.java similarity index 95% rename from 
catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/FetchFileUtils.java rename to catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/FetchFileUtils.java index 30f75687f6e..24c55049158 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/FetchFileUtils.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/FetchFileUtils.java @@ -2,7 +2,7 @@ * Copyright 2024 Datastrato Pvt Ltd. * This software is licensed under the Apache License version 2. */ -package com.datastrato.gravitino.catalog.hadoop.kerberos; +package com.datastrato.gravitino.catalog.hadoop.authentication.kerberos; import java.io.File; import java.io.IOException; diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosClient.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java similarity index 98% rename from catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosClient.java rename to catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java index b9aece8b87c..3930380e876 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosClient.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosClient.java @@ -3,7 +3,7 @@ * This software is licensed under the Apache License version 2. */ -package com.datastrato.gravitino.catalog.hadoop.kerberos; +package com.datastrato.gravitino.catalog.hadoop.authentication.kerberos; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosConfig.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosConfig.java similarity index 96% rename from catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosConfig.java rename to catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosConfig.java index 60f5b8469c5..aaec3f3c472 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosConfig.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/authentication/kerberos/KerberosConfig.java @@ -3,8 +3,9 @@ * This software is licensed under the Apache License version 2. 
*/ -package com.datastrato.gravitino.catalog.hadoop.kerberos; +package com.datastrato.gravitino.catalog.hadoop.authentication.kerberos; +import com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig; import com.datastrato.gravitino.config.ConfigBuilder; import com.datastrato.gravitino.config.ConfigConstants; import com.datastrato.gravitino.config.ConfigEntry; diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java index 030283c34d9..e35902145b5 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserAuthenticationIT.java @@ -5,10 +5,10 @@ package com.datastrato.gravitino.catalog.hadoop.integration.test; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig.AUTH_TYPE_KEY; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.IMPERSONATION_ENABLE_KEY; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.KEY_TAB_URI_KEY; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.PRINCIPAL_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig.AUTH_TYPE_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.IMPERSONATION_ENABLE_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.KEY_TAB_URI_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.PRINCIPAL_KEY; import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.NameIdentifier; diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java index ead0eb57ab8..838bdc0c3fc 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java @@ -5,10 +5,10 @@ package com.datastrato.gravitino.catalog.hadoop.integration.test; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig.AUTH_TYPE_KEY; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.IMPERSONATION_ENABLE_KEY; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.KEY_TAB_URI_KEY; -import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.PRINCIPAL_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.AuthenticationConfig.AUTH_TYPE_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.IMPERSONATION_ENABLE_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.KEY_TAB_URI_KEY; +import static com.datastrato.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.PRINCIPAL_KEY; import static 
org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; import com.datastrato.gravitino.Catalog; diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java index e76fbd65ea3..d45c5c7c36f 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java @@ -199,18 +199,18 @@ public static void startup() throws Exception { @AfterAll public static void stop() throws IOException { - Arrays.stream(catalog.asSchemas().listSchemas()) - .filter(schema -> !schema.equals("default")) - .forEach( - (schema -> { - catalog.asSchemas().dropSchema(schema, true); - })); - Arrays.stream(metalake.listCatalogs()) - .forEach( - (catalogName -> { - metalake.dropCatalog(catalogName); - })); if (client != null) { + Arrays.stream(catalog.asSchemas().listSchemas()) + .filter(schema -> !schema.equals("default")) + .forEach( + (schema -> { + catalog.asSchemas().dropSchema(schema, true); + })); + Arrays.stream(metalake.listCatalogs()) + .forEach( + (catalogName -> { + metalake.dropCatalog(catalogName); + })); client.dropMetalake(metalakeName); } if (hiveClientPool != null) { @@ -230,7 +230,6 @@ public static void stop() throws IOException { LOG.error("Failed to close CloseableGroup", e); } - AbstractIT.customConfigs.clear(); AbstractIT.client = null; } diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java index 4880e3a7da6..5d2ee86a5af 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java @@ -117,7 +117,6 @@ public static void stop() { System.clearProperty("java.security.krb5.conf"); System.clearProperty("sun.security.krb5.debug"); - AbstractIT.customConfigs.clear(); AbstractIT.client = null; } diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java index fee4b49df3f..8e1f879b088 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java @@ -119,7 +119,6 @@ public static void stop() { setEnv(HADOOP_USER_NAME, originHadoopUser); anotherClient.close(); - AbstractIT.customConfigs.clear(); AbstractIT.client = null; } diff --git a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts index 366f6a6b1de..d2aeea453cc 100644 --- a/catalogs/catalog-lakehouse-iceberg/build.gradle.kts +++ b/catalogs/catalog-lakehouse-iceberg/build.gradle.kts @@ -25,6 +25,8 @@ dependencies { implementation(libs.bundles.jetty) implementation(libs.bundles.jersey) implementation(libs.bundles.log4j) + implementation(libs.caffeine) + 
implementation(libs.cglib) implementation(libs.commons.collections4) implementation(libs.commons.io) implementation(libs.commons.lang3) @@ -163,6 +165,7 @@ tasks.test { doFirst { environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12") + environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-kerberos-hive:0.1.2") } val init = project.extra.get("initIntegrationTest") as (Test) -> Unit diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 99003f5c892..258cb2d0bcd 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -89,6 +89,7 @@ public void initialize( Map resultConf = Maps.newHashMap(prefixMap); resultConf.putAll(gravitinoConfig); + resultConf.put("catalog_uuid", info.id().toString()); IcebergConfig icebergConfig = new IcebergConfig(resultConf); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java index eab79da21e1..6a38cba24c9 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java @@ -7,6 +7,8 @@ import static com.datastrato.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry; import static com.datastrato.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.AuthenticationConfig; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata; import com.datastrato.gravitino.connector.PropertyEntry; import com.google.common.collect.ImmutableList; @@ -34,8 +36,7 @@ public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetad // Map that maintains the mapping of keys in Gravitino to that in Iceberg, for example, users // will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will - // change - // it to `catalogType` automatically and pass it to Iceberg. + // change it to `catalogType` automatically and pass it to Iceberg. 
public static final Map GRAVITINO_CONFIG_TO_ICEBERG = ImmutableMap.of( CATALOG_BACKEND_NAME, @@ -51,6 +52,21 @@ public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetad WAREHOUSE, WAREHOUSE); + public static final Map KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND = + ImmutableMap.of( + KerberosConfig.PRINCIPAL_KEY, + KerberosConfig.PRINCIPAL_KEY, + KerberosConfig.KET_TAB_URI_KEY, + KerberosConfig.KET_TAB_URI_KEY, + KerberosConfig.CHECK_INTERVAL_SEC_KEY, + KerberosConfig.CHECK_INTERVAL_SEC_KEY, + KerberosConfig.FETCH_TIMEOUT_SEC_KEY, + KerberosConfig.FETCH_TIMEOUT_SEC_KEY, + AuthenticationConfig.IMPERSONATION_ENABLE_KEY, + AuthenticationConfig.IMPERSONATION_ENABLE_KEY, + AuthenticationConfig.AUTH_TYPE_KEY, + AuthenticationConfig.AUTH_TYPE_KEY); + static { List> propertyEntries = ImmutableList.of( @@ -67,6 +83,8 @@ public class IcebergCatalogPropertiesMetadata extends BaseCatalogPropertiesMetad WAREHOUSE, "Iceberg catalog warehouse config", false, false)); HashMap> result = Maps.newHashMap(BASIC_CATALOG_PROPERTY_ENTRIES); result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName)); + result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES); + result.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES); PROPERTIES_METADATA = ImmutableMap.copyOf(result); } @@ -82,6 +100,10 @@ public Map transformProperties(Map properties) { if (GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) { gravitinoConfig.put(GRAVITINO_CONFIG_TO_ICEBERG.get(key), value); } + + if (KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.containsKey(key)) { + gravitinoConfig.put(KERBEROS_CONFIGURATION_FOR_HIVE_BACKEND.get(key), value); + } }); return gravitinoConfig; } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergHiveCachedClientPool.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergHiveCachedClientPool.java new file mode 100644 index 00000000000..d779bf1b5fe --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergHiveCachedClientPool.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package com.datastrato.gravitino.catalog.lakehouse.iceberg; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hive.HiveClientPool; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.ThreadPools; +import org.apache.thrift.TException; + +/** + * Referred from Apache Iceberg's CachedClientPool implementation + * hive-metastore/src/main/java/org/apache/iceberg/hive/CachedClientPool.java + * + *

+ * <p>IcebergHiveCachedClientPool is used for every Iceberg catalog with a Hive backend. Compared
+ * with Iceberg's CachedClientPool, the method clientPool() is changed from
+ *
+ * <pre>{@code
+ * HiveClientPool clientPool() {
+ *   return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf));
+ * }
+ * }</pre>
+ *
+ * to
+ *
+ * <pre>{@code
+ * HiveClientPool clientPool() {
+ *   Key key = extractKey(properties.get(CatalogProperties.CLIENT_POOL_CACHE_KEYS), conf);
+ *   return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf));
+ * }
+ * }</pre>
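+ *
+ * <p>For illustration, a minimal sketch of how the per-user key is derived once
+ * CatalogProperties.CLIENT_POOL_CACHE_KEYS is set to "USER_NAME" (as the Kerberos path in
+ * IcebergCatalogUtil does); the metastore URI below is only a placeholder:
+ *
+ * <pre>{@code
+ * Configuration conf = new Configuration();
+ * conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://metastore:9083");
+ * // extractKey appends the current UserGroupInformation user name to the key elements,
+ * // so two different authenticated users resolve to two different HiveClientPool entries.
+ * Key key = extractKey("USER_NAME", conf);
+ * }</pre>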
+ *
+ * <p>Why do we need to do this? The original client pool in Iceberg uses a fixed user name,
+ * captured once, as the cache key (see the key used in the method clientPool()). Assuming that
+ * name is A, when a new user B calls clientPool(), B gets the connection that belongs to A. That
+ * does not work with Kerberos authentication, where the user name changes from request to
+ * request.
+ */
+public class IcebergHiveCachedClientPool
+    implements ClientPool<IMetaStoreClient, TException>, Closeable {
+  private static final String CONF_ELEMENT_PREFIX = "conf:";
+
+  private static Cache<Key, HiveClientPool> clientPoolCache;
+
+  private final Configuration conf;
+  private final Map<String, String> properties;
+  private final int clientPoolSize;
+  private final long evictionInterval;
+  private ScheduledExecutorService scheduledExecutorService;
+
+  public IcebergHiveCachedClientPool(Configuration conf, Map<String, String> properties) {
+    this.conf = conf;
+    this.clientPoolSize =
+        PropertyUtil.propertyAsInt(
+            properties,
+            CatalogProperties.CLIENT_POOL_SIZE,
+            CatalogProperties.CLIENT_POOL_SIZE_DEFAULT);
+    this.evictionInterval =
+        PropertyUtil.propertyAsLong(
+            properties,
+            CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
+            CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
+    this.properties = properties;
+    init();
+  }
+
+  @VisibleForTesting
+  HiveClientPool clientPool() {
+    Key key = extractKey(properties.get(CatalogProperties.CLIENT_POOL_CACHE_KEYS), conf);
+    return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf));
+  }
+
+  private synchronized void init() {
+    if (clientPoolCache == null) {
+      // Since Caffeine does not ensure that the removalListener will be invoked after expiration,
+      // we use a scheduler with one thread to clean up expired clients.
+      scheduledExecutorService = ThreadPools.newScheduledPool("hive-metastore-cleaner", 1);
+      clientPoolCache =
+          Caffeine.newBuilder()
+              .expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
+              .removalListener((ignored, value, cause) -> ((HiveClientPool) value).close())
+              .scheduler(Scheduler.forScheduledExecutorService(scheduledExecutorService))
+              .build();
+    }
+  }
+
+  @VisibleForTesting
+  static Cache<Key, HiveClientPool> clientPoolCache() {
+    return clientPoolCache;
+  }
+
+  @Override
+  public <R> R run(Action<R, IMetaStoreClient, TException> action)
+      throws TException, InterruptedException {
+    return clientPool().run(action);
+  }
+
+  @Override
+  public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
+      throws TException, InterruptedException {
+    return clientPool().run(action, retry);
+  }
+
+  @VisibleForTesting
+  static Key extractKey(String cacheKeys, Configuration conf) {
+    // Generate key elements in a certain order, so that the Key instances are comparable
+    List<Object> elements = Lists.newArrayList();
+    elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""));
+    elements.add(conf.get("HIVE_CONF_CATALOG", "hive"));
+    if (cacheKeys == null || cacheKeys.isEmpty()) {
+      return Key.of(elements);
+    }
+
+    Set<KeyElementType> types = Sets.newTreeSet(Comparator.comparingInt(Enum::ordinal));
+    Map<String, String> confElements = Maps.newTreeMap();
+    for (String element : cacheKeys.split(",", -1)) {
+      String trimmed = element.trim();
+      if (trimmed.toLowerCase(Locale.ROOT).startsWith(CONF_ELEMENT_PREFIX)) {
+        String key = trimmed.substring(CONF_ELEMENT_PREFIX.length());
+        ValidationException.check(
+            !confElements.containsKey(key), "Conf key element %s already specified", key);
+        confElements.put(key, conf.get(key));
+      } else {
+        KeyElementType type = KeyElementType.valueOf(trimmed.toUpperCase());
+        switch (type) {
+          case UGI:
+          case USER_NAME:
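+            // Both element types are resolved below via UserGroupInformation.getCurrentUser();
+            // each may appear at most once in the cache-key specification.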
ValidationException.check( + !types.contains(type), "%s key element already specified", type.name()); + types.add(type); + break; + default: + throw new ValidationException("Unknown key element %s", trimmed); + } + } + } + for (KeyElementType type : types) { + switch (type) { + case UGI: + try { + elements.add(UserGroupInformation.getCurrentUser()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + break; + case USER_NAME: + try { + elements.add(UserGroupInformation.getCurrentUser().getUserName()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + break; + default: + throw new RuntimeException("Unexpected key element " + type.name()); + } + } + return Key.of(elements); + } + + static class Key { + private final List elements; + + List elements() { + return elements; + } + + public Key(List elements) { + this.elements = elements; + } + + static Key of(List elements) { + return new Key(elements); + } + } + + private enum KeyElementType { + UGI, + USER_NAME, + CONF + } + + @Override + public void close() throws IOException { + clientPoolCache.asMap().forEach((key, value) -> value.close()); + clientPoolCache.invalidateAll(); + scheduledExecutorService.shutdownNow(); + } +} diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/AuthenticationConfig.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/AuthenticationConfig.java new file mode 100644 index 00000000000..ee769734140 --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/AuthenticationConfig.java @@ -0,0 +1,90 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */
+
+package com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication;
+
+import com.datastrato.gravitino.Config;
+import com.datastrato.gravitino.config.ConfigBuilder;
+import com.datastrato.gravitino.config.ConfigConstants;
+import com.datastrato.gravitino.config.ConfigEntry;
+import com.datastrato.gravitino.connector.PropertyEntry;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+
+public class AuthenticationConfig extends Config {
+
+  // The key for the authentication type, currently we support simple and Kerberos
+  public static final String AUTH_TYPE_KEY = "authentication.type";
+
+  public static final String IMPERSONATION_ENABLE_KEY = "authentication.impersonation-enable";
+
+  public static final boolean DEFAULT_IMPERSONATION_ENABLE = false;
+
+  enum AuthenticationType {
+    SIMPLE,
+    KERBEROS
+  }
+
+  public AuthenticationConfig(Map<String, String> properties) {
+    super(false);
+    loadFromMap(properties, k -> true);
+  }
+
+  public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
+      new ConfigBuilder(AUTH_TYPE_KEY)
+          .doc(
+              "The type of authentication for Iceberg catalog, currently we support simple and Kerberos")
+          .version(ConfigConstants.VERSION_0_5_1)
+          .stringConf()
+          .createWithDefault("simple");
+
+  public static final ConfigEntry<Boolean> ENABLE_IMPERSONATION_ENTRY =
+      new ConfigBuilder(IMPERSONATION_ENABLE_KEY)
+          .doc("Whether to enable impersonation for Iceberg catalog")
+          .version(ConfigConstants.VERSION_0_5_1)
+          .booleanConf()
+          .createWithDefault(DEFAULT_IMPERSONATION_ENABLE);
+
+  public String getAuthType() {
+    return get(AUTH_TYPE_ENTRY);
+  }
+
+  public boolean isSimpleAuth() {
+    return AuthenticationType.SIMPLE.name().equalsIgnoreCase(getAuthType());
+  }
+
+  public boolean isKerberosAuth() {
+    return AuthenticationType.KERBEROS.name().equalsIgnoreCase(getAuthType());
+  }
+
+  public boolean isImpersonationEnabled() {
+    return get(ENABLE_IMPERSONATION_ENTRY);
+  }
+
+  public static final Map<String, PropertyEntry<?>> AUTHENTICATION_PROPERTY_ENTRIES =
+      new ImmutableMap.Builder<String, PropertyEntry<?>>()
+          .put(
+              IMPERSONATION_ENABLE_KEY,
+              PropertyEntry.booleanPropertyEntry(
+                  IMPERSONATION_ENABLE_KEY,
+                  "Whether to enable impersonation for the Iceberg catalog",
+                  false,
+                  true,
+                  DEFAULT_IMPERSONATION_ENABLE,
+                  false,
+                  false))
+          .put(
+              AUTH_TYPE_KEY,
+              PropertyEntry.stringImmutablePropertyEntry(
+                  AUTH_TYPE_KEY,
+                  "The type of authentication for Iceberg catalog, currently we support simple and Kerberos",
+                  false,
+                  "simple",
+                  false,
+                  false))
+          .build();
+}
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/FetchFileUtils.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/FetchFileUtils.java
new file mode 100644
index 00000000000..4c89cc8717a
--- /dev/null
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/FetchFileUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */ +package com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.util.Optional; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FetchFileUtils { + + private FetchFileUtils() {} + + public static void fetchFileFromUri( + String fileUri, File destFile, int timeout, Configuration conf) throws IOException { + try { + URI uri = new URI(fileUri); + String scheme = Optional.ofNullable(uri.getScheme()).orElse("file"); + + switch (scheme) { + case "http": + case "https": + case "ftp": + FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000); + break; + + case "file": + Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath()); + break; + + case "hdfs": + FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI())); + break; + + default: + throw new IllegalArgumentException( + String.format("Doesn't support the scheme %s", scheme)); + } + } catch (URISyntaxException ue) { + throw new IllegalArgumentException("The uri of file has the wrong format", ue); + } + } +} diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/HiveBackendProxy.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/HiveBackendProxy.java new file mode 100644 index 00000000000..bed202b56e1 --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/HiveBackendProxy.java @@ -0,0 +1,116 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos; + +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergHiveCachedClientPool; +import com.datastrato.gravitino.utils.PrincipalUtils; +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import net.sf.cglib.proxy.Enhancer; +import net.sf.cglib.proxy.MethodInterceptor; +import net.sf.cglib.proxy.MethodProxy; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.iceberg.ClientPool; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.thrift.TException; + +/** + * Proxy class for HiveCatalog to support kerberos authentication. We can also make HiveCatalog as a + * generic type and pass it as a parameter to the constructor. 
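+ *
+ * <p>Note on the mechanism: the proxy intercepts every HiveCatalog method call, builds a proxy
+ * UserGroupInformation for the current Gravitino user, obtains a Hive Metastore delegation token
+ * through the replaced client pool, and finally invokes the real method inside doAs(), so each
+ * call executes as the calling user.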
+ */ +public class HiveBackendProxy implements MethodInterceptor { + + private final HiveCatalog target; + private final String kerberosRealm; + private final UserGroupInformation proxyUser; + private final Map properties; + private final ClientPool newClientPool; + + public HiveBackendProxy( + Map properties, HiveCatalog target, String kerberosRealm) { + this.target = target; + this.properties = properties; + this.kerberosRealm = kerberosRealm; + try { + proxyUser = UserGroupInformation.getCurrentUser(); + + // Replace the original client pool with IcebergHiveCachedClientPool. Why do we need to do + // this? Because the original client pool in iceberg uses a fixed username to create the + // client pool, and it will not work with kerberos authentication. We need to create a new + // client pool with the current user. For more, please see CachedClientPool#clientPool and + // notice the value of `key` + this.newClientPool = resetIcebergHiveClientPool(); + } catch (IOException e) { + throw new RuntimeException("Failed to get current user", e); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException("Failed to reset IcebergHiveClientPool", e); + } + } + + @Override + public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) + throws Throwable { + + String proxyKerberosPrincipalName = PrincipalUtils.getCurrentPrincipal().getName(); + if (!proxyKerberosPrincipalName.contains("@")) { + proxyKerberosPrincipalName = + String.format("%s@%s", proxyKerberosPrincipalName, kerberosRealm); + } + + UserGroupInformation realUser = + UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, proxyUser); + + String token = + newClientPool.run( + client -> + client.getDelegationToken( + PrincipalUtils.getCurrentPrincipal().getName(), proxyUser.getShortUserName())); + + Token delegationToken = new Token<>(); + delegationToken.decodeFromUrlString(token); + realUser.addToken(delegationToken); + + return realUser.doAs( + (PrivilegedExceptionAction) + () -> { + try { + return methodProxy.invoke(target, objects); + } catch (Throwable e) { + if (RuntimeException.class.isAssignableFrom(e.getClass())) { + throw (RuntimeException) e; + } + throw new RuntimeException("Failed to invoke method", e); + } + }); + } + + private ClientPool resetIcebergHiveClientPool() + throws IllegalAccessException, NoSuchFieldException { + final Field m = HiveCatalog.class.getDeclaredField("clients"); + m.setAccessible(true); + + // TODO: we need to close the original client pool and thread pool, or it will cause memory + // leak. 
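+    // A possible approach (not implemented here): read the old pool via m.get(target) and,
+    // if it implements Closeable, close it before swapping in the cached pool below.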
+ ClientPool newClientPool = + new IcebergHiveCachedClientPool(target.getConf(), properties); + m.set(target, newClientPool); + return newClientPool; + } + + public HiveCatalog getProxy() { + Enhancer e = new Enhancer(); + e.setClassLoader(target.getClass().getClassLoader()); + e.setSuperclass(target.getClass()); + e.setCallback(this); + return (HiveCatalog) e.create(); + } +} diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosClient.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosClient.java new file mode 100644 index 00000000000..326f47fc800 --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosClient.java @@ -0,0 +1,107 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient { + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private final ScheduledThreadPoolExecutor checkTgtExecutor; + private final Map conf; + private final Configuration hadoopConf; + + public KerberosClient(Map conf, Configuration hadoopConf) { + this.conf = conf; + this.hadoopConf = hadoopConf; + this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + + public String login(String keytabFilePath) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + @SuppressWarnings("null") + List principalComponents = Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + KerberosName.resetDefaultRealm(); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation kerberosLoginUgi = UserGroupInformation.getCurrentUser(); + + // Refresh the cache if it's out of date. 
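+    // checkTGTAndReloginFromKeytab() only re-logins when the TGT is expired or close to expiry;
+    // otherwise it is a cheap no-op, so polling on a fixed interval is safe.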
+    int checkInterval = kerberosConfig.getCheckIntervalSec();
+    checkTgtExecutor.scheduleAtFixedRate(
+        () -> {
+          try {
+            kerberosLoginUgi.checkTGTAndReloginFromKeytab();
+          } catch (Exception e) {
+            LOG.error("Failed to refresh UGI token: ", e);
+          }
+        },
+        checkInterval,
+        checkInterval,
+        TimeUnit.SECONDS);
+
+    return principalComponents.get(1);
+  }
+
+  public File saveKeyTabFileFromUri(Long catalogId) throws IOException {
+
+    KerberosConfig kerberosConfig = new KerberosConfig(conf);
+
+    String keyTabUri = kerberosConfig.getKeytab();
+    Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "The keytab URI can't be blank");
+    // TODO: Support downloading the keytab file from kerberized HDFS
+    Preconditions.checkArgument(
+        !keyTabUri.trim().startsWith("hdfs"), "The keytab URI doesn't support HDFS yet");
+
+    File keytabsDir = new File("keytabs");
+    if (!keytabsDir.exists()) {
+      // Ignore the return value, because several catalog operations may create this directory
+      // concurrently.
+      keytabsDir.mkdir();
+    }
+
+    File keytabFile = new File(String.format(KerberosConfig.GRAVITINO_KEYTAB_FORMAT, catalogId));
+    keytabFile.deleteOnExit();
+    if (keytabFile.exists() && !keytabFile.delete()) {
+      throw new IllegalStateException(
+          String.format("Failed to delete keytab file %s", keytabFile.getAbsolutePath()));
+    }
+
+    // TODO: Make this fetch timeout configurable
+    int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec();
+    FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf);
+
+    return keytabFile;
+  }
+
+  private static ThreadFactory getThreadFactory(String factoryName) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build();
+  }
+}
diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosConfig.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosConfig.java
new file mode 100644
index 00000000000..59ddbf5b6fa
--- /dev/null
+++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/authentication/kerberos/KerberosConfig.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */ + +package com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos; + +import com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.AuthenticationConfig; +import com.datastrato.gravitino.config.ConfigBuilder; +import com.datastrato.gravitino.config.ConfigConstants; +import com.datastrato.gravitino.config.ConfigEntry; +import com.datastrato.gravitino.connector.PropertyEntry; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; + +public class KerberosConfig extends AuthenticationConfig { + public static final String KET_TAB_URI_KEY = "authentication.kerberos.keytab-uri"; + + public static final String PRINCIPAL_KEY = "authentication.kerberos.principal"; + + public static final String CHECK_INTERVAL_SEC_KEY = "authentication.kerberos.check-interval-sec"; + + public static final String FETCH_TIMEOUT_SEC_KEY = + "authentication.kerberos.keytab-fetch-timeout-sec"; + + public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s-keytab"; + + public static final ConfigEntry PRINCIPAL_ENTRY = + new ConfigBuilder(PRINCIPAL_KEY) + .doc("The principal of the Kerberos for Iceberg catalog with Kerberos authentication") + .version(ConfigConstants.VERSION_0_6_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry KEYTAB_ENTRY = + new ConfigBuilder(KET_TAB_URI_KEY) + .doc("The keytab of the Kerberos for Iceberg catalog with Kerberos authentication") + .version(ConfigConstants.VERSION_0_6_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry CHECK_INTERVAL_SEC_ENTRY = + new ConfigBuilder(CHECK_INTERVAL_SEC_KEY) + .doc( + "The check interval of the Kerberos credential for Iceberg catalog with Kerberos authentication") + .version(ConfigConstants.VERSION_0_6_0) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(2); + + public static final ConfigEntry FETCH_TIMEOUT_SEC_ENTRY = + new ConfigBuilder(FETCH_TIMEOUT_SEC_KEY) + .doc( + "The fetch timeout of the Kerberos key table of Iceberg catalog with Kerberos authentication") + .version(ConfigConstants.VERSION_0_6_0) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(2); + + public KerberosConfig(Map properties) { + super(properties); + loadFromMap(properties, k -> true); + } + + public boolean isImpersonationEnabled() { + return get(ENABLE_IMPERSONATION_ENTRY); + } + + public String getPrincipalName() { + return get(PRINCIPAL_ENTRY); + } + + public String getKeytab() { + return get(KEYTAB_ENTRY); + } + + public int getCheckIntervalSec() { + return get(CHECK_INTERVAL_SEC_ENTRY); + } + + public int getFetchTimeoutSec() { + return get(FETCH_TIMEOUT_SEC_ENTRY); + } + + public static final Map> KERBEROS_PROPERTY_ENTRIES = + new ImmutableMap.Builder>() + .put( + KET_TAB_URI_KEY, + PropertyEntry.stringImmutablePropertyEntry( + KET_TAB_URI_KEY, + "The keytab of the Kerberos for Iceberg catalog with Kerberos authentication", + false, + null, + false, + false)) + .put( + PRINCIPAL_KEY, + PropertyEntry.stringImmutablePropertyEntry( + PRINCIPAL_KEY, + "The principal of the Kerberos for Iceberg catalog with Kerberos authentication", + false, + null, + false, + false)) + .put( + CHECK_INTERVAL_SEC_KEY, + PropertyEntry.integerOptionalPropertyEntry( + 
CHECK_INTERVAL_SEC_KEY, + "The check interval of the Kerberos credential for Iceberg catalog with Kerberos authentication", + true, + 60, + false)) + .put( + FETCH_TIMEOUT_SEC_KEY, + PropertyEntry.integerOptionalPropertyEntry( + FETCH_TIMEOUT_SEC_KEY, + "The fetch timeout of the Kerberos key table of Iceberg catalog with Kerberos authentication", + true, + 60, + false)) + .build(); +} diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java index f55c6be0ba7..7ef2bcd89a4 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/utils/IcebergCatalogUtil.java @@ -5,12 +5,20 @@ package com.datastrato.gravitino.catalog.lakehouse.iceberg.utils; import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_INITIALIZE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend; import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.AuthenticationConfig; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos.HiveBackendProxy; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos.KerberosClient; +import java.io.File; +import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Catalog; @@ -37,9 +45,44 @@ private static HiveCatalog loadHiveCatalog(Map properties) { HiveCatalog hiveCatalog = new HiveCatalog(); HdfsConfiguration hdfsConfiguration = new HdfsConfiguration(); properties.forEach(hdfsConfiguration::set); - hiveCatalog.setConf(hdfsConfiguration); - hiveCatalog.initialize("hive", properties); - return hiveCatalog; + + AuthenticationConfig authenticationConfig = new AuthenticationConfig(properties); + if (authenticationConfig.isSimpleAuth()) { + hiveCatalog.setConf(hdfsConfiguration); + hiveCatalog.initialize("hive", properties); + return hiveCatalog; + } else if (authenticationConfig.isKerberosAuth()) { + Map resultProperties = new HashMap<>(properties); + resultProperties.put(CatalogProperties.CLIENT_POOL_CACHE_KEYS, "USER_NAME"); + hdfsConfiguration.set(HADOOP_SECURITY_AUTHORIZATION, "true"); + hdfsConfiguration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + hiveCatalog.setConf(hdfsConfiguration); + hiveCatalog.initialize("hive", properties); + + String realm = initKerberosAndReturnRealm(properties, hdfsConfiguration); + if (authenticationConfig.isImpersonationEnabled()) { + HiveBackendProxy proxyHiveCatalog = + new HiveBackendProxy(resultProperties, hiveCatalog, realm); + return proxyHiveCatalog.getProxy(); + } + + return hiveCatalog; + } else { + throw new UnsupportedOperationException( + "Unsupported authentication method: " + 
authenticationConfig.getAuthType()); + } + } + + private static String initKerberosAndReturnRealm( + Map properties, Configuration conf) { + try { + KerberosClient kerberosClient = new KerberosClient(properties, conf); + File keytabFile = + kerberosClient.saveKeyTabFileFromUri(Long.valueOf(properties.get("catalog_uuid"))); + return kerberosClient.login(keytabFile.getAbsolutePath()); + } catch (IOException e) { + throw new RuntimeException("Failed to login with kerberos", e); + } } private static JdbcCatalog loadJdbcCatalog(Map properties) { diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java index a2d2fd3ea60..50df91f96d2 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java @@ -120,11 +120,16 @@ public void startup() throws Exception { @AfterAll public void stop() throws Exception { - clearTableAndSchema(); - metalake.dropCatalog(catalogName); - client.dropMetalake(metalakeName); - spark.close(); - AbstractIT.stopIntegrationTest(); + try { + clearTableAndSchema(); + metalake.dropCatalog(catalogName); + client.dropMetalake(metalakeName); + } finally { + if (spark != null) { + spark.close(); + } + AbstractIT.stopIntegrationTest(); + } } @AfterEach diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java new file mode 100644 index 00000000000..6afd34e15ee --- /dev/null +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java @@ -0,0 +1,339 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.lakehouse.iceberg.integration.test; + +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.AuthenticationConfig.AUTH_TYPE_KEY; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos.KerberosConfig.IMPERSONATION_ENABLE_KEY; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos.KerberosConfig.KET_TAB_URI_KEY; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.authentication.kerberos.KerberosConfig.PRINCIPAL_KEY; +import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergConfig; +import com.datastrato.gravitino.client.GravitinoAdminClient; +import com.datastrato.gravitino.client.GravitinoMetalake; +import com.datastrato.gravitino.client.KerberosTokenProvider; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.container.HiveContainer; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.rel.Column; +import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrders; +import com.datastrato.gravitino.rel.expressions.transforms.Transforms; +import com.datastrato.gravitino.rel.types.Types; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.apache.hadoop.security.authentication.util.KerberosUtil; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Tag("gravitino-docker-it") +public class CatalogIcebergKerberosHiveIT extends AbstractIT { + + private static final Logger LOG = LoggerFactory.getLogger(CatalogIcebergKerberosHiveIT.class); + + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + + private static final String SDK_KERBEROS_PRINCIPAL_KEY = "client.kerberos.principal"; + private static final String SDK_KERBEROS_KEYTAB_KEY = "client.kerberos.keytab"; + + private static final String GRAVITINO_CLIENT_PRINCIPAL = "gravitino_client@HADOOPKRB"; + private static final String GRAVITINO_CLIENT_KEYTAB = "/gravitino_client.keytab"; + + private static final String GRAVITINO_SERVER_PRINCIPAL = "HTTP/localhost@HADOOPKRB"; + private static final String GRAVITINO_SERVER_KEYTAB = "/gravitino_server.keytab"; + + private static final String HIVE_METASTORE_CLIENT_PRINCIPAL = "cli@HADOOPKRB"; + private static final String HIVE_METASTORE_CLIENT_KEYTAB = "/client.keytab"; + + private static String TMP_DIR; + + private static HiveContainer kerberosHiveContainer; + + private static 
GravitinoAdminClient adminClient; + + private static final String METALAKE_NAME = GravitinoITUtils.genRandomName("test_metalake"); + private static final String CATALOG_NAME = GravitinoITUtils.genRandomName("test_catalog"); + private static final String SCHEMA_NAME = GravitinoITUtils.genRandomName("test_schema"); + private static final String TABLE_NAME = GravitinoITUtils.genRandomName("test_table"); + + private static String URIS; + private static String TYPE; + private static String WAREHOUSE; + + private static final String HIVE_COL_NAME1 = "col1"; + private static final String HIVE_COL_NAME2 = "col2"; + private static final String HIVE_COL_NAME3 = "col3"; + + @BeforeAll + public static void startIntegrationTest() { + containerSuite.startKerberosHiveContainer(); + kerberosHiveContainer = containerSuite.getKerberosHiveContainer(); + + URIS = + String.format( + "thrift://%s:%d", + kerberosHiveContainer.getContainerIpAddress(), HiveContainer.HIVE_METASTORE_PORT); + TYPE = "hive"; + WAREHOUSE = + String.format( + "hdfs://%s:%d/user/hive/warehouse-catalog-iceberg/", + kerberosHiveContainer.getContainerIpAddress(), HiveContainer.HDFS_DEFAULTFS_PORT); + + try { + File baseDir = new File(System.getProperty("java.io.tmpdir")); + File file = Files.createTempDirectory(baseDir.toPath(), "test").toFile(); + file.deleteOnExit(); + TMP_DIR = file.getAbsolutePath(); + + // Prepare the Kerberos-related config + prepareKerberosConfig(); + + // Configure Kerberos for the Gravitino server + addKerberosConfig(); + + ignoreIcebergRestService = false; + // Start the Gravitino server + AbstractIT.startIntegrationTest(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterAll + public static void stop() { + // Reset the UGI + UserGroupInformation.reset(); + + LOG.info("krb5 path: {}", System.getProperty("java.security.krb5.conf")); + // Clean up the Kerberos configuration + System.clearProperty("java.security.krb5.conf"); + System.clearProperty("sun.security.krb5.debug"); + + AbstractIT.client = null; + } + + private static void prepareKerberosConfig() throws Exception { + // Keytab of the Gravitino SDK client + kerberosHiveContainer + .getContainer() + .copyFileFromContainer("/gravitino_client.keytab", TMP_DIR + GRAVITINO_CLIENT_KEYTAB); + + // Keytab of the Gravitino server + kerberosHiveContainer + .getContainer() + .copyFileFromContainer("/gravitino_server.keytab", TMP_DIR + GRAVITINO_SERVER_KEYTAB); + + // Keytab used by the Gravitino server to connect to Hive + kerberosHiveContainer + .getContainer() + .copyFileFromContainer("/etc/admin.keytab", TMP_DIR + HIVE_METASTORE_CLIENT_KEYTAB); + + String tmpKrb5Path = TMP_DIR + "/krb5.conf_tmp"; + String krb5Path = TMP_DIR + "/krb5.conf"; + kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/krb5.conf", tmpKrb5Path); + + // Modify the krb5.conf, pointing the kdc and admin_server at the container IP + String ip = containerSuite.getKerberosHiveContainer().getContainerIpAddress(); + String content = FileUtils.readFileToString(new File(tmpKrb5Path), StandardCharsets.UTF_8); + content = content.replace("kdc = localhost:88", "kdc = " + ip + ":88"); + content = content.replace("admin_server = localhost", "admin_server = " + ip + ":749"); + FileUtils.write(new File(krb5Path), content, StandardCharsets.UTF_8); + + LOG.info("Kerberos kdc config:\n{}, path: {}", content, krb5Path); + System.setProperty("java.security.krb5.conf", krb5Path); + System.setProperty("sun.security.krb5.debug", "true"); + + refreshKerberosConfig(); +
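// The JVM caches the first krb5.conf it parses, so after repointing + // java.security.krb5.conf the cached configuration has to be reloaded and + // the default realm recomputed below. +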
KerberosName.resetDefaultRealm(); + + LOG.info("Kerberos default realm: {}", KerberosUtil.getDefaultRealm()); + } + + private static void refreshKerberosConfig() { + Class<?> classRef; + try { + if (System.getProperty("java.vendor").contains("IBM")) { + classRef = Class.forName("com.ibm.security.krb5.internal.Config"); + } else { + classRef = Class.forName("sun.security.krb5.Config"); + } + + Method refreshMethod = classRef.getMethod("refresh"); + refreshMethod.invoke(null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static void addKerberosConfig() { + AbstractIT.customConfigs.put("gravitino.authenticator", "kerberos"); + AbstractIT.customConfigs.put( + "gravitino.authenticator.kerberos.principal", GRAVITINO_SERVER_PRINCIPAL); + AbstractIT.customConfigs.put( + "gravitino.authenticator.kerberos.keytab", TMP_DIR + GRAVITINO_SERVER_KEYTAB); + AbstractIT.customConfigs.put(SDK_KERBEROS_KEYTAB_KEY, TMP_DIR + GRAVITINO_CLIENT_KEYTAB); + AbstractIT.customConfigs.put(SDK_KERBEROS_PRINCIPAL_KEY, GRAVITINO_CLIENT_PRINCIPAL); + } + + @Test + void testIcebergWithKerberosAndUserImpersonation() throws IOException { + KerberosTokenProvider provider = + KerberosTokenProvider.builder() + .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL) + .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB)) + .build(); + adminClient = GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build(); + + GravitinoMetalake gravitinoMetalake = + adminClient.createMetalake(METALAKE_NAME, null, ImmutableMap.of()); + + // Create a catalog + Map<String, String> properties = Maps.newHashMap(); + properties.put(IMPERSONATION_ENABLE_KEY, "true"); + properties.put(AUTH_TYPE_KEY, "kerberos"); + + properties.put(KET_TAB_URI_KEY, TMP_DIR + HIVE_METASTORE_CLIENT_KEYTAB); + properties.put(PRINCIPAL_KEY, HIVE_METASTORE_CLIENT_PRINCIPAL); + properties.put( + CATALOG_BYPASS_PREFIX + "hive.metastore.kerberos.principal", + "hive/_HOST@HADOOPKRB" + .replace("_HOST", containerSuite.getKerberosHiveContainer().getHostName())); + properties.put(CATALOG_BYPASS_PREFIX + "hive.metastore.sasl.enabled", "true"); + + properties.put(IcebergConfig.CATALOG_BACKEND.getKey(), TYPE); + properties.put(IcebergConfig.CATALOG_URI.getKey(), URIS); + properties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(), WAREHOUSE); + properties.put("location", "hdfs://localhost:9000/user/hive/warehouse-catalog-iceberg"); + + Catalog catalog = + gravitinoMetalake.createCatalog( + CATALOG_NAME, Catalog.Type.RELATIONAL, "lakehouse-iceberg", "comment", properties); + + // Test create schema + Exception exception = + Assertions.assertThrows( + Exception.class, + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", ImmutableMap.of())); + String exceptionMessage = Throwables.getStackTraceAsString(exception); + + // Make sure the operation was attempted as the impersonated user 'gravitino_client' + Assertions.assertTrue( + exceptionMessage.contains("Permission denied: user=gravitino_client, access=WRITE")); + + // Now grant permissions so the user can create the schema + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-mkdir", "/user/hive/warehouse-catalog-iceberg"); + kerberosHiveContainer.executeInContainer( + "hadoop", "fs", "-chmod", "-R", "777", "/user/hive/warehouse-catalog-iceberg"); + Assertions.assertDoesNotThrow( + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", ImmutableMap.of())); + + // Create table + NameIdentifier tableNameIdentifier = + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, TABLE_NAME); + catalog + .asTableCatalog() +
.createTable( + tableNameIdentifier, + createColumns(), + "", + ImmutableMap.of(), + Transforms.EMPTY_TRANSFORM, + Distributions.NONE, + SortOrders.NONE); + + // Now try to alter the table + catalog.asTableCatalog().alterTable(tableNameIdentifier, TableChange.rename("new_table")); + NameIdentifier newTableIdentifier = + NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, "new_table"); + + // The old table name should no longer exist + Assertions.assertFalse(catalog.asTableCatalog().tableExists(tableNameIdentifier)); + Assertions.assertTrue(catalog.asTableCatalog().tableExists(newTableIdentifier)); + + // Drop table + catalog.asTableCatalog().dropTable(newTableIdentifier); + Assertions.assertFalse(catalog.asTableCatalog().tableExists(newTableIdentifier)); + + // Drop schema + catalog.asSchemas().dropSchema(SCHEMA_NAME, false); + Assertions.assertFalse(catalog.asSchemas().schemaExists(SCHEMA_NAME)); + + // Drop catalog + Assertions.assertTrue(gravitinoMetalake.dropCatalog(CATALOG_NAME)); + } + + @Test + void testIcebergWithKerberos() { + KerberosTokenProvider provider = + KerberosTokenProvider.builder() + .withClientPrincipal(GRAVITINO_CLIENT_PRINCIPAL) + .withKeyTabFile(new File(TMP_DIR + GRAVITINO_CLIENT_KEYTAB)) + .build(); + adminClient = GravitinoAdminClient.builder(serverUri).withKerberosAuth(provider).build(); + + String metalakeName = GravitinoITUtils.genRandomName("test_metalake"); + GravitinoMetalake gravitinoMetalake = + adminClient.createMetalake(metalakeName, null, ImmutableMap.of()); + + // Create a catalog + Map<String, String> properties = Maps.newHashMap(); + properties.put(AUTH_TYPE_KEY, "kerberos"); + // No user impersonation here + + properties.put(KET_TAB_URI_KEY, TMP_DIR + HIVE_METASTORE_CLIENT_KEYTAB); + properties.put(PRINCIPAL_KEY, HIVE_METASTORE_CLIENT_PRINCIPAL); + properties.put( + CATALOG_BYPASS_PREFIX + "hive.metastore.kerberos.principal", + "hive/_HOST@HADOOPKRB" + .replace("_HOST", containerSuite.getKerberosHiveContainer().getHostName())); + properties.put(CATALOG_BYPASS_PREFIX + "hive.metastore.sasl.enabled", "true"); + + properties.put(IcebergConfig.CATALOG_BACKEND.getKey(), TYPE); + properties.put(IcebergConfig.CATALOG_URI.getKey(), URIS); + properties.put(IcebergConfig.CATALOG_WAREHOUSE.getKey(), WAREHOUSE); + properties.put("location", "hdfs://localhost:9000/user/hive/warehouse-catalog-iceberg"); + + Catalog catalog = + gravitinoMetalake.createCatalog( + CATALOG_NAME, Catalog.Type.RELATIONAL, "lakehouse-iceberg", "comment", properties); + + // Test create schema + Exception exception = + Assertions.assertThrows( + Exception.class, + () -> catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", ImmutableMap.of())); + String exceptionMessage = Throwables.getStackTraceAsString(exception); + + // Make sure the real user is 'cli' because there is no impersonation here. + Assertions.assertTrue(exceptionMessage.contains("Permission denied: user=cli, access=WRITE")); + } + + private static Column[] createColumns() { + Column col1 = Column.of(HIVE_COL_NAME1, Types.IntegerType.get(), "col_1_comment"); + Column col2 = Column.of(HIVE_COL_NAME2, Types.DateType.get(), "col_2_comment"); + Column col3 = Column.of(HIVE_COL_NAME3, Types.StringType.get(), "col_3_comment"); + return new Column[] {col1, col2, col3}; + } +}
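The two tests above expect different denied users, which is the observable effect of impersonation: without it, HDFS checks permissions against the Kerberos login identity ('cli'); with it, the catalog runs each call as a Hadoop proxy user, so HDFS sees the authenticated Gravitino end user ('gravitino_client'). A minimal sketch of that mechanism, assuming the standard Hadoop UserGroupInformation proxy-user API (the runAs helper is illustrative, not part of this patch):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    class ImpersonationSketch {
      // Runs an action so permission checks see `endUser` (e.g. "gravitino_client")
      // while the Kerberos credentials of the logged-in real user (e.g. "cli")
      // still authenticate the underlying connections.
      static <T> T runAs(String endUser, PrivilegedExceptionAction<T> action)
          throws IOException, InterruptedException {
        UserGroupInformation realUser = UserGroupInformation.getLoginUser();
        UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(endUser, realUser);
        return proxyUser.doAs(action);
      }
    }

Note that proxy-user dispatch also requires the real user to be whitelisted on the cluster side via the hadoop.proxyuser.* settings.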
diff --git a/core/src/main/java/com/datastrato/gravitino/config/ConfigConstants.java b/core/src/main/java/com/datastrato/gravitino/config/ConfigConstants.java index d88e9682ece..b30869638e4 100644 --- a/core/src/main/java/com/datastrato/gravitino/config/ConfigConstants.java +++ b/core/src/main/java/com/datastrato/gravitino/config/ConfigConstants.java @@ -34,4 +34,6 @@ private ConfigConstants() {} public static final String VERSION_0_5_0 = "0.5.0"; /** The version number for the 0.5.1 release. */ public static final String VERSION_0_5_1 = "0.5.1"; + /** The version number for the 0.6.0 release. */ + public static final String VERSION_0_6_0 = "0.6.0"; } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 04b733be3ae..18844962274 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -60,6 +60,7 @@ awaitility = "4.2.1" servlet = "3.1.0" jodd = "3.5.2" flink = "1.18.0" +cglib = "2.2" ranger = "2.4.0" protobuf-plugin = "0.9.2" @@ -168,6 +169,8 @@ sun-activation = { group = "com.sun.activation", name = "javax.activation", vers kafka-clients = { group = "org.apache.kafka", name = "kafka-clients", version.ref = "kafka" } kafka = { group = "org.apache.kafka", name = "kafka_2.12", version.ref = "kafka" } curator-test = { group = "org.apache.curator", name = "curator-test", version.ref = "curator"} +cglib = { group = "cglib", name = "cglib", version.ref = "cglib"} + ranger-intg = { group = "org.apache.ranger", name = "ranger-intg", version.ref = "ranger" } selenium = { group = "org.seleniumhq.selenium", name = "selenium-java", version.ref = "selenium" } rauschig = { group = "org.rauschig", name = "jarchivelib", version.ref = "rauschig" }
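The cglib dependency added above is presumably what HiveBackendProxy builds on: HiveCatalog is a class rather than an interface, so a JDK dynamic proxy cannot wrap it, while cglib can generate a subclass whose methods are intercepted. A rough sketch of the technique, assuming cglib's Enhancer API (the wrap helper and its names are illustrative, not the actual implementation):

    import java.lang.reflect.Method;
    import java.security.PrivilegedExceptionAction;
    import net.sf.cglib.proxy.Enhancer;
    import net.sf.cglib.proxy.MethodInterceptor;
    import net.sf.cglib.proxy.MethodProxy;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.iceberg.hive.HiveCatalog;

    class HiveBackendProxySketch {
      // Returns a generated HiveCatalog subclass that re-dispatches every call
      // to the real catalog inside ugi.doAs(...), so the metastore and HDFS
      // observe the impersonated user.
      static HiveCatalog wrap(HiveCatalog target, UserGroupInformation ugi) {
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(HiveCatalog.class);
        enhancer.setCallback(
            (MethodInterceptor)
                (Object obj, Method method, Object[] args, MethodProxy methodProxy) ->
                    ugi.doAs((PrivilegedExceptionAction<Object>) () -> method.invoke(target, args)));
        return (HiveCatalog) enhancer.create();
      }
    }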
diff --git a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java index c55132ee7ac..051e99f3abf 100644 --- a/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java +++ b/integration-test-common/src/test/java/com/datastrato/gravitino/integration/test/util/AbstractIT.java @@ -21,6 +21,7 @@ import com.datastrato.gravitino.server.web.JettyServerConfig; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -72,6 +73,8 @@ public class AbstractIT { protected static String serverUri; + protected static String originConfig; + public static int getGravitinoServerPort() { JettyServerConfig jettyServerConfig = JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX); @@ -83,29 +86,30 @@ public static void registerCustomConfigs(Map<String, String> configs) { } private static void rewriteGravitinoServerConfig() throws IOException { - if (customConfigs.isEmpty()) return; - String gravitinoHome = System.getenv("GRAVITINO_HOME"); + Path configPath = Paths.get(gravitinoHome, "conf", GravitinoServer.CONF_FILE); + if (originConfig == null) { + originConfig = FileUtils.readFileToString(configPath.toFile(), StandardCharsets.UTF_8); + } + + if (customConfigs.isEmpty()) return; String tmpFileName = GravitinoServer.CONF_FILE + ".tmp"; Path tmpPath = Paths.get(gravitinoHome, "conf", tmpFileName); Files.deleteIfExists(tmpPath); - Path configPath = Paths.get(gravitinoHome, "conf", GravitinoServer.CONF_FILE); Files.move(configPath, tmpPath); - ITUtils.rewriteConfigFile(tmpPath.toString(), configPath.toString(), customConfigs); } private static void recoverGravitinoServerConfig() throws IOException { - if (customConfigs.isEmpty()) return; - String gravitinoHome = System.getenv("GRAVITINO_HOME"); - String tmpFileName = GravitinoServer.CONF_FILE + ".tmp"; - Path tmpPath = Paths.get(gravitinoHome, "conf", tmpFileName); Path configPath = Paths.get(gravitinoHome, "conf", GravitinoServer.CONF_FILE); - Files.deleteIfExists(configPath); - Files.move(tmpPath, configPath); + + if (originConfig != null) { + Files.deleteIfExists(configPath); + FileUtils.write(configPath.toFile(), originConfig, StandardCharsets.UTF_8); + } } protected static void downLoadJDBCDriver() throws IOException {