diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java index dbcd65f44ee..25fe9b7bbd1 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java @@ -53,5 +53,5 @@ public class IcebergConstants { public static final String GRAVITINO_ICEBERG_REST_SERVICE_NAME = "iceberg-rest"; - public static final String ICEBERG_REST_SERVICE_CATALOG_PROVIDER = "provider-impl"; + public static final String ICEBERG_REST_SERVICE_CATALOG_PROVIDER = "catalog-provider-impl"; } diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md index 981ca8fa229..30caa4c8e2b 100644 --- a/docs/iceberg-rest-service.md +++ b/docs/iceberg-rest-service.md @@ -39,20 +39,21 @@ Please refer to the following sections for details. ### REST catalog server configuration -| Configuration item | Description | Default value | Required | Since Version | -|-------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------|----------|---------------| -| `gravitino.auxService.names` | The auxiliary service name of the Gravitino Iceberg REST catalog service. Use **`iceberg-rest`**. | (none) | Yes | 0.2.0 | -| `gravitino.auxService.iceberg-rest.classpath` | The classpath of the Gravitino Iceberg REST catalog service; includes the directory containing jars and configuration. 
It supports both absolute and relative paths, for example, `catalogs/lakehouse-iceberg/libs, catalogs/lakehouse-iceberg/conf` | (none) | Yes | 0.2.0 | -| `gravitino.auxService.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 | +| Configuration item | Description | Default value | Required | Since Version | +|-------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------|----------|---------------| +| `gravitino.auxService.names` | The auxiliary service name of the Gravitino Iceberg REST catalog service. Use **`iceberg-rest`**. | (none) | Yes | 0.2.0 | +| `gravitino.auxService.iceberg-rest.classpath` | The classpath of the Gravitino Iceberg REST catalog service; includes the directory containing jars and configuration. It supports both absolute and relative paths, for example, `catalogs/lakehouse-iceberg/libs, catalogs/lakehouse-iceberg/conf` | (none) | Yes | 0.2.0 | +| `gravitino.auxService.iceberg-rest.host` | The host of the Gravitino Iceberg REST catalog service. | `0.0.0.0` | No | 0.2.0 | +| `gravitino.auxService.iceberg-rest.httpPort` | The port of the Gravitino Iceberg REST catalog service. | `9001` | No | 0.2.0 | | `gravitino.auxService.iceberg-rest.minThreads` | The minimum number of threads in the thread pool used by the Jetty web server. `minThreads` is 8 if the value is less than 8. 
| `Math.max(Math.min(Runtime.getRuntime().availableProcessors() * 2, 100), 8)` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. | `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 | -| `gravitino.auxService.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 | +| `gravitino.auxService.iceberg-rest.maxThreads` | The maximum number of threads in the thread pool used by the Jetty web server. `maxThreads` is 8 if the value is less than 8, and `maxThreads` must be greater than or equal to `minThreads`. | `Math.max(Runtime.getRuntime().availableProcessors() * 4, 400)` | No | 0.2.0 | +| `gravitino.auxService.iceberg-rest.threadPoolWorkQueueSize` | The size of the queue in the thread pool used by Gravitino Iceberg REST catalog service. | `100` | No | 0.2.0 | +| `gravitino.auxService.iceberg-rest.stopTimeout` | The amount of time in ms for the Gravitino Iceberg REST catalog service to stop gracefully. 
For more information, see `org.eclipse.jetty.server.Server#setStopTimeout`. | `30000` | No | 0.2.0 | +| `gravitino.auxService.iceberg-rest.idleTimeout` | The timeout in ms of idle connections. | `30000` | No | 0.2.0 | +| `gravitino.auxService.iceberg-rest.requestHeaderSize` | The maximum size of an HTTP request. | `131072` | No | 0.2.0 | +| `gravitino.auxService.iceberg-rest.responseHeaderSize` | The maximum size of an HTTP response. | `131072` | No | 0.2.0 | +| `gravitino.auxService.iceberg-rest.customFilters` | Comma-separated list of filter class names to apply to the APIs. | (none) | No | 0.4.0 | +| `gravitino.auxService.iceberg-rest.catalog-provider-impl` | The class name of the implementation of `IcebergTableOpsProvider`. | `org.apache.gravitino.iceberg.common.ops.ConfigIcebergTableOpsProvider` | No | 0.6.0 | The filter in `customFilters` should be a standard javax servlet filter. @@ -102,6 +103,10 @@ specify a Hive or JDBC catalog backend for production environment. If you have a JDBC Iceberg catalog prior, you must set `catalog-backend-name` to keep consistent with your Jdbc Iceberg catalog name to operate the prior namespace and tables. +:::info +You can also specify catalog parameters by setting configuration entries in the style `gravitino.auxService.iceberg-rest.catalog.<catalog-name>.<param-name>=<value>`. +::: + :::caution You must download the corresponding JDBC driver to the `catalogs/lakehouse-iceberg/libs` directory.
::: diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigIcebergTableOpsProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigIcebergTableOpsProvider.java index 77c6f19838d..32c0d89a730 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigIcebergTableOpsProvider.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/ConfigIcebergTableOpsProvider.java @@ -41,17 +41,17 @@ public class ConfigIcebergTableOpsProvider implements IcebergTableOpsProvider { public static final Logger LOG = LoggerFactory.getLogger(ConfigIcebergTableOpsProvider.class); - private IcebergConfig icebergConfig; + private Map properties; @Override - public void initialize(IcebergConfig config) { - this.icebergConfig = config; + public void initialize(Map properties) { + this.properties = properties; } @Override public IcebergTableOps getIcebergTableOps(String prefix) { if (StringUtils.isBlank(prefix)) { - return new IcebergTableOps(icebergConfig); + return new IcebergTableOps(new IcebergConfig(properties)); } if (!getCatalogs().contains(prefix)) { String errorMsg = String.format("%s can not match any catalog", prefix); @@ -63,7 +63,7 @@ public IcebergTableOps getIcebergTableOps(String prefix) { private List getCatalogs() { Map catalogs = Maps.newHashMap(); - for (String key : this.icebergConfig.getAllConfig().keySet()) { + for (String key : this.properties.keySet()) { if (!key.startsWith("catalog.")) { continue; } @@ -76,10 +76,9 @@ private List getCatalogs() { } private IcebergConfig getCatalogConfig(String catalog) { - Map base = Maps.newHashMap(this.icebergConfig.getAllConfig()); + Map base = Maps.newHashMap(this.properties); Map merge = - MapUtils.getPrefixMap( - this.icebergConfig.getAllConfig(), String.format("catalog.%s.", catalog)); + MapUtils.getPrefixMap(this.properties, String.format("catalog.%s.", catalog)); for (String key 
: merge.keySet()) { base.put(key, merge.get(key)); } diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java index e51e2afccc5..7416257129e 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsManager.java @@ -18,8 +18,8 @@ */ package org.apache.gravitino.iceberg.common.ops; -import com.google.common.collect.Maps; -import java.util.Map; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.commons.lang.StringUtils; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.slf4j.Logger; @@ -30,24 +30,27 @@ public class IcebergTableOpsManager implements AutoCloseable { public static final String DEFAULT_CATALOG = "default_catalog"; - private final Map icebergTableOpsMap; - + private final Cache icebergTableOpsCache; private final IcebergTableOpsProvider provider; public IcebergTableOpsManager(IcebergConfig config) { - this.icebergTableOpsMap = Maps.newConcurrentMap(); + this.icebergTableOpsCache = Caffeine.newBuilder().build(); this.provider = createProvider(config); - this.provider.initialize(config); + this.provider.initialize(config.getAllConfig()); } public IcebergTableOps getOps(String rawPrefix) { String prefix = shelling(rawPrefix); String cacheKey = prefix; + if (DEFAULT_CATALOG.equals(prefix)) { + throw new RuntimeException( + String.format("%s is conflict with reserved key, please replace it", prefix)); + } if (StringUtils.isBlank(prefix)) { LOG.debug("prefix is empty, return default iceberg catalog"); cacheKey = DEFAULT_CATALOG; } - return icebergTableOpsMap.computeIfAbsent(cacheKey, k -> provider.getIcebergTableOps(prefix)); + return 
icebergTableOpsCache.get(cacheKey, k -> provider.getIcebergTableOps(prefix)); } private IcebergTableOpsProvider createProvider(IcebergConfig config) { @@ -63,15 +66,16 @@ private IcebergTableOpsProvider createProvider(IcebergConfig config) { private String shelling(String rawPrefix) { if (StringUtils.isBlank(rawPrefix)) { return rawPrefix; + } else if (!rawPrefix.endsWith("/")) { + throw new RuntimeException(String.format("rawPrefix %s is illegal", rawPrefix)); } else { - return rawPrefix.replace("/", ""); + // rawPrefix is a string matching ([^/]*/) which end with / + return rawPrefix.substring(0, rawPrefix.length() - 1); } } @Override public void close() throws Exception { - for (String catalog : icebergTableOpsMap.keySet()) { - icebergTableOpsMap.get(catalog).close(); - } + icebergTableOpsCache.invalidateAll(); } } diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java index ee66222edd7..42517810b45 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergTableOpsProvider.java @@ -18,16 +18,16 @@ */ package org.apache.gravitino.iceberg.common.ops; -import org.apache.gravitino.iceberg.common.IcebergConfig; +import java.util.Map; /** - * IcebergTableOpsProvider is an interface defining how iceberg rest catalog server gets iceberg + * IcebergTableOpsProvider is an interface defining how Iceberg REST catalog server gets iceberg * catalogs. */ public interface IcebergTableOpsProvider { - /** @param config The configuration parameters for creating Provider. */ - void initialize(IcebergConfig config); + /** @param properties The configuration parameters for creating Provider. 
*/ + void initialize(Map properties); /** * @param prefix the path param send by clients. diff --git a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java new file mode 100644 index 00000000000..a5d8392ed5f --- /dev/null +++ b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/ops/TestIcebergTableOpsManager.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.iceberg.common.ops; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.commons.lang.StringUtils; +import org.apache.gravitino.iceberg.common.IcebergConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +public class TestIcebergTableOpsManager { + + @ParameterizedTest + @ValueSource(strings = {"", "hello/", "\\\n\t\\\'/", "\u0024/", "\100/", "[_~/"}) + public void testValidGetOps(String rawPrefix) { + String prefix = rawPrefix; + if (!StringUtils.isBlank(rawPrefix)) { + prefix = rawPrefix.substring(0, rawPrefix.length() - 1); + } + Map config = Maps.newHashMap(); + config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix); + IcebergTableOpsManager manager = new IcebergTableOpsManager(new IcebergConfig(config)); + + IcebergTableOps ops = manager.getOps(rawPrefix); + + if (StringUtils.isBlank(prefix)) { + Assertions.assertEquals(ops.catalog.name(), "memory"); + } else { + Assertions.assertEquals(ops.catalog.name(), prefix); + } + } + + @ParameterizedTest + @ValueSource(strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "default_catalog/"}) + public void testInvalidGetOps(String rawPrefix) { + IcebergTableOpsManager manager = new IcebergTableOpsManager(new IcebergConfig()); + + Assertions.assertThrowsExactly(RuntimeException.class, () -> manager.getOps(rawPrefix)); + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigIcebergTableOpsProviderForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigIcebergTableOpsProviderForTest.java index f3df77d03eb..e69c62c0219 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigIcebergTableOpsProviderForTest.java +++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigIcebergTableOpsProviderForTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.gravitino.iceberg.service.rest; import org.apache.gravitino.iceberg.common.ops.ConfigIcebergTableOpsProvider;