Commit

[apache#4176]feat(iceberg) support multiple catalogs in Iceberg REST catalog server
theoryxu committed Aug 12, 2024
1 parent 84db02e commit 2e682d2
Showing 8 changed files with 53 additions and 41 deletions.
@@ -59,4 +59,7 @@ private ConfigConstants() {}

/** The version number for the 0.6.0 release. */
public static final String VERSION_0_6_0 = "0.6.0";
+
+/** The version number for the 0.7.0 release. */
+public static final String VERSION_0_7_0 = "0.7.0";
}
43 changes: 25 additions & 18 deletions docs/iceberg-rest-service.md
@@ -62,18 +62,18 @@ Please note that, it only takes affect in `gravitino.conf`, you don't need to sp

### REST catalog server configuration

-| Configuration item | Description | Default value | Required | Since Version |
-|--------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------|
-| `gravitino.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 |
-| `gravitino.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 |
-| `gravitino.iceberg-rest.minThreads` | The minimum number of threads in the thread pool used by the Jetty web server. `minThreads` is 8 if the value is less than 8. | `Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 100), 8)` | No | 0.2.0 |
-| `gravitino.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. | `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 |
-| `gravitino.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 |
-| `gravitino.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 |
-| `gravitino.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 |
-| `gravitino.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 |
-| `gravitino.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 |
-| `gravitino.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 |
+| Configuration item | Description | Default value | Required | Since Version |
+|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------|----------|---------------|
+| `gravitino.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 |
+| `gravitino.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 |
+| `gravitino.iceberg-rest.minThreads` | The minimum number of threads in the thread pool used by the Jetty web server. `minThreads` is 8 if the value is less than 8. | `Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 100), 8)` | No | 0.2.0 |
+| `gravitino.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. | `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 |
+| `gravitino.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 |
+| `gravitino.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 |
+| `gravitino.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 |
+| `gravitino.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 |
+| `gravitino.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 |
+| `gravitino.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 |


The filter in `customFilters` should be a standard javax servlet filter.
@@ -148,25 +148,32 @@ You must download the corresponding JDBC driver to the `iceberg-rest-server/libs
:::

#### Multi catalog support
-| Configuration item | Description | Default value | Required | Since Version |
-|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------|
-| `gravitino.iceberg-rest.catalog-provider-impl` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets iceberg catalogs. | `org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider` | No | 0.6.0 |
+| Configuration item | Description | Default value | Required | Since Version |
+|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------|
+| `gravitino.iceberg-rest.catalog-provider-impl` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs. | `org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider` | No | 0.7.0 |

You can also specify catalog parameters by setting configuration entries in the style `gravitino.iceberg-rest.catalog.<catalog name>.<param name>=<value>`.

-For example, we can configure two different catalogs named `hive_backend` and `jdbc_backend`.
+Using `ConfigBasedIcebergTableOpsProvider`, the default value, could be backward compatible with configurations without catalog names. It constructs a default catalog using those configurations for clients not setting the prefix.
+
+For example, we can configure three different catalogs simultaneously, which one is anonymous and the others named `hive_backend` and `jdbc_backend`:

```text
+gravitino.iceberg-rest.catalog-backend = jdbc
+gravitino.iceberg-rest.uri = jdbc:postgresql://127.0.0.1:5432
+gravitino.iceberg-rest.warehouse = hdfs://127.0.0.1:9000/user/hive/warehouse-postgresql
+...
gravitino.iceberg-rest.catalog.hive_backend.catalog-backend = hive
-gravitino.iceberg-rest.catalog.hive_backend.uri = thrift://127.0.0.1:9083
+gravitino.iceberg-rest.catalog.hive_backend.uri = thrift://127.0.0.1:9084
gravitino.iceberg-rest.catalog.hive_backend.warehouse = /user/hive/warehouse-hive/
...
gravitino.iceberg-rest.catalog.jdbc_backend.catalog-backend = jdbc
gravitino.iceberg-rest.catalog.jdbc_backend.uri = jdbc:mysql://127.0.0.1:3306/
-gravitino.iceberg-rest.catalog.jdbc_backend.warehouse = hdfs://127.0.0.1:9000/user/hive/warehouse-jdbc
+gravitino.iceberg-rest.catalog.jdbc_backend.warehouse = hdfs://127.0.0.1:9000/user/hive/warehouse-mysql
...
```

+
### Other Apache Iceberg catalog properties

You can add other properties defined in [Iceberg catalog properties](https://iceberg.apache.org/docs/1.5.2/configuration/#catalog-properties).
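
The key layout documented in this hunk, `gravitino.iceberg-rest.catalog.<catalog name>.<param name>=<value>` alongside un-prefixed keys for an anonymous default catalog, can be illustrated with a small grouping routine. This is a minimal sketch, not the code in `ConfigBasedIcebergTableOpsProvider`: the class name `CatalogConfigSketch`, the method `groupByCatalog`, and the literal value of the default catalog name are assumptions made for illustration, and the sketch does not separate server settings such as `host` from catalog parameters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Gravitino.
final class CatalogConfigSketch {
  static final String REST_PREFIX = "gravitino.iceberg-rest.";
  static final String CATALOG_PREFIX = REST_PREFIX + "catalog.";
  // Assumed value of IcebergConstants.GRAVITINO_DEFAULT_CATALOG.
  static final String DEFAULT_CATALOG = "__gravitino_default_catalog";

  /** Groups flat properties into one parameter map per catalog name. */
  static Map<String, Map<String, String>> groupByCatalog(Map<String, String> props) {
    Map<String, Map<String, String>> result = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : props.entrySet()) {
      String key = entry.getKey();
      String catalog;
      String param;
      if (key.startsWith(CATALOG_PREFIX)) {
        // Named catalog: "<catalog name>.<param name>" follows the prefix.
        String rest = key.substring(CATALOG_PREFIX.length());
        int dot = rest.indexOf('.');
        if (dot <= 0 || dot == rest.length() - 1) {
          continue; // no catalog name or no parameter name; skip
        }
        catalog = rest.substring(0, dot);
        param = rest.substring(dot + 1);
      } else if (key.startsWith(REST_PREFIX)) {
        // Un-prefixed key: attribute it to the anonymous default catalog.
        catalog = DEFAULT_CATALOG;
        param = key.substring(REST_PREFIX.length());
      } else {
        continue; // unrelated configuration
      }
      result.computeIfAbsent(catalog, k -> new LinkedHashMap<>()).put(param, entry.getValue());
    }
    return result;
  }
}
```

Fed the three-catalog example from the documentation, the sketch yields one map for the default catalog and one each for `hive_backend` and `jdbc_backend`. Note that `gravitino.iceberg-rest.catalog-backend` is not mistaken for a named catalog because the named form requires a dot directly after `catalog`.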
@@ -161,11 +161,11 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final ConfigEntry<String> ICEBERG_REST_CATALOG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER)
.doc(
-"The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets iceberg catalogs.")
-.version(ConfigConstants.VERSION_0_6_0)
+"The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs.")
+.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.createWithDefault(
-"org.apache.gravitino.iceberg.common.ops.ConfigIcebergTableOpsProvider");
+"org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider");

public String getJdbcDriver() {
return get(JDBC_DRIVER);
@@ -21,7 +21,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.utils.MapUtils;
@@ -49,7 +48,8 @@ public void initialize(Map<String, String> properties) {
this.catalogConfigs =
properties.keySet().stream()
.map(this::getCatalogName)
-.flatMap(op -> op.map(Stream::of).orElseGet(Stream::empty))
+.filter(Optional::isPresent)
+.map(Optional::get)
.distinct()
.collect(
Collectors.toMap(
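
The hunk above replaces a `flatMap` over `Optional` with `filter(Optional::isPresent)` followed by `map(Optional::get)`. The following sketch suggests the two pipelines are interchangeable for this purpose. The class `OptionalFlattenSketch` and the simplified `catalogNameOf` are hypothetical stand-ins; the real `getCatalogName` is not shown in this diff, so its parsing rules here are an assumption.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, for illustration only.
final class OptionalFlattenSketch {
  // Assumed stand-in for getCatalogName: extracts "<name>" from "catalog.<name>.<param>".
  static Optional<String> catalogNameOf(String key) {
    String prefix = "catalog.";
    if (!key.startsWith(prefix)) {
      return Optional.empty();
    }
    String rest = key.substring(prefix.length());
    int dot = rest.indexOf('.');
    return dot > 0 ? Optional.of(rest.substring(0, dot)) : Optional.empty();
  }

  // Old style: turn each Optional into a stream of zero or one element.
  static List<String> withFlatMap(List<String> keys) {
    return keys.stream()
        .map(OptionalFlattenSketch::catalogNameOf)
        .flatMap(op -> op.map(Stream::of).orElseGet(Stream::empty))
        .distinct()
        .collect(Collectors.toList());
  }

  // New style: drop empty Optionals, then unwrap the remaining ones.
  static List<String> withFilter(List<String> keys) {
    return keys.stream()
        .map(OptionalFlattenSketch::catalogNameOf)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .distinct()
        .collect(Collectors.toList());
  }
}
```

Both variants work on Java 8. The second avoids the `Stream` import and reads more directly, which matches the import removal in the preceding hunk.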
@@ -20,9 +20,11 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.base.Preconditions;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -50,10 +52,9 @@ public IcebergTableOps getOps(String rawPrefix) {

private String getCatalogName(String rawPrefix) {
String prefix = shelling(rawPrefix);
-if (IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix)) {
-throw new RuntimeException(
-String.format("%s is conflict with reserved key, please replace it", prefix));
-}
+Preconditions.checkArgument(
+!IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
+String.format("%s is conflict with reserved key, please replace it", prefix));
if (StringUtils.isBlank(prefix)) {
return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
}
@@ -63,9 +64,7 @@ private String getCatalogName(String rawPrefix) {
private IcebergTableOpsProvider createProvider(Map<String, String> properties) {
try {
String className =
-properties.getOrDefault(
-IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER,
-ConfigBasedIcebergTableOpsProvider.class.getName());
+(new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER);
Class<?> providerClz = Class.forName(className);
return (IcebergTableOpsProvider) providerClz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
@@ -76,10 +75,10 @@ private IcebergTableOpsProvider createProvider(Map<String, String> properties) {
private String shelling(String rawPrefix) {
if (StringUtils.isBlank(rawPrefix)) {
return rawPrefix;
-} else if (!rawPrefix.endsWith("/")) {
-throw new RuntimeException(String.format("rawPrefix %s is illegal", rawPrefix));
} else {
// rawPrefix is a string matching ([^/]*/) which end with /
+Preconditions.checkArgument(
+rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix));
return rawPrefix.substring(0, rawPrefix.length() - 1);
}
}
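
The net effect of the `getCatalogName` and `shelling` hunks is that invalid prefixes now raise `IllegalArgumentException` (what Guava's `Preconditions.checkArgument` throws) instead of a bare `RuntimeException`. Below is a minimal, dependency-free sketch of that behavior. The class `PrefixSketch` is hypothetical, the default catalog name's literal value is assumed, the final `return prefix` is assumed because the diff elides it, and `isBlank` is a simplification of `StringUtils.isBlank`.

```java
// Hypothetical re-statement of the prefix handling, for illustration only.
final class PrefixSketch {
  // Assumed value of IcebergConstants.GRAVITINO_DEFAULT_CATALOG.
  static final String DEFAULT_CATALOG = "__gravitino_default_catalog";

  /** Maps a raw REST path prefix such as "hive_backend/" to a catalog name. */
  static String catalogName(String rawPrefix) {
    String prefix = shelling(rawPrefix);
    if (DEFAULT_CATALOG.equals(prefix)) {
      // Clients may not address the reserved default name explicitly.
      throw new IllegalArgumentException(
          String.format("%s is conflict with reserved key, please replace it", prefix));
    }
    if (isBlank(prefix)) {
      return DEFAULT_CATALOG; // no prefix: fall back to the anonymous catalog
    }
    return prefix; // assumed: the diff does not show this line
  }

  /** Strips the trailing "/" from a non-blank raw prefix. */
  static String shelling(String rawPrefix) {
    if (isBlank(rawPrefix)) {
      return rawPrefix;
    }
    if (!rawPrefix.endsWith("/")) {
      throw new IllegalArgumentException(
          String.format("rawPrefix %s format is illegal", rawPrefix));
    }
    return rawPrefix.substring(0, rawPrefix.length() - 1);
  }

  // Simplified blank check; treats null, empty, and trim-empty strings as blank.
  static boolean isBlank(String s) {
    return s == null || s.trim().isEmpty();
  }
}
```

Under these assumptions, `catalogName("hive_backend/")` returns `hive_backend`, a blank prefix maps to the default catalog, and both `hello` and `__gravitino_default_catalog/` are rejected, which is consistent with the updated `assertThrowsExactly(IllegalArgumentException.class, ...)` in the test hunk further down.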
@@ -21,7 +21,7 @@
import java.util.Map;

/**
-* IcebergTableOpsProvider is an interface defining how Iceberg REST catalog server gets iceberg
+* IcebergTableOpsProvider is an interface defining how Iceberg REST catalog server gets Iceberg
* catalogs.
*/
public interface IcebergTableOpsProvider {
@@ -22,6 +22,9 @@
import java.util.Map;
import java.util.UUID;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.jdbc.JdbcCatalog;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -61,6 +64,13 @@ public void testValidIcebergTableOps(String catalogName) {
IcebergTableOps ops = provider.getIcebergTableOps(catalogName);

Assertions.assertEquals(catalogName, ops.catalog.name());
+if ("hive_backend".equals(catalogName)) {
+Assertions.assertTrue(ops.catalog instanceof HiveCatalog);
+} else if ("jdbc_backend".equals(catalogName)) {
+Assertions.assertTrue(ops.catalog instanceof JdbcCatalog);
+} else if (IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName)) {
+Assertions.assertTrue(ops.catalog instanceof InMemoryCatalog);
+}
}

@ParameterizedTest
@@ -21,7 +21,6 @@
import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
-import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -39,9 +38,6 @@ public void testValidGetOps(String rawPrefix) {
}
Map<String, String> config = Maps.newHashMap();
config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix);
-config.put(
-IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER,
-ConfigBasedIcebergTableOpsProvider.class.getName());
IcebergTableOpsManager manager = new IcebergTableOpsManager(config);

IcebergTableOps ops = manager.getOps(rawPrefix);
@@ -58,11 +54,8 @@ public void testValidGetOps(String rawPrefix) {
strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "__gravitino_default_catalog/"})
public void testInvalidGetOps(String rawPrefix) {
Map<String, String> config = Maps.newHashMap();
-config.put(
-IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER,
-ConfigBasedIcebergTableOpsProvider.class.getName());
IcebergTableOpsManager manager = new IcebergTableOpsManager(config);

-Assertions.assertThrowsExactly(RuntimeException.class, () -> manager.getOps(rawPrefix));
+Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> manager.getOps(rawPrefix));
}
}
