From c3c5cfb4cbf5c9fc43018315f6ea1352579815b3 Mon Sep 17 00:00:00 2001 From: theoryxu Date: Tue, 13 Aug 2024 15:54:03 +0800 Subject: [PATCH] [#4176]feat(iceberg) support multiple catalogs in Iceberg REST catalog server --- .../lakehouse/iceberg/IcebergConstants.java | 2 +- docs/iceberg-rest-service.md | 14 +++++------ .../iceberg/common/IcebergConfig.java | 3 +-- .../ConfigBasedIcebergTableOpsProvider.java | 2 ++ .../common/ops/IcebergTableOpsManager.java | 25 ++++++++++++++++--- .../service/rest/IcebergRestTestUtil.java | 6 ++--- 6 files changed, 35 insertions(+), 17 deletions(-) diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java index 242c38e8569..6e4aae37ab1 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java @@ -64,7 +64,7 @@ public class IcebergConstants { public static final String GRAVITINO_ICEBERG_REST_SERVICE_NAME = "iceberg-rest"; - public static final String ICEBERG_REST_CATALOG_PROVIDER = "catalog-provider-impl"; + public static final String ICEBERG_REST_CATALOG_PROVIDER = "catalog-provider"; public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog"; } diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md index 1b1f0888ee9..21c4293a434 100644 --- a/docs/iceberg-rest-service.md +++ b/docs/iceberg-rest-service.md @@ -149,15 +149,15 @@ You must download the corresponding JDBC driver to the `iceberg-rest-server/libs #### Multi catalog support -Gravitino Iceberg REST server supports multiple catalogs. Users can enable it by setting configuration entries in the style `gravitino.iceberg-rest.catalog..=`. +The Gravitino Iceberg REST server supports multiple catalogs and offers a configuration-based catalog management system. -| Configuration item | Description | Default value | Required | Since Version | -|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------| -| `gravitino.iceberg-rest.catalog-provider-impl` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs. | `org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider` | No | 0.7.0 | +| Configuration item | Description | Default value | Required | Since Version | +|----------------------------------------------|------------------------------------------------------------------------------------------------------------------|-----------------------------|----------|---------------| +| `gravitino.iceberg-rest.catalog-provider` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs. | `config-based-provider` | No | 0.7.0 | -Using `ConfigBasedIcebergTableOpsProvider`, the default value, could be backward compatible with configurations without catalog names. It constructs a default catalog using those configurations for clients not setting `prefix`. +When using a config-based catalog provider, you can configure the default catalog with gravitino.iceberg-rest.catalog.=. For specific catalogs, use the format gravitino.iceberg-rest.catalog..=. -For instance, we can configure three different catalogs, which one is anonymous and the others named `hive_backend` and `jdbc_backend`: +For instance, you can configure three different catalogs, the default catalog and the specific hive_backend and jdbc_backend catalogs separately. ```text gravitino.iceberg-rest.catalog-backend = jdbc @@ -174,7 +174,7 @@ gravitino.iceberg-rest.catalog.jdbc_backend.warehouse = hdfs://127.0.0.1:9000/us ... ``` -Then we can set `prefix` in client config to use those catalogs. Take Spark SQL as an example: +You can access different catalogs by setting the prefix to the specific catalog name in the Iceberg REST client configuration. The default catalog will be used if you do not specify a prefix. For instance, consider the case of SparkSQL. ```shell ./bin/spark-sql -v \ diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java index 362ecde68f8..abfc04d5fc5 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java @@ -164,8 +164,7 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { "The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() - .createWithDefault( - "org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider"); + .createWithDefault("config-based-provider"); public String getJdbcDriver() { return get(JDBC_DRIVER); diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java index c2bbe58a86f..070e67ce1cb 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java @@ -42,6 +42,8 @@ public class ConfigBasedIcebergTableOpsProvider implements IcebergTableOpsProvid public static final Logger LOG = LoggerFactory.getLogger(ConfigBasedIcebergTableOpsProvider.class); + public static final String CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME = "config-based-provider"; + @VisibleForTesting Map catalogConfigs; @Override diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java index ad93070d7f7..2c0d8bff443 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java @@ -20,7 +20,9 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; @@ -31,8 +33,14 @@ public class IcebergTableOpsManager implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(IcebergTableOpsManager.class); + private static final ImmutableMap ICEBERG_TABLE_OPS_PROVIDER_NAMES = + ImmutableMap.of( + ConfigBasedIcebergTableOpsProvider.CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME, + ConfigBasedIcebergTableOpsProvider.class.getCanonicalName()); + private final Cache icebergTableOpsCache; - private final IcebergTableOpsProvider provider; + + private IcebergTableOpsProvider provider; public IcebergTableOpsManager(Map properties) { this.icebergTableOpsCache = Caffeine.newBuilder().build(); @@ -50,6 +58,11 @@ public IcebergTableOps getOps(String rawPrefix) { return icebergTableOpsCache.get(catalogName, k -> provider.getIcebergTableOps(catalogName)); } + @VisibleForTesting + public void setIcebergTableOpsProvider(IcebergTableOpsProvider provider) { + this.provider = provider; + } + private String getCatalogName(String rawPrefix) { String prefix = shelling(rawPrefix); Preconditions.checkArgument( @@ -62,9 +75,15 @@ private String getCatalogName(String rawPrefix) { } private IcebergTableOpsProvider createProvider(Map properties) { + String providerName = + (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER); + String className = ICEBERG_TABLE_OPS_PROVIDER_NAMES.get(providerName); + + Preconditions.checkArgument( + StringUtils.isNotEmpty(className), String.format("%s can not match any provider", providerName)); + LOG.info("Load Iceberg catalog provider: {}.", className); + try { - String className = - (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER); Class providerClz = Class.forName(className); return (IcebergTableOpsProvider) providerClz.getDeclaredConstructor().newInstance(); } catch (Exception e) { diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index e7b19e0aff5..8109bfd3c05 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; @@ -71,10 +70,9 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe if (bindIcebergTableOps) { Map catalogConf = Maps.newHashMap(); catalogConf.put(String.format("catalog.%s.catalog-backend-name", PREFIX), PREFIX); - catalogConf.put( - IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER, - ConfigBasedIcebergTableOpsProviderForTest.class.getName()); IcebergTableOpsManager icebergTableOpsManager = new IcebergTableOpsManager(catalogConf); + icebergTableOpsManager.setIcebergTableOpsProvider( + new ConfigBasedIcebergTableOpsProviderForTest()); IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig()); resourceConfig.register(