Skip to content

Commit

Permalink
[apache#4176]feat(iceberg) support multiple catalogs in Iceberg REST …
Browse files Browse the repository at this point in the history
…catalog server
  • Loading branch information
theoryxu committed Aug 13, 2024
1 parent 58be2d7 commit c3c5cfb
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class IcebergConstants {

public static final String GRAVITINO_ICEBERG_REST_SERVICE_NAME = "iceberg-rest";

public static final String ICEBERG_REST_CATALOG_PROVIDER = "catalog-provider-impl";
public static final String ICEBERG_REST_CATALOG_PROVIDER = "catalog-provider";

public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog";
}
14 changes: 7 additions & 7 deletions docs/iceberg-rest-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ You must download the corresponding JDBC driver to the `iceberg-rest-server/libs

#### Multi catalog support

Gravitino Iceberg REST server supports multiple catalogs. Users can enable it by setting configuration entries in the style `gravitino.iceberg-rest.catalog.<catalog name>.<param name>=<value>`.
The Gravitino Iceberg REST server supports multiple catalogs and offers a configuration-based catalog management system.

| Configuration item | Description | Default value | Required | Since Version |
|--------------------------------------------------|------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------|----------|---------------|
| `gravitino.iceberg-rest.catalog-provider-impl` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs. | `org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider` | No | 0.7.0 |
| Configuration item | Description | Default value | Required | Since Version |
|----------------------------------------------|------------------------------------------------------------------------------------------------------------------|-----------------------------|----------|---------------|
| `gravitino.iceberg-rest.catalog-provider` | The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs. | `config-based-provider` | No | 0.7.0 |

Using `ConfigBasedIcebergTableOpsProvider`, the default value, could be backward compatible with configurations without catalog names. It constructs a default catalog using those configurations for clients not setting `prefix`.
When using a config-based catalog provider, you can configure the default catalog with `gravitino.iceberg-rest.<param name>=<value>`. For specific catalogs, use the format `gravitino.iceberg-rest.catalog.<catalog name>.<param name>=<value>`.

For instance, we can configure three different catalogs, which one is anonymous and the others named `hive_backend` and `jdbc_backend`:
For instance, you can configure three different catalogs: the default catalog and two named catalogs, `hive_backend` and `jdbc_backend`.

```text
gravitino.iceberg-rest.catalog-backend = jdbc
Expand All @@ -174,7 +174,7 @@ gravitino.iceberg-rest.catalog.jdbc_backend.warehouse = hdfs://127.0.0.1:9000/us
...
```

Then we can set `prefix` in client config to use those catalogs. Take Spark SQL as an example:
You can access different catalogs by setting `prefix` to the specific catalog name in the Iceberg REST client configuration. The default catalog is used if you do not specify a prefix. Take Spark SQL as an example:

```shell
./bin/spark-sql -v \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
"The implementation of IcebergTableOpsProvider defines how the Iceberg REST catalog server gets Iceberg catalogs.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.createWithDefault(
"org.apache.gravitino.iceberg.common.ops.ConfigBasedIcebergTableOpsProvider");
.createWithDefault("config-based-provider");

public String getJdbcDriver() {
return get(JDBC_DRIVER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class ConfigBasedIcebergTableOpsProvider implements IcebergTableOpsProvid
public static final Logger LOG =
LoggerFactory.getLogger(ConfigBasedIcebergTableOpsProvider.class);

public static final String CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME = "config-based-provider";

@VisibleForTesting Map<String, IcebergConfig> catalogConfigs;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
Expand All @@ -31,8 +33,14 @@
public class IcebergTableOpsManager implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(IcebergTableOpsManager.class);

private static final ImmutableMap<String, String> ICEBERG_TABLE_OPS_PROVIDER_NAMES =
ImmutableMap.of(
ConfigBasedIcebergTableOpsProvider.CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME,
ConfigBasedIcebergTableOpsProvider.class.getCanonicalName());

private final Cache<String, IcebergTableOps> icebergTableOpsCache;
private final IcebergTableOpsProvider provider;

private IcebergTableOpsProvider provider;

public IcebergTableOpsManager(Map<String, String> properties) {
this.icebergTableOpsCache = Caffeine.newBuilder().build();
Expand All @@ -50,6 +58,11 @@ public IcebergTableOps getOps(String rawPrefix) {
return icebergTableOpsCache.get(catalogName, k -> provider.getIcebergTableOps(catalogName));
}

/**
 * Replaces the provider this manager delegates to when it needs an {@code IcebergTableOps}.
 *
 * <p>Marked {@code @VisibleForTesting}: it allows a test to inject its own provider instance
 * after the manager has been constructed. This method only reassigns the field; it does not
 * modify anything already held by the manager.
 *
 * @param provider the provider to use from now on
 */
@VisibleForTesting
public void setIcebergTableOpsProvider(IcebergTableOpsProvider provider) {
this.provider = provider;
}

private String getCatalogName(String rawPrefix) {
String prefix = shelling(rawPrefix);
Preconditions.checkArgument(
Expand All @@ -62,9 +75,15 @@ private String getCatalogName(String rawPrefix) {
}

private IcebergTableOpsProvider createProvider(Map<String, String> properties) {
String providerName =
(new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER);
String className = ICEBERG_TABLE_OPS_PROVIDER_NAMES.get(providerName);

Preconditions.checkArgument(
StringUtils.isNotEmpty(className), String.format("%s can not match any provider", providerName));
LOG.info("Load Iceberg catalog provider: {}.", className);

try {
String className =
(new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_PROVIDER);
Class<?> providerClz = Class.forName(className);
return (IcebergTableOpsProvider) providerClz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
Expand Down Expand Up @@ -71,10 +70,9 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe
if (bindIcebergTableOps) {
Map<String, String> catalogConf = Maps.newHashMap();
catalogConf.put(String.format("catalog.%s.catalog-backend-name", PREFIX), PREFIX);
catalogConf.put(
IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER,
ConfigBasedIcebergTableOpsProviderForTest.class.getName());
IcebergTableOpsManager icebergTableOpsManager = new IcebergTableOpsManager(catalogConf);
icebergTableOpsManager.setIcebergTableOpsProvider(
new ConfigBasedIcebergTableOpsProviderForTest());

IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig());
resourceConfig.register(
Expand Down

0 comments on commit c3c5cfb

Please sign in to comment.