From 2e682d2b6fba3f5e546758e49802c0da4b070ef0 Mon Sep 17 00:00:00 2001 From: theoryxu Date: Mon, 12 Aug 2024 17:50:54 +0800 Subject: [PATCH] [#4176]feat(iceberg) support multiple catalogs in Iceberg REST catalog server --- .../gravitino/config/ConfigConstants.java | 3 ++ docs/iceberg-rest-service.md | 43 +++++++++++-------- .../iceberg/common/IcebergConfig.java | 6 +-- .../ConfigBasedIcebergTableOpsProvider.java | 4 +- .../common/ops/IcebergTableOpsManager.java | 17 ++++---- .../common/ops/IcebergTableOpsProvider.java | 2 +- ...estConfigBasedIcebergTableOpsProvider.java | 10 +++++ .../ops/TestIcebergTableOpsManager.java | 9 +--- 8 files changed, 53 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java index fd6efc70f5f..5317b19a4dd 100644 --- a/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java +++ b/core/src/main/java/org/apache/gravitino/config/ConfigConstants.java @@ -59,4 +59,7 @@ private ConfigConstants() {} /** The version number for the 0.6.0 release. */ public static final String VERSION_0_6_0 = "0.6.0"; + + /** The version number for the 0.7.0 release. 
*/ + public static final String VERSION_0_7_0 = "0.7.0"; } diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md index 115a23afab3..2ff316b52e3 100644 --- a/docs/iceberg-rest-service.md +++ b/docs/iceberg-rest-service.md @@ -62,18 +62,18 @@ Please note that, it only takes affect in `gravitino.conf`, you don't need to sp ### REST catalog server configuration -| Configuration item | Description | Default value | Required | Since Version | -|--------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------| -| `gravitino.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 | -| `gravitino.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 | -| `gravitino.iceberg-rest.minThreads` | The minimum number of threads in the thread pool used by the Jetty web server. `minThreads` is 8 if the value is less than 8. | `Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 100), 8)` | No | 0.2.0 | -| `gravitino.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. | `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 | -| `gravitino.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 | -| `gravitino.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. 
For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 | -| `gravitino.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 | -| `gravitino.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 | -| `gravitino.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 | -| `gravitino.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 | +| Configuration item | Description | Default value | Required | Since Version | +|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------|----------|---------------| +| `gravitino.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 | +| `gravitino.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 | +| `gravitino.iceberg-rest.minThreads` | The minimum number of threads in the thread pool used by the Jetty web server. `minThreads` is 8 if the value is less than 8. | `Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 100), 8)` | No | 0.2.0 | +| `gravitino.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. 
| `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 | +| `gravitino.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 | +| `gravitino.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 | +| `gravitino.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 | +| `gravitino.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 | +| `gravitino.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 | +| `gravitino.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 | The filter in `customFilters` should be a standard javax servlet filter. @@ -148,25 +148,32 @@ You must download the corresponding JDBC driver to the `iceberg-rest-server/libs ::: #### Multi catalog support -| Configuration item | Description | Default value | Required | Since Version | -|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------| -| `gravitino.iceberg-rest.catalog-provider-impl` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets iceberg catalogs. 
| `org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider` | No | 0.6.0 | +| Configuration item | Description | Default value | Required | Since Version | +|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------| +| `gravitino.iceberg-rest.catalog-provider-impl` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs. | `org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider` | No | 0.7.0 | You can also specify catalog parameters by setting configuration entries in the style `gravitino.iceberg-rest.catalog..=`. -For example, we can configure two different catalogs named `hive_backend` and `jdbc_backend`. +The default provider, `ConfigBasedIcebergTableOpsProvider`, is backward compatible with configurations that do not specify catalog names: it constructs a default catalog from those configurations for clients that do not set a prefix. + +For example, we can configure three different catalogs simultaneously, one of which is anonymous while the other two are named `hive_backend` and `jdbc_backend`: ```text +gravitino.iceberg-rest.catalog-backend = jdbc +gravitino.iceberg-rest.uri = jdbc:postgresql://127.0.0.1:5432 +gravitino.iceberg-rest.warehouse = hdfs://127.0.0.1:9000/user/hive/warehouse-postgresql +... gravitino.iceberg-rest.catalog.hive_backend.catalog-backend = hive -gravitino.iceberg-rest.catalog.hive_backend.uri = thrift://127.0.0.1:9083 +gravitino.iceberg-rest.catalog.hive_backend.uri = thrift://127.0.0.1:9084 gravitino.iceberg-rest.catalog.hive_backend.warehouse = /user/hive/warehouse-hive/ ... 
gravitino.iceberg-rest.catalog.jdbc_backend.catalog-backend = jdbc gravitino.iceberg-rest.catalog.jdbc_backend.uri = jdbc:mysql://127.0.0.1:3306/ -gravitino.iceberg-rest.catalog.jdbc_backend.warehouse = hdfs://127.0.0.1:9000/user/hive/warehouse-jdbc +gravitino.iceberg-rest.catalog.jdbc_backend.warehouse = hdfs://127.0.0.1:9000/user/hive/warehouse-mysql ... ``` + ### Other Apache Iceberg catalog properties You can add other properties defined in [Iceberg catalog properties](https://iceberg.apache.org/docs/1.5.2/configuration/#catalog-properties). diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java index d250178c645..362ecde68f8 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java @@ -161,11 +161,11 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry ICEBERG_REST_CATALOG_PROVIDER = new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER) .doc( - "The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets iceberg catalogs.") - .version(ConfigConstants.VERSION_0_6_0) + "The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs.") + .version(ConfigConstants.VERSION_0_7_0) .stringConf() .createWithDefault( - "org.apache.gravitino.iceberg.common.ops.ConfigIcebergTableOpsProvider"); + "org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider"); public String getJdbcDriver() { return get(JDBC_DRIVER); diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java index a68a91c5ac4..edcef17b4fe 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigBasedIcebergTableOpsProvider.java @@ -21,7 +21,6 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.utils.MapUtils; @@ -49,7 +48,8 @@ public void initialize(Map properties) { this.catalogConfigs = properties.keySet().stream() .map(this::getCatalogName) - .flatMap(op -> op.map(Stream::of).orElseGet(Stream::empty)) + .filter(Optional::isPresent) + .map(Optional::get) .distinct() .collect( Collectors.toMap( diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java index 46ce08c4689..ad93070d7f7 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java @@ -20,9 +20,11 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.base.Preconditions; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.iceberg.common.IcebergConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,10 +52,9 @@ public IcebergTableOps getOps(String rawPrefix) { private String 
getCatalogName(String rawPrefix) { String prefix = shelling(rawPrefix); - if (IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix)) { - throw new RuntimeException( - String.format("%s is conflict with reserved key, please replace it", prefix)); - } + Preconditions.checkArgument( + !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix), + String.format("%s is conflict with reserved key, please replace it", prefix)); if (StringUtils.isBlank(prefix)) { return IcebergConstants.GRAVITINO_DEFAULT_CATALOG; } @@ -63,9 +64,7 @@ private String getCatalogName(String rawPrefix) { private IcebergTableOpsProvider createProvider(Map properties) { try { String className = - properties.getOrDefault( - IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER, - ConfigBasedIcebergTableOpsProvider.class.getName()); + (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER); Class providerClz = Class.forName(className); return (IcebergTableOpsProvider) providerClz.getDeclaredConstructor().newInstance(); } catch (Exception e) { @@ -76,10 +75,10 @@ private IcebergTableOpsProvider createProvider(Map properties) { private String shelling(String rawPrefix) { if (StringUtils.isBlank(rawPrefix)) { return rawPrefix; - } else if (!rawPrefix.endsWith("/")) { - throw new RuntimeException(String.format("rawPrefix %s is illegal", rawPrefix)); } else { // rawPrefix is a string matching ([^/]*/) which end with / + Preconditions.checkArgument( + rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); return rawPrefix.substring(0, rawPrefix.length() - 1); } } diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java index 011081196cf..cda5ac20ae8 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java +++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java @@ -21,7 +21,7 @@ import java.util.Map; /** - * IcebergTableOpsProvider is an interface defining how Iceberg REST catalog server gets iceberg + * IcebergTableOpsProvider is an interface defining how Iceberg REST catalog server gets Iceberg * catalogs. */ public interface IcebergTableOpsProvider { diff --git a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java index 17c1cd8eab2..a762af7332e 100644 --- a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java +++ b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestConfigBasedIcebergTableOpsProvider.java @@ -22,6 +22,9 @@ import java.util.Map; import java.util.UUID; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.jdbc.JdbcCatalog; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -61,6 +64,13 @@ public void testValidIcebergTableOps(String catalogName) { IcebergTableOps ops = provider.getIcebergTableOps(catalogName); Assertions.assertEquals(catalogName, ops.catalog.name()); + if ("hive_backend".equals(catalogName)) { + Assertions.assertTrue(ops.catalog instanceof HiveCatalog); + } else if ("jdbc_backend".equals(catalogName)) { + Assertions.assertTrue(ops.catalog instanceof JdbcCatalog); + } else if (IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName)) { + Assertions.assertTrue(ops.catalog instanceof InMemoryCatalog); + } } @ParameterizedTest diff --git 
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java index c35d6db35e4..04ded71dd7f 100644 --- a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java +++ b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java @@ -21,7 +21,6 @@ import com.google.common.collect.Maps; import java.util.Map; import org.apache.commons.lang.StringUtils; -import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -39,9 +38,6 @@ public void testValidGetOps(String rawPrefix) { } Map config = Maps.newHashMap(); config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix); - config.put( - IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER, - ConfigBasedIcebergTableOpsProvider.class.getName()); IcebergTableOpsManager manager = new IcebergTableOpsManager(config); IcebergTableOps ops = manager.getOps(rawPrefix); @@ -58,11 +54,8 @@ public void testValidGetOps(String rawPrefix) { strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "__gravitino_default_catalog/"}) public void testInvalidGetOps(String rawPrefix) { Map config = Maps.newHashMap(); - config.put( - IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER, - ConfigBasedIcebergTableOpsProvider.class.getName()); IcebergTableOpsManager manager = new IcebergTableOpsManager(config); - Assertions.assertThrowsExactly(RuntimeException.class, () -> manager.getOps(rawPrefix)); + Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> manager.getOps(rawPrefix)); } }