Skip to content

Commit

Permalink
[apache#4176]feat(iceberg) support multiple catalogs in Iceberg REST …
Browse files Browse the repository at this point in the history
…catalog server
  • Loading branch information
theoryxu committed Aug 1, 2024
1 parent 0b98782 commit fc2803a
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,5 @@ public class IcebergConstants {

public static final String GRAVITINO_ICEBERG_REST_SERVICE_NAME = "iceberg-rest";

public static final String ICEBERG_REST_SERVICE_CATALOG_PROVIDER = "provider-impl";
public static final String ICEBERG_REST_SERVICE_CATALOG_PROVIDER = "catalog-provider-impl";
}
31 changes: 18 additions & 13 deletions docs/iceberg-rest-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,21 @@ Please refer to the following sections for details.

### REST catalog server configuration

| Configuration item | Description | Default value | Required | Since Version |
|-------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------|----------|---------------|
| `gravitino.auxService.names` | The auxiliary service name of the Gravitino Iceberg REST catalog service. Use **`iceberg-rest`**. | (none) | Yes | 0.2.0 |
| `gravitino.auxService.iceberg-rest.classpath` | The classpath of the Gravitino Iceberg REST catalog service; includes the directory containing jars and configuration. It supports both absolute and relative paths, for example, `catalogs/lakehouse-iceberg/libs, catalogs/lakehouse-iceberg/conf` | (none) | Yes | 0.2.0 |
| `gravitino.auxService.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 |
| Configuration item | Description | Default value | Required | Since Version |
|-------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------|----------|---------------|
| `gravitino.auxService.names` | The auxiliary service name of the Gravitino Iceberg REST catalog service. Use **`iceberg-rest`**. | (none) | Yes | 0.2.0 |
| `gravitino.auxService.iceberg-rest.classpath` | The classpath of the Gravitino Iceberg REST catalog service; includes the directory containing jars and configuration. It supports both absolute and relative paths, for example, `catalogs/lakehouse-iceberg/libs, catalogs/lakehouse-iceberg/conf` | (none) | Yes | 0.2.0 |
| `gravitino.auxService.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.minThreads` | The minimum number of threads in the thread pool used by the Jetty web server. `minThreads` is 8 if the value is less than 8. | `Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 100), 8)` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. | `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 |
| `gravitino.auxService.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. | `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 |
| `gravitino.auxService.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 |
| `gravitino.auxService.iceberg-rest.catalog-provider-impl` | The class name of the `IcebergTableOpsProvider` implementation. | `org.apache.gravitino.iceberg.common.ops.ConfigIcebergTableOpsProvider` | No | 0.6.0 |


The filter in `customFilters` should be a standard javax servlet filter.
Expand Down Expand Up @@ -102,6 +103,10 @@ specify a Hive or JDBC catalog backend for production environment.

If you already have a JDBC Iceberg catalog, you must set `catalog-backend-name` to the name of that existing JDBC Iceberg catalog so that you can keep operating on its namespaces and tables.

:::info
You can also specify catalog parameters by setting configuration entries in the style `gravitino.auxService.iceberg-rest.catalog.<catalog name>.<param name>=<value>`.
:::

:::caution
You must download the corresponding JDBC driver to the `catalogs/lakehouse-iceberg/libs` directory.
:::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,17 @@
public class ConfigIcebergTableOpsProvider implements IcebergTableOpsProvider {
public static final Logger LOG = LoggerFactory.getLogger(ConfigIcebergTableOpsProvider.class);

private IcebergConfig icebergConfig;
private Map<String, String> properties;

@Override
public void initialize(IcebergConfig config) {
this.icebergConfig = config;
public void initialize(Map<String, String> properties) {
this.properties = properties;
}

@Override
public IcebergTableOps getIcebergTableOps(String prefix) {
if (StringUtils.isBlank(prefix)) {
return new IcebergTableOps(icebergConfig);
return new IcebergTableOps(new IcebergConfig(properties));
}
if (!getCatalogs().contains(prefix)) {
String errorMsg = String.format("%s can not match any catalog", prefix);
Expand All @@ -63,7 +63,7 @@ public IcebergTableOps getIcebergTableOps(String prefix) {

private List<String> getCatalogs() {
Map<String, Boolean> catalogs = Maps.newHashMap();
for (String key : this.icebergConfig.getAllConfig().keySet()) {
for (String key : this.properties.keySet()) {
if (!key.startsWith("catalog.")) {
continue;
}
Expand All @@ -76,10 +76,9 @@ private List<String> getCatalogs() {
}

private IcebergConfig getCatalogConfig(String catalog) {
Map<String, String> base = Maps.newHashMap(this.icebergConfig.getAllConfig());
Map<String, String> base = Maps.newHashMap(this.properties);
Map<String, String> merge =
MapUtils.getPrefixMap(
this.icebergConfig.getAllConfig(), String.format("catalog.%s.", catalog));
MapUtils.getPrefixMap(this.properties, String.format("catalog.%s.", catalog));
for (String key : merge.keySet()) {
base.put(key, merge.get(key));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.gravitino.iceberg.common.ops;

import com.google.common.collect.Maps;
import java.util.Map;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang.StringUtils;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.slf4j.Logger;
Expand All @@ -30,24 +30,27 @@ public class IcebergTableOpsManager implements AutoCloseable {

public static final String DEFAULT_CATALOG = "default_catalog";

private final Map<String, IcebergTableOps> icebergTableOpsMap;

private final Cache<String, IcebergTableOps> icebergTableOpsCache;
private final IcebergTableOpsProvider provider;

public IcebergTableOpsManager(IcebergConfig config) {
this.icebergTableOpsMap = Maps.newConcurrentMap();
this.icebergTableOpsCache = Caffeine.newBuilder().build();
this.provider = createProvider(config);
this.provider.initialize(config);
this.provider.initialize(config.getAllConfig());
}

public IcebergTableOps getOps(String rawPrefix) {
String prefix = shelling(rawPrefix);
String cacheKey = prefix;
if (DEFAULT_CATALOG.equals(prefix)) {
throw new RuntimeException(
String.format("%s is conflict with reserved key, please replace it", prefix));
}
if (StringUtils.isBlank(prefix)) {
LOG.debug("prefix is empty, return default iceberg catalog");
cacheKey = DEFAULT_CATALOG;
}
return icebergTableOpsMap.computeIfAbsent(cacheKey, k -> provider.getIcebergTableOps(prefix));
return icebergTableOpsCache.get(cacheKey, k -> provider.getIcebergTableOps(prefix));
}

private IcebergTableOpsProvider createProvider(IcebergConfig config) {
Expand All @@ -63,15 +66,16 @@ private IcebergTableOpsProvider createProvider(IcebergConfig config) {
private String shelling(String rawPrefix) {
if (StringUtils.isBlank(rawPrefix)) {
return rawPrefix;
} else if (!rawPrefix.endsWith("/")) {
throw new RuntimeException(String.format("rawPrefix %s is illegal", rawPrefix));
} else {
return rawPrefix.replace("/", "");
// rawPrefix is a string matching ([^/]*/), which ends with /
return rawPrefix.substring(0, rawPrefix.length() - 1);
}
}

/**
 * Releases the {@code IcebergTableOps} instances held by this manager.
 *
 * @throws Exception if closing one of the held instances fails
 */
@Override
public void close() throws Exception {
// Close each IcebergTableOps currently held in the map.
for (String catalog : icebergTableOpsMap.keySet()) {
icebergTableOpsMap.get(catalog).close();
}
// NOTE(review): invalidateAll() only discards the cached entries. The cache is created with a
// plain Caffeine.newBuilder().build() (no removal listener), so this call by itself presumably
// does not close the cached IcebergTableOps instances - confirm they are closed elsewhere.
icebergTableOpsCache.invalidateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
*/
package org.apache.gravitino.iceberg.common.ops;

import org.apache.gravitino.iceberg.common.IcebergConfig;
import java.util.Map;

/**
* IcebergTableOpsProvider is an interface defining how iceberg rest catalog server gets iceberg
* IcebergTableOpsProvider is an interface defining how Iceberg REST catalog server gets iceberg
* catalogs.
*/
public interface IcebergTableOpsProvider {

/** @param config The configuration parameters for creating Provider. */
void initialize(IcebergConfig config);
/** @param properties The configuration parameters for creating Provider. */
void initialize(Map<String, String> properties);

/**
* @param prefix the path parameter sent by clients.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.common.ops;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for {@code IcebergTableOpsManager#getOps(String)}.
 *
 * <p>A raw prefix is accepted when it is blank or when it is a catalog name followed by one
 * trailing "/". Anything else, including the reserved {@code default_catalog} name, must be
 * rejected with a {@link RuntimeException}.
 */
public class TestIcebergTableOpsManager {

  /** Catalog name this test expects when the prefix is blank. */
  private static final String EXPECTED_DEFAULT_CATALOG_NAME = "memory";

  /**
   * Verifies that a well-formed raw prefix resolves to a catalog whose name equals the prefix with
   * its trailing "/" removed, and that a blank prefix resolves to the default catalog.
   *
   * @param rawPrefix the raw path prefix as a client would send it
   * @throws Exception if closing the manager fails
   */
  @ParameterizedTest
  @ValueSource(strings = {"", "hello/", "\\\n\t\\\'/", "\u0024/", "\100/", "[_~/"})
  public void testValidGetOps(String rawPrefix) throws Exception {
    // A non-blank raw prefix carries exactly one trailing "/"; drop it to get the catalog name.
    String prefix =
        StringUtils.isBlank(rawPrefix)
            ? rawPrefix
            : rawPrefix.substring(0, rawPrefix.length() - 1);

    Map<String, String> config = Maps.newHashMap();
    config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix);

    // The manager is AutoCloseable, so release it even when an assertion fails.
    try (IcebergTableOpsManager manager = new IcebergTableOpsManager(new IcebergConfig(config))) {
      IcebergTableOps ops = manager.getOps(rawPrefix);

      String expectedName = StringUtils.isBlank(prefix) ? EXPECTED_DEFAULT_CATALOG_NAME : prefix;
      // assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
      Assertions.assertEquals(expectedName, ops.catalog.name());
    }
  }

  /**
   * Verifies that a raw prefix without the trailing "/" and the reserved {@code default_catalog}
   * name are both rejected.
   *
   * @param rawPrefix the malformed or reserved raw path prefix
   * @throws Exception if closing the manager fails
   */
  @ParameterizedTest
  @ValueSource(strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "default_catalog/"})
  public void testInvalidGetOps(String rawPrefix) throws Exception {
    try (IcebergTableOpsManager manager = new IcebergTableOpsManager(new IcebergConfig())) {
      Assertions.assertThrowsExactly(RuntimeException.class, () -> manager.getOps(rawPrefix));
    }
  }
}
Loading

0 comments on commit fc2803a

Please sign in to comment.