Skip to content

Commit

Permalink
[#4581]feat(iceberg): support Gravitino-based multi catalog support f…
Browse files Browse the repository at this point in the history
…or Gravitino Iceberg REST server (#4598)

### What changes were proposed in this pull request?

Add Gravitino-based multi-catalog support to the Gravitino Iceberg REST
server.

### Why are the changes needed?

fixes: #4581

### Does this PR introduce _any_ user-facing change?

add a property key

### How was this patch tested?

1. add UT
2. manual test

---------

Co-authored-by: theoryxu <[email protected]>
  • Loading branch information
theoryxu and theoryxu authored Aug 25, 2024
1 parent e997ad2 commit 1ec5b11
Show file tree
Hide file tree
Showing 16 changed files with 338 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ public class IcebergConstants {

public static final String GRAVITINO_ICEBERG_REST_SERVICE_NAME = "iceberg-rest";

/** Config key for the expiry, in milliseconds, of entries in the REST server's catalog cache. */
public static final String ICEBERG_REST_CATALOG_CACHE_EVICTION_INTERVAL =
"catalog-cache-eviction-interval-ms";

/** Config key selecting the catalog provider used by the Iceberg REST service. */
public static final String ICEBERG_REST_CATALOG_PROVIDER = "catalog-provider";

/** Config key for the URI of the Gravitino server; read by the Gravitino-based provider. */
public static final String GRAVITINO_URI = "gravitino-uri";

/** Config key for the metalake name; read by the Gravitino-based provider. */
public static final String GRAVITINO_METALAKE = "gravitino-metalake";

/** Reserved catalog name; the Gravitino-based provider rejects lookups that use it. */
public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog";
}
21 changes: 20 additions & 1 deletion docs/iceberg-rest-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ The Gravitino Iceberg REST server supports multiple catalogs and offers a config
|----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------|----------|---------------|
| `gravitino.iceberg-rest.catalog-provider` | Catalog provider class name, you can develop a class that implements `IcebergTableOpsProvider` and add the corresponding jar file to the Iceberg REST service classpath directory. | `config-based-provider` | No | 0.7.0 |

When using a config-based catalog provider, you can configure the default catalog with `gravitino.iceberg-rest.catalog.<param name>=<value>`. For specific catalogs, use the format `gravitino.iceberg-rest.catalog.<catalog name>.<param name>=<value>`.
##### Configuration based catalog provider

When using a configuration based catalog provider, you can configure the default catalog with `gravitino.iceberg-rest.catalog.<param name>=<value>`. For specific catalogs, use the format `gravitino.iceberg-rest.catalog.<catalog name>.<param name>=<value>`.

For instance, you can configure three different catalogs, the default catalog and the specific `hive_backend` and `jdbc_backend` catalogs separately.

Expand Down Expand Up @@ -215,6 +217,23 @@ You can access different catalogs by setting the `prefix` to the specific catalo
...
```

##### Gravitino server based catalog provider

When using a Gravitino server based catalog provider, you can leverage Gravitino to support dynamic catalog management for the Iceberg REST server.

| Configuration item | Description | Default value | Required | Since Version |
|--------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
| `gravitino.iceberg-rest.gravitino-uri` | The URI of the Gravitino server. Only takes effect, and must then be set, when `catalog-provider` is `gravitino-based-provider`. | (none) | No | 0.7.0 |
| `gravitino.iceberg-rest.gravitino-metalake` | The name of the metalake that `gravitino-based-provider` loads catalogs from. Only takes effect, and must then be set, when `catalog-provider` is `gravitino-based-provider`. | (none) | No | 0.7.0 |
| `gravitino.iceberg-rest.catalog-cache-eviction-interval-ms` | Catalog cache eviction interval, in milliseconds. | 3600000 | No | 0.7.0 |

```text
gravitino.iceberg-rest.catalog-cache-eviction-interval-ms = 300000
gravitino.iceberg-rest.catalog-provider = gravitino-based-provider
gravitino.iceberg-rest.gravitino-uri = http://127.0.0.1:8090
gravitino.iceberg-rest.gravitino-metalake = test
```

### Other Apache Iceberg catalog properties

You can add other properties defined in [Iceberg catalog properties](https://iceberg.apache.org/docs/1.5.2/configuration/#catalog-properties).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
.stringConf()
.create();

/**
 * Expiry applied to cached catalog entries, expressed in milliseconds (the key ends in {@code
 * -ms}). Defaults to 3600000, i.e. one hour.
 */
public static final ConfigEntry<Long> ICEBERG_REST_CATALOG_CACHE_EVICTION_INTERVAL =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CACHE_EVICTION_INTERVAL)
.doc("Catalog cache eviction interval.")
.version(ConfigConstants.VERSION_0_7_0)
.longConf()
.createWithDefault(3600000L);

public static final ConfigEntry<String> ICEBERG_REST_CATALOG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER)
.doc(
Expand All @@ -174,6 +181,22 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
.stringConf()
.createWithDefault("config-based-provider");

/**
 * URI of the Gravitino server. Declared without a default value; per its doc string it is only
 * consulted when the catalog provider is {@code gravitino-based-provider}.
 */
public static final ConfigEntry<String> GRAVITINO_URI =
new ConfigBuilder(IcebergConstants.GRAVITINO_URI)
.doc(
"The uri of Gravitino server address, only worked if `catalog-provider` is `gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

/**
 * Name of the metalake used when talking to Gravitino. Declared without a default value; per its
 * doc string it is only consulted when the catalog provider is {@code gravitino-based-provider}.
 */
public static final ConfigEntry<String> GRAVITINO_METALAKE =
new ConfigBuilder(IcebergConstants.GRAVITINO_METALAKE)
.doc(
"The metalake name that `gravitino-based-provider` used to request to Gravitino, only worked if `catalog-provider` is `gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public String getJdbcDriver() {
return get(JDBC_DRIVER);
}
Expand Down
4 changes: 4 additions & 0 deletions iceberg/iceberg-rest-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ val icebergVersion: String = libs.versions.iceberg.get()
val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get()

dependencies {
implementation(project(":api"))
implementation(project(":catalogs:catalog-common"))
implementation(project(":clients:client-java"))
implementation(project(":core"))
implementation(project(":common"))
implementation(project(":iceberg:iceberg-common"))
Expand All @@ -40,6 +42,8 @@ dependencies {
implementation(libs.bundles.jetty)
implementation(libs.bundles.jersey)
implementation(libs.bundles.log4j)
implementation(libs.caffeine)
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.jackson.annotations)
implementation(libs.jackson.databind)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.auxiliary.GravitinoAuxiliaryService;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.common.ops;
package org.apache.gravitino.iceberg.provider;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsProvider;
import org.apache.gravitino.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.provider;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link IcebergTableOpsProvider} that proxies Gravitino lakehouse-iceberg catalogs.
 *
 * <p>Each requested catalog name is loaded from the configured metalake through a {@link
 * GravitinoAdminClient}; the catalog's properties are passed through {@link
 * IcebergPropertiesUtils#toIcebergCatalogProperties} and wrapped in a new {@link IcebergTableOps}.
 *
 * <p>For example, if the metalake contains a catalog named {@code iceberg_catalog}, the {@code
 * catalogName} to request is {@code iceberg_catalog}.
 */
public class GravitinoBasedIcebergTableOpsProvider
    implements IcebergTableOpsProvider, AutoCloseable {
  public static final Logger LOG =
      LoggerFactory.getLogger(GravitinoBasedIcebergTableOpsProvider.class);

  public static final String GRAVITINO_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME =
      "gravitino-based-provider";

  /** Value of {@link Catalog#provider()} that a catalog must report to be served here. */
  private static final String ICEBERG_CATALOG_PROVIDER = "lakehouse-iceberg";

  private String gravitinoMetalake;

  private GravitinoAdminClient client;

  /**
   * Reads the Gravitino URI and metalake name from {@code properties} and builds the admin client.
   *
   * @param properties configuration map; must contain non-blank values for {@link
   *     IcebergConstants#GRAVITINO_URI} and {@link IcebergConstants#GRAVITINO_METALAKE}
   * @throws IllegalArgumentException if either value is missing or blank
   */
  @Override
  public void initialize(Map<String, String> properties) {
    String uri = properties.get(IcebergConstants.GRAVITINO_URI);
    String metalake = properties.get(IcebergConstants.GRAVITINO_METALAKE);

    Preconditions.checkArgument(
        StringUtils.isNotBlank(uri), IcebergConstants.GRAVITINO_URI + " is blank");
    Preconditions.checkArgument(
        StringUtils.isNotBlank(metalake), IcebergConstants.GRAVITINO_METALAKE + " is blank");

    this.gravitinoMetalake = metalake;
    this.client = GravitinoAdminClient.builder(uri).build();
  }

  /**
   * Loads the named catalog from the configured metalake and builds an {@link IcebergTableOps}
   * from its properties.
   *
   * @param catalogName name of the Gravitino catalog; must be non-blank and must not be {@link
   *     IcebergConstants#GRAVITINO_DEFAULT_CATALOG}
   * @return a new {@link IcebergTableOps} configured from the catalog's properties
   * @throws IllegalArgumentException if {@code catalogName} is blank, is the reserved default
   *     catalog name, or refers to a catalog whose provider is not {@code lakehouse-iceberg}
   * @throws IllegalStateException if no client is available because the provider has not been
   *     initialized or has already been closed
   */
  @Override
  public IcebergTableOps getIcebergTableOps(String catalogName) {
    Preconditions.checkArgument(
        StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
    Preconditions.checkArgument(
        !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName),
        IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in gravitino-based-provider");
    // Fail with a descriptive error instead of a bare NullPointerException on the call below.
    Preconditions.checkState(
        client != null,
        GRAVITINO_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME + " is not initialized or already closed");

    Catalog catalog = client.loadMetalake(gravitinoMetalake).loadCatalog(catalogName);

    // Use Guava's message template so the text is only formatted when the check fails.
    Preconditions.checkArgument(
        ICEBERG_CATALOG_PROVIDER.equals(catalog.provider()),
        "%s.%s is not iceberg catalog",
        gravitinoMetalake,
        catalogName);

    Map<String, String> properties =
        IcebergPropertiesUtils.toIcebergCatalogProperties(catalog.properties());
    return new IcebergTableOps(new IcebergConfig(properties));
  }

  /** Replaces the Gravitino client; intended for tests that inject a stub or mock. */
  @VisibleForTesting
  void setClient(GravitinoAdminClient client) {
    this.client = client;
  }

  /**
   * Closes the underlying Gravitino client, if any. Calling this more than once is harmless: the
   * client reference is cleared before closing, so later calls find nothing to close.
   */
  @Override
  public void close() throws Exception {
    GravitinoAdminClient toClose = client;
    client = null;
    if (toClose != null) {
      toClose.close();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.common.ops;
package org.apache.gravitino.iceberg.service;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsProvider;
import org.apache.gravitino.iceberg.provider.ConfigBasedIcebergTableOpsProvider;
import org.apache.gravitino.iceberg.provider.GravitinoBasedIcebergTableOpsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,16 +43,37 @@ public class IcebergTableOpsManager implements AutoCloseable {
private static final ImmutableMap<String, String> ICEBERG_TABLE_OPS_PROVIDER_NAMES =
ImmutableMap.of(
ConfigBasedIcebergTableOpsProvider.CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME,
ConfigBasedIcebergTableOpsProvider.class.getCanonicalName());
ConfigBasedIcebergTableOpsProvider.class.getCanonicalName(),
GravitinoBasedIcebergTableOpsProvider.GRAVITINO_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME,
GravitinoBasedIcebergTableOpsProvider.class.getCanonicalName());

private final Cache<String, IcebergTableOps> icebergTableOpsCache;

private final IcebergTableOpsProvider provider;

public IcebergTableOpsManager(Map<String, String> properties) {
this.icebergTableOpsCache = Caffeine.newBuilder().build();
this.provider = createProvider(properties);
this.provider.initialize(properties);
this.icebergTableOpsCache =
Caffeine.newBuilder()
.expireAfterWrite(
(new IcebergConfig(properties))
.get(IcebergConfig.ICEBERG_REST_CATALOG_CACHE_EVICTION_INTERVAL),
TimeUnit.MILLISECONDS)
.removalListener(
(k, v, c) -> {
LOG.info("Remove IcebergTableOps cache {}.", k);
closeIcebergTableOps((IcebergTableOps) v);
})
.scheduler(
Scheduler.forScheduledExecutorService(
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("table-ops-cleaner-%d")
.build())))
.build();
}

/**
Expand Down Expand Up @@ -97,8 +126,19 @@ private String shelling(String rawPrefix) {
}
}

/**
 * Closes the given {@link IcebergTableOps}, logging a warning instead of propagating any failure.
 * It is invoked from the cache's removal listener.
 *
 * @param ops the table ops instance to close
 */
private void closeIcebergTableOps(IcebergTableOps ops) {
  try {
    ops.close();
  } catch (Exception ex) {
    // Pass the exception as the trailing argument with no placeholder of its own, so the logger
    // treats it as the throwable and records the stack trace rather than just its toString().
    LOG.warn("Close Iceberg table ops fail: {}", ops, ex);
  }
}

/**
 * Invalidates every entry in the table-ops cache and then closes the provider when it implements
 * {@link AutoCloseable}.
 *
 * @throws Exception if closing the provider fails
 */
@Override
public void close() throws Exception {
  icebergTableOpsCache.invalidateAll();
  if (!(provider instanceof AutoCloseable)) {
    // Nothing further to release for providers that hold no closeable resources.
    return;
  }
  AutoCloseable closeableProvider = (AutoCloseable) provider;
  closeableProvider.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.iceberg.service.IcebergTableOpsManager;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.RESTUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergObjectMapper;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.iceberg.service.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.catalog.TableIdentifier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.iceberg.service.IcebergTableOpsManager;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.rest.requests.RenameTableRequest;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.common.ops;
package org.apache.gravitino.iceberg.provider;

import com.google.common.collect.Maps;
import java.util.Map;
import java.util.UUID;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOps;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.jdbc.JdbcCatalog;
Expand Down Expand Up @@ -94,13 +95,13 @@ public void testValidIcebergTableOps() {
Assertions.assertEquals("memory", defaultIcebergConfig.get(IcebergConfig.CATALOG_BACKEND));
Assertions.assertEquals("/tmp/", defaultIcebergConfig.get(IcebergConfig.CATALOG_WAREHOUSE));

Assertions.assertEquals(hiveCatalogName, hiveOps.catalog.name());
Assertions.assertEquals(jdbcCatalogName, jdbcOps.catalog.name());
Assertions.assertEquals(defaultCatalogName, defaultOps.catalog.name());
Assertions.assertEquals(hiveCatalogName, hiveOps.getCatalog().name());
Assertions.assertEquals(jdbcCatalogName, jdbcOps.getCatalog().name());
Assertions.assertEquals(defaultCatalogName, defaultOps.getCatalog().name());

Assertions.assertTrue(hiveOps.catalog instanceof HiveCatalog);
Assertions.assertTrue(jdbcOps.catalog instanceof JdbcCatalog);
Assertions.assertTrue(defaultOps.catalog instanceof InMemoryCatalog);
Assertions.assertTrue(hiveOps.getCatalog() instanceof HiveCatalog);
Assertions.assertTrue(jdbcOps.getCatalog() instanceof JdbcCatalog);
Assertions.assertTrue(defaultOps.getCatalog() instanceof InMemoryCatalog);
}

@ParameterizedTest
Expand Down
Loading

0 comments on commit 1ec5b11

Please sign in to comment.