diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java index 94622e019b6..596268395e3 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/credential/CredentialConstants.java @@ -20,7 +20,7 @@ package org.apache.gravitino.credential; public class CredentialConstants { - public static final String CREDENTIAL_TYPE = "credential-type"; + public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type"; private CredentialConstants() {} } diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 28fce1c2b90..d5884b8601a 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -112,8 +112,7 @@ public void initialize( this.icebergCatalogWrapper = new IcebergCatalogWrapper( - icebergConfig, - false /*Gravitino server will support credential vending in separate interface*/); + icebergConfig); this.icebergCatalogWrapperHelper = new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog()); } diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/TestIcebergTableUpdate.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/TestIcebergTableUpdate.java index e54144d9c76..54852a166b3 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/TestIcebergTableUpdate.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/ops/TestIcebergTableUpdate.java @@ -82,7 +82,7 @@ public class TestIcebergTableUpdate { @BeforeEach public void init() { icebergCatalogWrapper = - new IcebergCatalogWrapper(new IcebergConfig(Collections.emptyMap()), false); + new IcebergCatalogWrapper(new IcebergConfig(Collections.emptyMap())); icebergCatalogWrapperHelper = new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog()); createNamespace(TEST_NAMESPACE_NAME); diff --git a/common/src/main/java/org/apache/gravitino/credential/CredentialUtils.java b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java similarity index 97% rename from common/src/main/java/org/apache/gravitino/credential/CredentialUtils.java rename to common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java index 91f222d8bb3..255e54fbf3d 100644 --- a/common/src/main/java/org/apache/gravitino/credential/CredentialUtils.java +++ b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java @@ -24,7 +24,7 @@ /** * Helper class to generate specific credential properties for different table format and engine. */ -public class CredentialUtils { +public class CredentialPropertyUtils { /** * Transforms a specific credential into a map of Iceberg properties. * diff --git a/core/src/main/java/org/apache/gravitino/credential/CredentialProviderManager.java b/core/src/main/java/org/apache/gravitino/credential/CredentialProviderManager.java index 3a615e95e79..98ccfe89e3b 100644 --- a/core/src/main/java/org/apache/gravitino/credential/CredentialProviderManager.java +++ b/core/src/main/java/org/apache/gravitino/credential/CredentialProviderManager.java @@ -19,19 +19,22 @@ package org.apache.gravitino.credential; -import org.apache.gravitino.NameIdentifier; +import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.Map; +import org.apache.gravitino.utils.PrincipalUtils; public class CredentialProviderManager { - void registerCredentialProvider(NameIdentifier nameIdentifier, CredentialProvider credentialProvider) { + public void registerCredentialProvider(String catalogName, CredentialProvider credentialProvider) { } - void unregisterCredentialProvider(NameIdentifier nameIdentifier) { + public void unregisterCredentialProvider(String catalogName) { } - CredentialProvider getCredentialProvider(NameIdentifier nameIdentifier, String credentialType) { + public CredentialProvider getCredentialProvider(String catalogName) { return null; } } diff --git a/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java b/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java new file mode 100644 index 00000000000..07d1cccabd4 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/credential/CredentialUtils.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.credential; + +import com.google.common.collect.ImmutableSet; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.utils.PrincipalUtils; + +public class CredentialUtils { + public static Credential vendCredential(CredentialProvider credentialProvider, String path) { + PathBasedCredentialContext pathBasedCredentialContext = + new PathBasedCredentialContext( + PrincipalUtils.getCurrentUserName(), ImmutableSet.of(path), ImmutableSet.of()); + return credentialProvider.getCredential(pathBasedCredentialContext); + } +} diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java index 999eb859aec..c8b3f405b76 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java @@ -234,9 +234,9 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { .toSequence() .createWithDefault(Collections.emptyList()); - public static final ConfigEntry CREDENTIAL_TYPE = - new ConfigBuilder(CredentialConstants.CREDENTIAL_TYPE) - .doc("The credential type for Iceberg") + public static final ConfigEntry CREDENTIAL_PROVIDER_TYPE = + new ConfigBuilder(CredentialConstants.CREDENTIAL_PROVIDER_TYPE) + .doc("The credential provider type for Iceberg") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .create(); diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java similarity index 71% rename from iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java rename to iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java index 758aa46aa08..4548770b962 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java @@ -19,12 +19,14 @@ package org.apache.gravitino.iceberg.common.ops; import java.util.Map; +import java.util.Optional; +import org.apache.gravitino.iceberg.common.IcebergConfig; /** - * IcebergCatalogWrapperProvider is an interface defining how Iceberg REST catalog server gets - * Iceberg catalogs. + * {@code IcebergCatalogConfigProvider} is an interface defining how Iceberg REST catalog server gets + * Iceberg catalog configurations. */ -public interface IcebergCatalogWrapperProvider { +public interface IcebergCatalogConfigProvider { /** * @param properties The parameters for creating Provider which from configurations whose prefix @@ -33,8 +35,8 @@ public interface IcebergCatalogWrapperProvider { void initialize(Map properties); /** - * @param catalogName a param send by clients. - * @return the instance of IcebergCatalogWrapper. + * @param catalogName Iceberg catalog name. + * @return the configuration of Iceberg catalog. */ - IcebergCatalogWrapper getIcebergTableOps(String catalogName); + Optional getIcebergCatalogConfig(String catalogName); } diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java index de2d7686744..2bf10ac846c 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet; import java.sql.Driver; import java.sql.DriverManager; -import java.util.HashMap; import java.util.Locale; import java.util.Map; import java.util.Optional; @@ -31,20 +30,13 @@ import java.util.function.Supplier; import lombok.Getter; import lombok.Setter; -import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; -import org.apache.gravitino.credential.Credential; -import org.apache.gravitino.credential.CredentialConstants; import org.apache.gravitino.credential.CredentialProvider; -import org.apache.gravitino.credential.CredentialProviderFactory; -import org.apache.gravitino.credential.CredentialUtils; -import org.apache.gravitino.credential.PathBasedCredentialContext; import org.apache.gravitino.iceberg.common.IcebergCatalogBackend; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil; import org.apache.gravitino.utils.IsolatedClassLoader; import org.apache.gravitino.utils.MapUtils; -import org.apache.gravitino.utils.PrincipalUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.iceberg.Transaction; @@ -78,7 +70,6 @@ public class IcebergCatalogWrapper implements AutoCloseable { @Getter protected Catalog catalog; private SupportsNamespaces asNamespaceCatalog; private final IcebergCatalogBackend catalogBackend; - private Optional credentialProvider; private String catalogUri = null; private Map catalogConfigToClients; private Map catalogPropertiesMap; @@ -93,7 +84,7 @@ public class IcebergCatalogWrapper implements AutoCloseable { IcebergConstants.ICEBERG_OSS_ACCESS_KEY_ID, IcebergConstants.ICEBERG_OSS_ACCESS_KEY_SECRET); - public IcebergCatalogWrapper(IcebergConfig icebergConfig, boolean supportsCredentialVending) { + public IcebergCatalogWrapper(IcebergConfig icebergConfig) { this.catalogBackend = IcebergCatalogBackend.valueOf( icebergConfig.get(IcebergConfig.CATALOG_BACKEND).toUpperCase(Locale.ROOT)); @@ -113,14 +104,7 @@ public IcebergCatalogWrapper(IcebergConfig icebergConfig, boolean supportsCreden this.catalogPropertiesMap = icebergConfig.getIcebergCatalogProperties(); - String credentialType = icebergConfig.get(IcebergConfig.CREDENTIAL_TYPE); - if (StringUtils.isBlank(credentialType) || !supportsCredentialVending) { - this.credentialProvider = Optional.empty(); - } else { - this.credentialProvider = - Optional.of( - CredentialProviderFactory.create(credentialType, icebergConfig.getAllConfig())); - } + } private void validateNamespace(Optional namespace) { @@ -175,7 +159,7 @@ public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest /** * Reload hadoop configuration, this is useful when the hadoop configuration UserGroupInformation * is shared by multiple threads. UserGroupInformation#authenticationMethod was first initialized - * in KerberosClient, however, when switching to iceberg-rest thead, + * in KerberosClient, however, when switching to iceberg-rest thread, * UserGroupInformation#authenticationMethod will be reset to the default value; we need to * reinitialize it again. */ @@ -197,18 +181,6 @@ public LoadTableResponse createTable(Namespace namespace, CreateTableRequest req this::getCatalogConfigToClient); } - public LoadTableResponse createTableWithCredentialVending( - Namespace namespace, CreateTableRequest request) { - request.validate(); - if (request.stageCreate()) { - return injectTableConfig( - () -> CatalogHandlers.stageTableCreate(catalog, namespace, request), - this::vendCredentials); - } - return injectTableConfig( - () -> CatalogHandlers.createTable(catalog, namespace, request), this::vendCredentials); - } - public void dropTable(TableIdentifier tableIdentifier) { CatalogHandlers.dropTable(catalog, tableIdentifier); } @@ -222,11 +194,6 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier) { () -> CatalogHandlers.loadTable(catalog, tableIdentifier), this::getCatalogConfigToClient); } - public LoadTableResponse loadTableWithCredentialVending(TableIdentifier tableIdentifier) { - return injectTableConfig( - () -> CatalogHandlers.loadTable(catalog, tableIdentifier), this::vendCredentials); - } - public boolean tableExists(TableIdentifier tableIdentifier) { return catalog.tableExists(tableIdentifier); } @@ -287,14 +254,6 @@ public void close() throws Exception { // JdbcCatalog and WrappedHiveCatalog need close. ((AutoCloseable) catalog).close(); } - credentialProvider.ifPresent( - provider -> { - try { - provider.close(); - } catch (Exception e) { - LOG.warn("Close credential provider failed,", e); - } - }); // Because each catalog in Gravitino has its own classloader, after a catalog is no longer used // for a long time or dropped, the instance of classloader needs to be released. In order to @@ -316,7 +275,7 @@ public void close() throws Exception { private void closeMySQLCatalogResource() { try { // Close thread AbandonedConnectionCleanupThread if we are using `com.mysql.cj.jdbc.Driver`, - // for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thead maybe not this one. + // for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thread maybe not this one. Class.forName("com.mysql.cj.jdbc.AbandonedConnectionCleanupThread") .getMethod("uncheckedShutdown") .invoke(null); @@ -361,25 +320,6 @@ private Map getCatalogConfigToClient(String location) { return catalogConfigToClients; } - private Map vendCredentials(String location) { - // ifPresentOrElse is not supported in Java8 - if (!credentialProvider.isPresent()) { - throw new IllegalArgumentException( - "Credential vending is not enabled, please set " - + CredentialConstants.CREDENTIAL_TYPE - + " to proper values"); - } - - Map configs = new HashMap<>(catalogConfigToClients); - // todo(fanng): check user privilege. - PathBasedCredentialContext pathBasedCredentialContext = - new PathBasedCredentialContext( - PrincipalUtils.getCurrentUserName(), ImmutableSet.of(location), ImmutableSet.of()); - Credential credential = credentialProvider.get().getCredential(pathBasedCredentialContext); - configs.putAll(CredentialUtils.toIcebergProperties(credential)); - return configs; - } - @Getter @Setter public static final class IcebergTableChange { diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogConfigProvider.java similarity index 78% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogConfigProvider.java index a62be38a065..250d5ff69f9 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogConfigProvider.java @@ -24,8 +24,7 @@ import java.util.stream.Collectors; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider; +import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; import org.apache.gravitino.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,11 +39,11 @@ * gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive * gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ... */ -public class ConfigBasedIcebergCatalogWrapperProvider implements IcebergCatalogWrapperProvider { +public class ConfigBasedIcebergCatalogConfigProvider implements IcebergCatalogConfigProvider { public static final Logger LOG = - LoggerFactory.getLogger(ConfigBasedIcebergCatalogWrapperProvider.class); + LoggerFactory.getLogger(ConfigBasedIcebergCatalogConfigProvider.class); - public static final String CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME = "config-based-provider"; + public static final String CONFIG_BASE_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME = "config-based-provider"; @VisibleForTesting Map catalogConfigs; @@ -68,14 +67,8 @@ public void initialize(Map properties) { } @Override - public IcebergCatalogWrapper getIcebergTableOps(String catalogName) { - IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName); - if (icebergConfig == null) { - String errorMsg = String.format("%s can not match any catalog", catalogName); - LOG.warn(errorMsg); - throw new RuntimeException(errorMsg); - } - return new IcebergCatalogWrapper(icebergConfig, true); + public IcebergConfig getIcebergCatalogConfig(String catalogName) { + return this.catalogConfigs.get(catalogName); } private Optional getCatalogName(String catalogConfigKey) { diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogConfigProvider.java similarity index 80% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogConfigProvider.java index 828d523071f..3100e4bd466 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogConfigProvider.java @@ -21,14 +21,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.Map; +import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.Catalog; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider; +import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,12 +41,12 @@ * *

The catalogName is iceberg_catalog */ -public class GravitinoBasedIcebergCatalogWrapperProvider - implements IcebergCatalogWrapperProvider, AutoCloseable { +public class GravitinoBasedIcebergCatalogConfigProvider + implements IcebergCatalogConfigProvider, AutoCloseable { public static final Logger LOG = - LoggerFactory.getLogger(GravitinoBasedIcebergCatalogWrapperProvider.class); + LoggerFactory.getLogger(GravitinoBasedIcebergCatalogConfigProvider.class); - public static final String GRAVITINO_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME = + public static final String GRAVITINO_BASE_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME = "gravitino-based-provider"; private String gravitinoMetalake; @@ -66,14 +68,19 @@ public void initialize(Map properties) { } @Override - public IcebergCatalogWrapper getIcebergTableOps(String catalogName) { + public Optional getIcebergCatalogConfig(String catalogName) { Preconditions.checkArgument( StringUtils.isNotBlank(catalogName), "blank catalogName is illegal"); Preconditions.checkArgument( !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName), IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in gravitino-based-provider"); - Catalog catalog = client.loadMetalake(gravitinoMetalake).loadCatalog(catalogName); + Catalog catalog; + try { + catalog = client.loadMetalake(gravitinoMetalake).loadCatalog(catalogName); + } catch (NoSuchCatalogException e) { + return Optional.empty(); + } Preconditions.checkArgument( "lakehouse-iceberg".equals(catalog.provider()), @@ -81,7 +88,7 @@ public IcebergCatalogWrapper getIcebergTableOps(String catalogName) { Map properties = IcebergPropertiesUtils.toIcebergCatalogProperties(catalog.properties()); - return new IcebergCatalogWrapper(new IcebergConfig(properties), true); + return Optional.of(new IcebergConfig(properties)); } @VisibleForTesting diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java index 17342acf71f..4bed0840e2e 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java @@ -25,37 +25,44 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.credential.CredentialProvider; +import org.apache.gravitino.credential.CredentialProviderFactory; +import org.apache.gravitino.credential.CredentialProviderManager; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider; -import org.apache.gravitino.iceberg.provider.ConfigBasedIcebergCatalogWrapperProvider; -import org.apache.gravitino.iceberg.provider.GravitinoBasedIcebergCatalogWrapperProvider; +import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; +import org.apache.gravitino.iceberg.provider.ConfigBasedIcebergCatalogConfigProvider; +import org.apache.gravitino.iceberg.provider.GravitinoBasedIcebergCatalogConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class IcebergCatalogWrapperManager implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapperManager.class); - private static final ImmutableMap ICEBERG_TABLE_OPS_PROVIDER_NAMES = + private static final ImmutableMap ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES = ImmutableMap.of( - ConfigBasedIcebergCatalogWrapperProvider.CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME, - ConfigBasedIcebergCatalogWrapperProvider.class.getCanonicalName(), - GravitinoBasedIcebergCatalogWrapperProvider - .GRAVITINO_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME, - GravitinoBasedIcebergCatalogWrapperProvider.class.getCanonicalName()); + ConfigBasedIcebergCatalogConfigProvider.CONFIG_BASE_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, + ConfigBasedIcebergCatalogConfigProvider.class.getCanonicalName(), + GravitinoBasedIcebergCatalogConfigProvider + .GRAVITINO_BASE_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, + GravitinoBasedIcebergCatalogConfigProvider.class.getCanonicalName()); - private final Cache icebergTableOpsCache; + private final Cache icebergCatalogWrapperCache; - private final IcebergCatalogWrapperProvider provider; + private final IcebergCatalogConfigProvider provider; + + private CredentialProviderManager credentialProviderManager; public IcebergCatalogWrapperManager(Map properties) { + this.credentialProviderManager = new CredentialProviderManager(); this.provider = createProvider(properties); this.provider.initialize(properties); - this.icebergTableOpsCache = + this.icebergCatalogWrapperCache = Caffeine.newBuilder() .expireAfterWrite( (new IcebergConfig(properties)) @@ -63,8 +70,10 @@ public IcebergCatalogWrapperManager(Map properties) { TimeUnit.MILLISECONDS) .removalListener( (k, v, c) -> { - LOG.info("Remove IcebergCatalogWrapper cache {}.", k); - closeIcebergTableOps((IcebergCatalogWrapper) v); + String catalogName = (String)k; + LOG.info("Remove IcebergCatalogWrapper cache {}.", catalogName); + closeIcebergCatalogWrapper((IcebergCatalogWrapper) v); + credentialProviderManager.unregisterCredentialProvider(catalogName); }) .scheduler( Scheduler.forScheduledExecutorService( @@ -72,7 +81,7 @@ public IcebergCatalogWrapperManager(Map properties) { 1, new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("table-ops-cleaner-%d") + .setNameFormat("iceberg-catalog-wrapper-cleaner-%d") .build()))) .build(); } @@ -85,13 +94,35 @@ public IcebergCatalogWrapperManager(Map properties) { public IcebergCatalogWrapper getOps(String rawPrefix) { String catalogName = getCatalogName(rawPrefix); IcebergCatalogWrapper tableOps = - icebergTableOpsCache.get(catalogName, k -> provider.getIcebergTableOps(catalogName)); + icebergCatalogWrapperCache.get(catalogName, k -> createCatalogWrapper(catalogName)); // Reload conf to reset UserGroupInformation or icebergTableOps will always use // Simple auth. tableOps.reloadHadoopConf(); return tableOps; } + public CredentialProvider getCredentialProvider(String prefix) { + String catalogName = getCatalogName(prefix); + return credentialProviderManager.getCredentialProvider(catalogName); + } + + private IcebergCatalogWrapper createCatalogWrapper(String catalogName) { + Optional icebergConfig = provider.getIcebergCatalogConfig(catalogName); + if (!icebergConfig.isPresent()) { + throw new RuntimeException("Couldn't find Iceberg configuration for " + catalogName); + } + + IcebergConfig config = icebergConfig.get(); + String credentialProviderType = config.get(IcebergConfig.CREDENTIAL_PROVIDER_TYPE); + if (StringUtils.isNotBlank(credentialProviderType)) { + CredentialProvider credentialProvider = CredentialProviderFactory.create(credentialProviderType, config.getAllConfig()); + credentialProviderManager.registerCredentialProvider(catalogName, credentialProvider); + } + + IcebergCatalogWrapper catalogWrapper = new IcebergCatalogWrapper(icebergConfig.get()); + return catalogWrapper; + } + private String getCatalogName(String rawPrefix) { String prefix = shelling(rawPrefix); Preconditions.checkArgument( @@ -103,14 +134,14 @@ private String getCatalogName(String rawPrefix) { return prefix; } - private IcebergCatalogWrapperProvider createProvider(Map properties) { + private IcebergCatalogConfigProvider createProvider(Map properties) { String providerName = (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER); - String className = ICEBERG_TABLE_OPS_PROVIDER_NAMES.getOrDefault(providerName, providerName); + String className = ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName); LOG.info("Load Iceberg catalog provider: {}.", className); try { Class providerClz = Class.forName(className); - return (IcebergCatalogWrapperProvider) providerClz.getDeclaredConstructor().newInstance(); + return (IcebergCatalogConfigProvider) providerClz.getDeclaredConstructor().newInstance(); } catch (Exception e) { throw new RuntimeException(e); } @@ -127,17 +158,17 @@ private String shelling(String rawPrefix) { } } - private void closeIcebergTableOps(IcebergCatalogWrapper ops) { + private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) { try { - ops.close(); + catalogWrapper.close(); } catch (Exception ex) { - LOG.warn("Close Iceberg table ops fail: {}, {}", ops, ex); + LOG.warn("Close Iceberg table catalog wrapper fail: {}, {}", catalogWrapper, ex); } } @Override public void close() throws Exception { - icebergTableOpsCache.invalidateAll(); + icebergCatalogWrapperCache.invalidateAll(); if (provider instanceof AutoCloseable) { ((AutoCloseable) provider).close(); } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java index 47cc6cf1fb7..042f251859c 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java @@ -22,6 +22,7 @@ import com.codahale.metrics.annotation.Timed; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Map; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; @@ -39,6 +40,11 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.credential.Credential; +import org.apache.gravitino.credential.CredentialProvider; +import org.apache.gravitino.credential.CredentialProviderManager; +import org.apache.gravitino.credential.CredentialPropertyUtils; +import org.apache.gravitino.credential.CredentialUtils; import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergObjectMapper; import org.apache.gravitino.iceberg.service.IcebergRestUtils; @@ -49,6 +55,7 @@ import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -105,11 +112,8 @@ public Response createTable( accessDelegation, isCredentialVending); if (isCredentialVending) { - return IcebergRestUtils.ok( - icebergCatalogWrapperManager - .getOps(prefix) - .createTableWithCredentialVending( - RESTUtil.decodeNamespace(namespace), createTableRequest)); + LoadTableResponse loadTableResponse = icebergCatalogWrapperManager.getOps(prefix).createTable(RESTUtil.decodeNamespace(namespace), createTableRequest); + return IcebergRestUtils.ok(injectCredentialConfig(prefix, loadTableResponse)); } else { return IcebergRestUtils.ok( icebergCatalogWrapperManager @@ -241,6 +245,22 @@ private String SerializeUpdateTableRequest(UpdateTableRequest updateTableRequest } } + private LoadTableResponse injectCredentialConfig(String prefix, + LoadTableResponse loadTableResponse) { + CredentialProvider credentialProvider = icebergCatalogWrapperManager.getCredentialProvider(prefix); + if (credentialProvider == null) { + throw new RuntimeException("Doesn't support credential vending"); + } + Credential credential = CredentialUtils.vendCredential(credentialProvider, + loadTableResponse.tableMetadata().location()); + if (credential == null) { + throw new RuntimeException("Couldn't generate credential for " + credentialProvider.credentialType()); + } + Map credentialConfig = CredentialPropertyUtils.toIcebergProperties(credential); + return LoadTableResponse.builder().withTableMetadata(loadTableResponse.tableMetadata()) + .addAllConfig(loadTableResponse.config()).addAllConfig(credentialConfig).build(); + } + private boolean isCredentialVending(String accessDelegation) { if (StringUtils.isBlank(accessDelegation)) { return false; diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java index 99e83f2e41d..6630b0b9c01 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java @@ -58,16 +58,16 @@ public void testValidIcebergTableOps() { config.put("catalog-backend", "memory"); config.put("warehouse", "/tmp/"); - ConfigBasedIcebergCatalogWrapperProvider provider = - new ConfigBasedIcebergCatalogWrapperProvider(); + ConfigBasedIcebergCatalogConfigProvider provider = + new ConfigBasedIcebergCatalogConfigProvider(); provider.initialize(config); IcebergConfig hiveIcebergConfig = provider.catalogConfigs.get(hiveCatalogName); IcebergConfig jdbcIcebergConfig = provider.catalogConfigs.get(jdbcCatalogName); IcebergConfig defaultIcebergConfig = provider.catalogConfigs.get(defaultCatalogName); - IcebergCatalogWrapper hiveOps = provider.getIcebergTableOps(hiveCatalogName); - IcebergCatalogWrapper jdbcOps = provider.getIcebergTableOps(jdbcCatalogName); - IcebergCatalogWrapper defaultOps = provider.getIcebergTableOps(defaultCatalogName); + IcebergCatalogWrapper hiveOps = provider.getIcebergCatalogConfig(hiveCatalogName); + IcebergCatalogWrapper jdbcOps = provider.getIcebergCatalogConfig(jdbcCatalogName); + IcebergCatalogWrapper defaultOps = provider.getIcebergCatalogConfig(defaultCatalogName); Assertions.assertEquals( hiveCatalogName, hiveIcebergConfig.get(IcebergConfig.CATALOG_BACKEND_NAME)); @@ -102,11 +102,11 @@ public void testValidIcebergTableOps() { @ParameterizedTest @ValueSource(strings = {"", "not_match"}) public void testInvalidIcebergTableOps(String catalogName) { - ConfigBasedIcebergCatalogWrapperProvider provider = - new ConfigBasedIcebergCatalogWrapperProvider(); + ConfigBasedIcebergCatalogConfigProvider provider = + new ConfigBasedIcebergCatalogConfigProvider(); provider.initialize(Maps.newHashMap()); Assertions.assertThrowsExactly( - RuntimeException.class, () -> provider.getIcebergTableOps(catalogName)); + RuntimeException.class, () -> provider.getIcebergCatalogConfig(catalogName)); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java index 8acac4ffd6b..a79fee8ba57 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java @@ -71,14 +71,14 @@ public void testValidIcebergTableOps() { } }); - GravitinoBasedIcebergCatalogWrapperProvider provider = - new GravitinoBasedIcebergCatalogWrapperProvider(); + GravitinoBasedIcebergCatalogConfigProvider provider = + new GravitinoBasedIcebergCatalogConfigProvider(); GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class); Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake); provider.setClient(client); - IcebergCatalogWrapper hiveOps = provider.getIcebergTableOps(hiveCatalogName); - IcebergCatalogWrapper jdbcOps = provider.getIcebergTableOps(jdbcCatalogName); + IcebergCatalogWrapper hiveOps = provider.getIcebergCatalogConfig(hiveCatalogName); + IcebergCatalogWrapper jdbcOps = provider.getIcebergCatalogConfig(jdbcCatalogName); Assertions.assertEquals(hiveCatalogName, hiveOps.getCatalog().name()); Assertions.assertEquals(jdbcCatalogName, jdbcOps.getCatalog().name()); @@ -101,16 +101,16 @@ public void testInvalidIcebergTableOps() { GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class); Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake); - GravitinoBasedIcebergCatalogWrapperProvider provider = - new GravitinoBasedIcebergCatalogWrapperProvider(); + GravitinoBasedIcebergCatalogConfigProvider provider = + new GravitinoBasedIcebergCatalogConfigProvider(); provider.setClient(client); Assertions.assertThrowsExactly( - IllegalArgumentException.class, () -> provider.getIcebergTableOps(invalidCatalogName)); + IllegalArgumentException.class, () -> provider.getIcebergCatalogConfig(invalidCatalogName)); Assertions.assertThrowsExactly( - IllegalArgumentException.class, () -> provider.getIcebergTableOps("")); + IllegalArgumentException.class, () -> provider.getIcebergCatalogConfig("")); Assertions.assertThrowsExactly( IllegalArgumentException.class, - () -> provider.getIcebergTableOps(IcebergConstants.GRAVITINO_DEFAULT_CATALOG)); + () -> provider.getIcebergCatalogConfig(IcebergConstants.GRAVITINO_DEFAULT_CATALOG)); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java index 222391bcc04..af0486ca53f 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java @@ -18,13 +18,14 @@ */ package org.apache.gravitino.iceberg.service.rest; +import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; -import org.apache.gravitino.iceberg.provider.ConfigBasedIcebergCatalogWrapperProvider; +import org.apache.gravitino.iceberg.provider.ConfigBasedIcebergCatalogConfigProvider; public class ConfigBasedIcebergCatalogWrapperProviderForTest - extends ConfigBasedIcebergCatalogWrapperProvider { + extends ConfigBasedIcebergCatalogConfigProvider { @Override - public IcebergCatalogWrapper getIcebergTableOps(String prefix) { - return new IcebergCatalogWrapperForTest(); + public IcebergConfig getIcebergCatalogConfig(String prefix) { + return new IcebergConfig(); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperForTest.java index e8fe5a46e26..e2512948f2e 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperForTest.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperForTest.java @@ -34,7 +34,7 @@ public class IcebergCatalogWrapperForTest extends IcebergCatalogWrapper { public IcebergCatalogWrapperForTest() { - super(new IcebergConfig(Collections.emptyMap()), false); + super(new IcebergConfig(Collections.emptyMap())); } @Override