From f5beda919bfef4f3adf910c0ae863ad85722cf1e Mon Sep 17 00:00:00 2001 From: fanng Date: Sat, 12 Oct 2024 16:52:21 +0800 Subject: [PATCH 1/9] support iceberg pre event listener --- bundles/build.gradle.kts | 2 +- .../lakehouse/iceberg/IcebergConstants.java | 3 +- .../org/apache/gravitino/GravitinoEnv.java | 35 ++++++--- .../apache/gravitino/iceberg/RESTService.java | 10 +++ .../DynamicIcebergCatalogConfigProvider.java | 4 +- .../StaticIcebergCatalogConfigProvider.java | 2 +- .../server/GravitinoIcebergRESTServer.java | 3 +- .../service/IcebergCatalogWrapperManager.java | 10 --- .../service/IcebergExceptionMapper.java | 3 + .../iceberg/service/IcebergRestUtils.java | 55 +++++++++++++ .../IcebergTableEventDispatcher.java | 77 ++++++++++++++++++ .../IcebergTableOperationDispatcher.java | 41 ++++++++++ .../IcebergTableOperationExecutor.java | 42 ++++++++++ .../service/rest/IcebergTableOperations.java | 20 +++-- .../api/event/IcebergCreateTableEvent.java | 54 +++++++++++++ .../event/IcebergCreateTableFailureEvent.java | 31 ++++++++ .../api/event/IcebergCreateTablePreEvent.java | 40 ++++++++++ .../listener/api/event/IcebergEvent.java | 31 ++++++++ .../listener/api/event/IcebergPreEvent.java | 31 ++++++++ .../api/event/IcebergRESTFailureEvent.java | 31 ++++++++ .../listener/api/event/IcebergTableEvent.java | 29 +++++++ .../api/event/IcebergTableFailureEvent.java | 31 ++++++++ .../api/event/IcebergTablePreEvent.java | 31 ++++++++ ...tDynamicIcebergCatalogWrapperProvider.java | 2 +- ...stStaticIcebergCatalogWrapperProvider.java | 2 +- .../TestIcebergCatalogWrapperManager.java | 3 +- .../iceberg/service/TestIcebergRESTUtils.java | 78 +++++++++++++++++++ .../service/rest/IcebergRestTestUtil.java | 12 +++ .../gravitino/server/GravitinoServer.java | 2 +- 29 files changed, 676 insertions(+), 39 deletions(-) create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java create mode 100644 iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java diff --git a/bundles/build.gradle.kts b/bundles/build.gradle.kts index 043fbfec673..fa6eb7d5ef9 100644 --- a/bundles/build.gradle.kts +++ b/bundles/build.gradle.kts @@ -19,4 +19,4 @@ tasks.all { enabled = false -} \ No newline at end of file +} diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java index 004bde0bd7e..52e665579da 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java @@ -77,5 +77,6 @@ public class IcebergConstants { public static final String GRAVITINO_METALAKE = "gravitino-metalake"; - public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog"; + public static final String ICEBERG_REST_DEFAULT_METALAKE = "gravitino"; + public static final String ICEBERG_REST_DEFAULT_CATALOG = "default_catalog"; } diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java index 77d76b47476..64dcb95d6f7 100644 --- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java +++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java @@ -78,6 +78,8 @@ public class GravitinoEnv { private static final Logger LOG = LoggerFactory.getLogger(GravitinoEnv.class); private Config config; + // Iceberg REST server use base components while Gravitino Server use full components. + private boolean manageFullComponents = true; private EntityStore entityStore; @@ -130,21 +132,30 @@ public static GravitinoEnv getInstance() { } /** - * Initialize the Gravitino environment. + * Initialize base components, used for Iceberg REST server. * * @param config The configuration object to initialize the environment. - * @param isGravitinoServer A boolean flag indicating whether the initialization is for the - * Gravitino server. If true, server-specific components will be initialized in addition to - * the base components. */ - public void initialize(Config config, boolean isGravitinoServer) { - LOG.info("Initializing Gravitino Environment..."); + public void initializeBaseComponents(Config config) { + LOG.info("Initializing Gravitino base environment..."); this.config = config; + this.manageFullComponents = false; initBaseComponents(); - if (isGravitinoServer) { - initGravitinoServerComponents(); - } - LOG.info("Gravitino Environment is initialized."); + LOG.info("Gravitino base environment is initialized."); + } + + /** + * Initialize all components, used for Gravitino server. + * + * @param config The configuration object to initialize the environment. + */ + public void initializeFullComponents(Config config) { + LOG.info("Initializing Gravitino full Environment..."); + this.config = config; + this.manageFullComponents = true; + initBaseComponents(); + initGravitinoServerComponents(); + LOG.info("Gravitino full environment is initialized."); } /** @@ -308,9 +319,11 @@ public FutureGrantManager futureGrantManager() { } public void start() { - auxServiceManager.serviceStart(); metricsSystem.start(); eventListenerManager.start(); + if (manageFullComponents) { + auxServiceManager.serviceStart(); + } } /** Shutdown the Gravitino environment. */ diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java index 0592cfd9421..8ee4f074d4b 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java @@ -28,7 +28,11 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.listener.EventBus; import org.apache.gravitino.metrics.MetricsSystem; import org.apache.gravitino.metrics.source.MetricsSource; import org.apache.gravitino.server.web.HttpServerMetricsSource; @@ -70,14 +74,20 @@ private void initServer(IcebergConfig icebergConfig) { new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server); metricsSystem.register(httpServerMetricsSource); + EventBus eventBus = GravitinoEnv.getInstance().eventBus(); icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig()); icebergMetricsManager = new IcebergMetricsManager(icebergConfig); + IcebergTableOperationExecutor icebergTableOperationExecutor = + new IcebergTableOperationExecutor(icebergCatalogWrapperManager); + IcebergTableEventDispatcher icebergTableEventDispatcher = + new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus); config.register( new AbstractBinder() { @Override protected void configure() { bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1); bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1); + bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1); } }); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java index 4965f4bc132..613e65dbd30 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java @@ -68,8 +68,8 @@ public Optional getIcebergCatalogConfig(String catalogName) { Preconditions.checkArgument( StringUtils.isNotBlank(catalogName), "blank catalogName is illegal"); Preconditions.checkArgument( - !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName), - IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in gravitino-based-provider"); + !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName), + IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG + " is illegal in gravitino-based-provider"); Catalog catalog; try { diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java index aa7f1032134..13a234c1839 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java @@ -61,7 +61,7 @@ public void initialize(Map properties) { MapUtils.getPrefixMap( properties, String.format("catalog.%s.", catalogName))))); this.catalogConfigs.put( - IcebergConstants.GRAVITINO_DEFAULT_CATALOG, new IcebergConfig(properties)); + IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG, new IcebergConfig(properties)); } @Override diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java index 622f0d21acd..35fe05ce3a4 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java @@ -45,13 +45,14 @@ public GravitinoIcebergRESTServer(Config config) { } private void initialize() { - gravitinoEnv.initialize(serverConfig, false); + gravitinoEnv.initializeBaseComponents(serverConfig); icebergRESTService.serviceInit( serverConfig.getConfigsWithPrefix(IcebergConfig.ICEBERG_CONFIG_PREFIX)); ServerAuthenticator.getInstance().initialize(serverConfig); } private void start() { + gravitinoEnv.start(); icebergRESTService.serviceStart(); } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java index 823f42ddb16..a29bea07d94 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java @@ -154,16 +154,6 @@ private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider( } } - private String shelling(String rawPrefix) { - if (StringUtils.isBlank(rawPrefix)) { - return rawPrefix; - } else { - // rawPrefix is a string matching ([^/]*/) which end with / - Preconditions.checkArgument( - rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); - return rawPrefix.substring(0, rawPrefix.length() - 1); - } - } private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) { try { diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java index f880f7f7a9f..ed7d0a2f98d 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java @@ -24,6 +24,7 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; +import org.apache.gravitino.exceptions.IllegalNameIdentifierException; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; @@ -51,8 +52,10 @@ public class IcebergExceptionMapper implements ExceptionMapper { ImmutableMap., Integer>builder() .put(IllegalArgumentException.class, 400) .put(ValidationException.class, 400) + .put(IllegalNameIdentifierException.class, 400) .put(NamespaceNotEmptyException.class, 400) .put(NotAuthorizedException.class, 401) + .put(org.apache.gravitino.exceptions.ForbiddenException.class, 403) .put(ForbiddenException.class, 403) .put(NoSuchNamespaceException.class, 404) .put(NoSuchTableException.class, 404) diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java index fb0e8005c16..5254d612603 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java @@ -18,12 +18,21 @@ */ package org.apache.gravitino.iceberg.service; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Arrays; +import java.util.stream.Stream; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.responses.ErrorResponse; public class IcebergRestUtils { @@ -71,4 +80,50 @@ public static Instant calculateNewTimestamp(Instant currentTimestamp, int hours) } return nextHourDateTime.atZone(ZoneId.systemDefault()).toInstant(); } + + public static String getCatalogName(String rawPrefix) { + String prefix = normalizePrefix(rawPrefix); + Preconditions.checkArgument( + !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(prefix), + String.format("%s is conflict with reserved key, please replace it", prefix)); + if (StringUtils.isBlank(prefix)) { + return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG; + } + return prefix; + } + + public static NameIdentifier getGravitinoNameIdentifier( + String catalogName, TableIdentifier icebergIdentifier) { + // todo(fanng): use a more general way to get metalake + Stream catalogNS = + Stream.concat( + Stream.of(IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE, catalogName), + Arrays.stream(icebergIdentifier.namespace().levels())); + String[] catalogNSTable = + Stream.concat(catalogNS, Stream.of(icebergIdentifier.name())).toArray(String[]::new); + return NameIdentifier.of(catalogNSTable); + } + + // remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to + // 'iceberg_catalog' + private static String normalizePrefix(String rawPrefix) { + if (StringUtils.isBlank(rawPrefix)) { + return rawPrefix; + } else { + // rawPrefix is a string matching ([^/]*/) which end with / + Preconditions.checkArgument( + rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); + return rawPrefix.substring(0, rawPrefix.length() - 1); + } + } + + public static T cloneIcebergRESTObject(Object message, Class className) { + ObjectMapper icebergObjectMapper = IcebergObjectMapper.getInstance(); + try { + byte[] values = icebergObjectMapper.writeValueAsBytes(message); + return icebergObjectMapper.readValue(values, className); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java new file mode 100644 index 00000000000..bdea94978ed --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.iceberg.service.IcebergRestUtils; +import org.apache.gravitino.listener.EventBus; +import org.apache.gravitino.listener.api.event.IcebergCreateTableEvent; +import org.apache.gravitino.listener.api.event.IcebergCreateTableFailureEvent; +import org.apache.gravitino.listener.api.event.IcebergCreateTablePreEvent; +import org.apache.gravitino.utils.PrincipalUtils; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * {@code IcebergTableEventDispatcher} is a decorator for {@link IcebergTableOperationExecutor} that + * not only delegates table operations to the underlying dispatcher but also dispatches + * corresponding events to an {@link org.apache.gravitino.listener.EventBus}. + */ +public class IcebergTableEventDispatcher implements IcebergTableOperationDispatcher { + + private IcebergTableOperationDispatcher icebergTableOperationDispatcher; + private EventBus eventBus; + + public IcebergTableEventDispatcher( + IcebergTableOperationDispatcher icebergTableOperationDispatcher, EventBus eventBus) { + this.icebergTableOperationDispatcher = icebergTableOperationDispatcher; + this.eventBus = eventBus; + } + + @Override + public LoadTableResponse createTable( + String catalogName, Namespace namespace, CreateTableRequest createTableRequest) { + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, createTableRequest.name()); + NameIdentifier nameIdentifier = + IcebergRestUtils.getGravitinoNameIdentifier(catalogName, tableIdentifier); + eventBus.dispatchEvent( + new IcebergCreateTablePreEvent( + PrincipalUtils.getCurrentUserName(), nameIdentifier, createTableRequest)); + LoadTableResponse loadTableResponse; + try { + loadTableResponse = + icebergTableOperationDispatcher.createTable(catalogName, namespace, createTableRequest); + } catch (Exception e) { + eventBus.dispatchEvent( + new IcebergCreateTableFailureEvent( + PrincipalUtils.getCurrentUserName(), nameIdentifier, e)); + throw e; + } + eventBus.dispatchEvent( + new IcebergCreateTableEvent( + PrincipalUtils.getCurrentUserName(), + nameIdentifier, + createTableRequest, + loadTableResponse)); + return loadTableResponse; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java new file mode 100644 index 00000000000..948e4866217 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * The {@code IcebergTableOperationDispatcher} interface defines the public API for managing Iceberg + * tables. + */ +public interface IcebergTableOperationDispatcher { + /** + * Creates a new Iceberg table. + * + * @param catalogName The catalog name when creating the table. + * @param namespace The namespace within which the table should be created. + * @param createTableRequest The request object containing the details for creating the table. + * @return A {@link LoadTableResponse} object containing the result of the operation. + */ + LoadTableResponse createTable( + String catalogName, Namespace namespace, CreateTableRequest createTableRequest); +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java new file mode 100644 index 00000000000..9a51d7b7a00 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +public class IcebergTableOperationExecutor implements IcebergTableOperationDispatcher { + + private IcebergCatalogWrapperManager icebergCatalogWrapperManager; + + public IcebergTableOperationExecutor(IcebergCatalogWrapperManager icebergCatalogWrapperManager) { + this.icebergCatalogWrapperManager = icebergCatalogWrapperManager; + } + + @Override + public LoadTableResponse createTable( + String catalogName, Namespace namespace, CreateTableRequest createTableRequest) { + return icebergCatalogWrapperManager + .getCatalogWrapper(catalogName) + .createTable(namespace, createTableRequest); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java index 33023343ef3..ec11b0f870a 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java @@ -50,8 +50,10 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergObjectMapper; import org.apache.gravitino.iceberg.service.IcebergRestUtils; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; import org.apache.gravitino.metrics.MetricNames; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.ServiceUnavailableException; import org.apache.iceberg.rest.RESTUtil; @@ -76,6 +78,7 @@ public class IcebergTableOperations { private IcebergMetricsManager icebergMetricsManager; private ObjectMapper icebergObjectMapper; + private IcebergTableOperationDispatcher tableOperationDispatcher; @SuppressWarnings("UnusedVariable") @Context @@ -84,10 +87,12 @@ public class IcebergTableOperations { @Inject public IcebergTableOperations( IcebergCatalogWrapperManager icebergCatalogWrapperManager, - IcebergMetricsManager icebergMetricsManager) { + IcebergMetricsManager icebergMetricsManager, + IcebergTableOperationDispatcher tableOperationDispatcher) { this.icebergCatalogWrapperManager = icebergCatalogWrapperManager; - this.icebergObjectMapper = IcebergObjectMapper.getInstance(); this.icebergMetricsManager = icebergMetricsManager; + this.tableOperationDispatcher = tableOperationDispatcher; + this.icebergObjectMapper = IcebergObjectMapper.getInstance(); } @GET @@ -110,16 +115,17 @@ public Response createTable( CreateTableRequest createTableRequest, @HeaderParam(X_ICEBERG_ACCESS_DELEGATION) String accessDelegation) { boolean isCredentialVending = isCredentialVending(accessDelegation); + String catalogName = IcebergRestUtils.getCatalogName(prefix); + Namespace icebergNS = RESTUtil.decodeNamespace(namespace); LOG.info( - "Create Iceberg table, namespace: {}, create table request: {}, accessDelegation: {}, isCredentialVending: {}", - namespace, + "Create Iceberg table, catalog: {}, namespace: {}, create table request: {}, accessDelegation: {}, isCredentialVending: {}", + catalogName, + icebergNS, createTableRequest, accessDelegation, isCredentialVending); LoadTableResponse loadTableResponse = - icebergCatalogWrapperManager - .getOps(prefix) - .createTable(RESTUtil.decodeNamespace(namespace), createTableRequest); + tableOperationDispatcher.createTable(catalogName, icebergNS, createTableRequest); if (isCredentialVending) { return IcebergRestUtils.ok(injectCredentialConfig(prefix, loadTableResponse)); } else { diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java new file mode 100644 index 00000000000..1ce2d8f770e --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.gravitino.iceberg.service.IcebergRestUtils; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** Represent an event after creating Iceberg table successfully. */ +@DeveloperApi +public class IcebergCreateTableEvent extends IcebergTableEvent { + + private CreateTableRequest createTableRequest; + private LoadTableResponse loadTableResponse; + + public IcebergCreateTableEvent( + String user, + NameIdentifier resourceIdentifier, + CreateTableRequest createTableRequest, + LoadTableResponse loadTableResponse) { + super(user, resourceIdentifier); + this.createTableRequest = + IcebergRestUtils.cloneIcebergRESTObject(createTableRequest, CreateTableRequest.class); + this.loadTableResponse = + IcebergRestUtils.cloneIcebergRESTObject(loadTableResponse, LoadTableResponse.class); + } + + public CreateTableRequest createTableRequest() { + return createTableRequest; + } + + public LoadTableResponse loadTableResponse() { + return loadTableResponse; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java new file mode 100644 index 00000000000..24f74da4fdf --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represent a failure event when creating Iceberg table failed. */ +@DeveloperApi +public class IcebergCreateTableFailureEvent extends IcebergTableFailureEvent { + public IcebergCreateTableFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java new file mode 100644 index 00000000000..81937e501bc --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.iceberg.rest.requests.CreateTableRequest; + +/** Represent a pre event before creating Iceberg table. */ +@DeveloperApi +public class IcebergCreateTablePreEvent extends IcebergTablePreEvent { + private CreateTableRequest createTableRequest; + + public IcebergCreateTablePreEvent( + String user, NameIdentifier resourceIdentifier, CreateTableRequest createTableRequest) { + super(user, resourceIdentifier); + this.createTableRequest = createTableRequest; + } + + public CreateTableRequest createTableRequest() { + return createTableRequest; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java new file mode 100644 index 00000000000..50ec078637d --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract post event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergEvent extends Event { + protected IcebergEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java new file mode 100644 index 00000000000..e57edde6542 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract pre event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergPreEvent extends PreEvent { + protected IcebergPreEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java new file mode 100644 index 00000000000..e74d34b9e4c --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract failure event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergRESTFailureEvent extends FailureEvent { + protected IcebergRESTFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java new file mode 100644 index 00000000000..e94a1938c1c --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; + +/** Represents an abstract table post event in Gravitino Iceberg REST server. */ +public abstract class IcebergTableEvent extends IcebergEvent { + protected IcebergTableEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java new file mode 100644 index 00000000000..4d7a50a5812 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represent a failure event when do Iceberg table operation failed. */ +@DeveloperApi +public class IcebergTableFailureEvent extends IcebergRESTFailureEvent { + protected IcebergTableFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java new file mode 100644 index 00000000000..486c6f094a4 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract table pre event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergTablePreEvent extends IcebergPreEvent { + protected IcebergTablePreEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java index f9ffbb42747..9b7e0dba0f1 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java @@ -111,6 +111,6 @@ public void testInvalidIcebergTableOps() { IllegalArgumentException.class, () -> provider.getIcebergCatalogConfig("")); Assertions.assertThrowsExactly( IllegalArgumentException.class, - () -> provider.getIcebergCatalogConfig(IcebergConstants.GRAVITINO_DEFAULT_CATALOG)); + () -> provider.getIcebergCatalogConfig(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG)); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java index 69f5b5ad257..8384acb13b8 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java @@ -38,7 +38,7 @@ public class TestStaticIcebergCatalogWrapperProvider { public void testValidIcebergTableOps() { String hiveCatalogName = "hive_backend"; String jdbcCatalogName = "jdbc_backend"; - String defaultCatalogName = IcebergConstants.GRAVITINO_DEFAULT_CATALOG; + String defaultCatalogName = IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG; Map config = Maps.newHashMap(); // hive backend catalog diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java index 328bafa627d..105ce7fd737 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java @@ -51,8 +51,7 @@ public void testValidGetOps(String rawPrefix) { } @ParameterizedTest - @ValueSource( - strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "__gravitino_default_catalog/"}) + @ValueSource(strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "default_catalog/"}) public void testInvalidGetOps(String rawPrefix) { Map config = Maps.newHashMap(); IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config); diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java new file mode 100644 index 00000000000..aa0b160572d --- /dev/null +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestIcebergRESTUtils { + + @Test + void testGetGravitinoNameIdentifier() { + String catalogName = "catalog"; + TableIdentifier tableIdentifier = TableIdentifier.of("ns1", "ns2", "table"); + NameIdentifier nameIdentifier = + IcebergRestUtils.getGravitinoNameIdentifier(catalogName, tableIdentifier); + Assertions.assertEquals( + NameIdentifier.of( + IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE, catalogName, "ns1", "ns2", "table"), + nameIdentifier); + } + + @Test + void testGetCatalogName() { + String prefix = "catalog/"; + Assertions.assertEquals("catalog", IcebergRestUtils.getCatalogName(prefix)); + Assertions.assertEquals( + IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG, IcebergRestUtils.getCatalogName("")); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> IcebergRestUtils.getCatalogName(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG + "/")); + } + + @Test + void testSerdeIcebergRESTObject() { + Schema tableSchema = + new Schema( + NestedField.of(1, false, "foo1", StringType.get()), + NestedField.of(2, true, "foo2", IntegerType.get())); + CreateTableRequest createTableRequest = + CreateTableRequest.builder().withName("table").withSchema(tableSchema).build(); + CreateTableRequest clonedIcebergRESTObject = + IcebergRestUtils.cloneIcebergRESTObject(createTableRequest, CreateTableRequest.class); + Assertions.assertEquals(createTableRequest.name(), clonedIcebergRESTObject.name()); + Assertions.assertEquals( + createTableRequest.schema().columns().size(), + clonedIcebergRESTObject.schema().columns().size()); + for (int i = 0; i < createTableRequest.schema().columns().size(); i++) { + NestedField field = createTableRequest.schema().columns().get(i); + NestedField clonedField = clonedIcebergRESTObject.schema().columns().get(i); + Assertions.assertEquals(field, clonedField); + } + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index 1a085a251d9..1219b888456 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -20,6 +20,7 @@ package org.apache.gravitino.iceberg.service.rest; import com.google.common.collect.Maps; +import java.util.Arrays; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,7 +32,11 @@ import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; import org.apache.gravitino.iceberg.service.extension.DummyCredentialProvider; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.listener.EventBus; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.jackson.JacksonFeature; import org.glassfish.jersey.logging.LoggingFeature; @@ -89,6 +94,12 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe IcebergCatalogWrapperManager icebergCatalogWrapperManager = new IcebergCatalogWrapperManagerForTest(catalogConf); + EventBus eventBus = new EventBus(Arrays.asList()); + IcebergTableOperationExecutor icebergTableOperationExecutor = + new IcebergTableOperationExecutor(icebergCatalogWrapperManager); + IcebergTableEventDispatcher icebergTableEventDispatcher = + new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus); + IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig()); resourceConfig.register( new AbstractBinder() { @@ -96,6 +107,7 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe protected void configure() { bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(2); bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(2); + bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(2); } }); } diff --git a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java index e383c65b7a4..36c112f00a2 100644 --- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java +++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java @@ -80,7 +80,7 @@ public GravitinoServer(ServerConfig config, GravitinoEnv gravitinoEnv) { } public void initialize() { - gravitinoEnv.initialize(serverConfig, true); + gravitinoEnv.initializeFullComponents(serverConfig); JettyServerConfig jettyServerConfig = JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX); From 9a5a18f3eb92632f5254bf36f60785af27301437 Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 20 Oct 2024 16:23:51 +0800 Subject: [PATCH 2/9] fix comment --- .../service/IcebergCatalogWrapperManager.java | 27 ++++-------- .../iceberg/service/IcebergRestUtils.java | 42 +++++++++---------- .../service/rest/IcebergRestTestUtil.java | 2 +- 3 files changed, 31 insertions(+), 40 deletions(-) diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java index a29bea07d94..d8a3a62da2c 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java @@ -22,7 +22,6 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; @@ -92,17 +91,21 @@ public IcebergCatalogWrapperManager(Map properties) { * @return the instance of IcebergCatalogWrapper. */ public IcebergCatalogWrapper getOps(String rawPrefix) { - String catalogName = getCatalogName(rawPrefix); - IcebergCatalogWrapper tableOps = + String catalogName = IcebergRestUtils.getCatalogName(rawPrefix); + return getCatalogWrapper(catalogName); + } + + public IcebergCatalogWrapper getCatalogWrapper(String catalogName) { + IcebergCatalogWrapper catalogWrapper = icebergCatalogWrapperCache.get(catalogName, k -> createCatalogWrapper(catalogName)); // Reload conf to reset UserGroupInformation or icebergTableOps will always use // Simple auth. - tableOps.reloadHadoopConf(); - return tableOps; + catalogWrapper.reloadHadoopConf(); + return catalogWrapper; } public CredentialProvider getCredentialProvider(String prefix) { - String catalogName = getCatalogName(prefix); + String catalogName = IcebergRestUtils.getCatalogName(prefix); return credentialProviderManager.getCredentialProvider(catalogName); } @@ -128,17 +131,6 @@ private IcebergCatalogWrapper createCatalogWrapper(String catalogName) { return createIcebergCatalogWrapper(icebergConfig.get()); } - private String getCatalogName(String rawPrefix) { - String prefix = shelling(rawPrefix); - Preconditions.checkArgument( - !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix), - String.format("%s is conflict with reserved key, please replace it", prefix)); - if (StringUtils.isBlank(prefix)) { - return IcebergConstants.GRAVITINO_DEFAULT_CATALOG; - } - return prefix; - } - private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider( Map properties) { String providerName = @@ -154,7 +146,6 @@ private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider( } } - private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) { try { catalogWrapper.close(); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java index 5254d612603..e2b23b28aa2 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java @@ -81,17 +81,6 @@ public static Instant calculateNewTimestamp(Instant currentTimestamp, int hours) return nextHourDateTime.atZone(ZoneId.systemDefault()).toInstant(); } - public static String getCatalogName(String rawPrefix) { - String prefix = normalizePrefix(rawPrefix); - Preconditions.checkArgument( - !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(prefix), - String.format("%s is conflict with reserved key, please replace it", prefix)); - if (StringUtils.isBlank(prefix)) { - return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG; - } - return prefix; - } - public static NameIdentifier getGravitinoNameIdentifier( String catalogName, TableIdentifier icebergIdentifier) { // todo(fanng): use a more general way to get metalake @@ -104,17 +93,15 @@ public static NameIdentifier getGravitinoNameIdentifier( return NameIdentifier.of(catalogNSTable); } - // remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to - // 'iceberg_catalog' - private static String normalizePrefix(String rawPrefix) { - if (StringUtils.isBlank(rawPrefix)) { - return rawPrefix; - } else { - // rawPrefix is a string matching ([^/]*/) which end with / - Preconditions.checkArgument( - rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); - return rawPrefix.substring(0, rawPrefix.length() - 1); + public static String getCatalogName(String rawPrefix) { + String catalogName = normalizePrefix(rawPrefix); + Preconditions.checkArgument( + !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName), + String.format("%s is conflict with default catalog name, please replace it", catalogName)); + if (StringUtils.isBlank(catalogName)) { + return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG; } + return catalogName; } public static T cloneIcebergRESTObject(Object message, Class className) { @@ -126,4 +113,17 @@ public static T cloneIcebergRESTObject(Object message, Class className) { throw new RuntimeException(e); } } + + // remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to + // 'iceberg_catalog' + private static String normalizePrefix(String rawPrefix) { + if (StringUtils.isBlank(rawPrefix)) { + return rawPrefix; + } else { + // rawPrefix is a string matching ([^/]*/) which end with / + Preconditions.checkArgument( + rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); + return rawPrefix.substring(0, rawPrefix.length() - 1); + } + } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index 1219b888456..b6e09db95d8 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -31,10 +31,10 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; -import org.apache.gravitino.iceberg.service.extension.DummyCredentialProvider; import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher; import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor; +import org.apache.gravitino.iceberg.service.extension.DummyCredentialProvider; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; import org.apache.gravitino.listener.EventBus; import org.glassfish.hk2.utilities.binding.AbstractBinder; From f5690e34c9cec3a8aeffdccca8bdf0282d1e08fd Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 20 Oct 2024 16:36:37 +0800 Subject: [PATCH 3/9] fix comment --- core/src/main/java/org/apache/gravitino/GravitinoEnv.java | 2 +- ...{IcebergRESTFailureEvent.java => IcebergFailureEvent.java} | 4 ++-- .../listener/api/event/IcebergTableFailureEvent.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/{IcebergRESTFailureEvent.java => IcebergFailureEvent.java} (87%) diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java index 64dcb95d6f7..0a5bd864965 100644 --- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java +++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java @@ -150,7 +150,7 @@ public void initializeBaseComponents(Config config) { * @param config The configuration object to initialize the environment. */ public void initializeFullComponents(Config config) { - LOG.info("Initializing Gravitino full Environment..."); + LOG.info("Initializing Gravitino full environment..."); this.config = config; this.manageFullComponents = true; initBaseComponents(); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java similarity index 87% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java index e74d34b9e4c..e166bfdc919 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java @@ -24,8 +24,8 @@ /** Represents an abstract failure event in Gravitino Iceberg REST server. */ @DeveloperApi -public abstract class IcebergRESTFailureEvent extends FailureEvent { - protected IcebergRESTFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { +public abstract class IcebergFailureEvent extends FailureEvent { + protected IcebergFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { super(user, nameIdentifier, e); } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java index 4d7a50a5812..f052b0060e4 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java @@ -24,7 +24,7 @@ /** Represent a failure event when do Iceberg table operation failed. */ @DeveloperApi -public class IcebergTableFailureEvent extends IcebergRESTFailureEvent { +public class IcebergTableFailureEvent extends IcebergFailureEvent { protected IcebergTableFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { super(user, nameIdentifier, e); } From fe508a6f4f1f672e1d65de32a54fce0849fe03af Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 20 Oct 2024 17:30:13 +0800 Subject: [PATCH 4/9] fix comment --- .../iceberg/common/IcebergConfig.java | 2 +- .../ops/IcebergCatalogConfigProvider.java | 42 -------------- .../apache/gravitino/iceberg/RESTService.java | 17 +++++- .../service/IcebergCatalogWrapperManager.java | 41 ++----------- .../iceberg/service/IcebergRestUtils.java | 5 +- .../IcebergTableEventDispatcher.java | 8 ++- .../DynamicIcebergCatalogConfigProvider.java | 46 +++++++-------- .../provider/IcebergConfigProvider.java | 57 +++++++++++++++++++ .../StaticIcebergCatalogConfigProvider.java | 36 ++++++------ .../TestIcebergCatalogWrapperManager.java | 8 ++- .../iceberg/service/TestIcebergRESTUtils.java | 7 +-- .../TestDynamicIcebergConfigProvider.java} | 32 +++++------ .../TestStaticIcebergConfigProvider.java} | 32 +++++------ .../IcebergCatalogWrapperManagerForTest.java | 6 +- .../service/rest/IcebergRestTestUtil.java | 12 +++- 15 files changed, 181 insertions(+), 170 deletions(-) delete mode 100644 iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java rename iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/{ => service}/provider/DynamicIcebergCatalogConfigProvider.java (67%) create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java rename iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/{ => service}/provider/StaticIcebergCatalogConfigProvider.java (71%) rename iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/{provider/TestDynamicIcebergCatalogWrapperProvider.java => service/provider/TestDynamicIcebergConfigProvider.java} (82%) rename iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/{provider/TestStaticIcebergCatalogWrapperProvider.java => service/provider/TestStaticIcebergConfigProvider.java} (84%) diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java index 638b4172ce4..281ad385eb1 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java @@ -205,7 +205,7 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry ICEBERG_REST_CATALOG_CONFIG_PROVIDER = new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER) .doc( - "Catalog provider class name, you can develop a class that implements `IcebergCatalogConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.") + "Catalog provider class name, you can develop a class that implements `IcebergConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME); diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java deleted file mode 100644 index fc0d488a11d..00000000000 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.iceberg.common.ops; - -import java.util.Map; -import java.util.Optional; -import org.apache.gravitino.iceberg.common.IcebergConfig; - -/** - * {@code IcebergCatalogConfigProvider} is an interface defining how Iceberg REST catalog server - * gets Iceberg catalog configurations. - */ -public interface IcebergCatalogConfigProvider { - - /** - * @param properties The parameters for creating Provider which from configurations whose prefix - * is 'gravitino.iceberg-rest.' - */ - void initialize(Map properties); - - /** - * @param catalogName Iceberg catalog name. - * @return the configuration of Iceberg catalog. - */ - Optional getIcebergCatalogConfig(String catalogName); -} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java index 8ee4f074d4b..036ba413630 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java @@ -32,6 +32,8 @@ import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory; import org.apache.gravitino.listener.EventBus; import org.apache.gravitino.metrics.MetricsSystem; import org.apache.gravitino.metrics.source.MetricsSource; @@ -58,6 +60,7 @@ public class RESTService implements GravitinoAuxiliaryService { private IcebergCatalogWrapperManager icebergCatalogWrapperManager; private IcebergMetricsManager icebergMetricsManager; + private IcebergConfigProvider configProvider; private void initServer(IcebergConfig icebergConfig) { JettyServerConfig serverConfig = JettyServerConfig.fromConfig(icebergConfig); @@ -74,13 +77,18 @@ private void initServer(IcebergConfig icebergConfig) { new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server); metricsSystem.register(httpServerMetricsSource); + Map configProperties = icebergConfig.getAllConfig(); EventBus eventBus = GravitinoEnv.getInstance().eventBus(); - icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig()); - icebergMetricsManager = new IcebergMetricsManager(icebergConfig); + this.configProvider = IcebergConfigProviderFactory.create(configProperties); + configProvider.initialize(configProperties); + String metalakeName = configProvider.getMetalakeName(); + this.icebergCatalogWrapperManager = + new IcebergCatalogWrapperManager(configProperties, configProvider); + this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig); IcebergTableOperationExecutor icebergTableOperationExecutor = new IcebergTableOperationExecutor(icebergCatalogWrapperManager); IcebergTableEventDispatcher icebergTableEventDispatcher = - new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus); + new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus, metalakeName); config.register( new AbstractBinder() { @Override @@ -128,6 +136,9 @@ public void serviceStop() throws Exception { server.stop(); LOG.info("Iceberg REST service stopped"); } + if (configProvider != null) { + configProvider.close(); + } if (icebergCatalogWrapperManager != null) { icebergCatalogWrapperManager.close(); } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java index d8a3a62da2c..cefc62bc268 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java @@ -22,45 +22,34 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.credential.CredentialProvider; import org.apache.gravitino.credential.CredentialProviderFactory; import org.apache.gravitino.credential.CredentialProviderManager; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; -import org.apache.gravitino.iceberg.provider.DynamicIcebergCatalogConfigProvider; -import org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class IcebergCatalogWrapperManager implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapperManager.class); - private static final ImmutableMap ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES = - ImmutableMap.of( - IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, - StaticIcebergCatalogConfigProvider.class.getCanonicalName(), - IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, - DynamicIcebergCatalogConfigProvider.class.getCanonicalName()); - private final Cache icebergCatalogWrapperCache; - private final IcebergCatalogConfigProvider provider; + private final IcebergConfigProvider configProvider; private CredentialProviderManager credentialProviderManager; - public IcebergCatalogWrapperManager(Map properties) { + public IcebergCatalogWrapperManager( + Map properties, IcebergConfigProvider configProvider) { this.credentialProviderManager = new CredentialProviderManager(); - this.provider = createIcebergCatalogConfigProvider(properties); - this.provider.initialize(properties); + this.configProvider = configProvider; this.icebergCatalogWrapperCache = Caffeine.newBuilder() .expireAfterWrite( @@ -115,7 +104,7 @@ protected IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig iceber } private IcebergCatalogWrapper createCatalogWrapper(String catalogName) { - Optional icebergConfig = provider.getIcebergCatalogConfig(catalogName); + Optional icebergConfig = configProvider.getIcebergCatalogConfig(catalogName); if (!icebergConfig.isPresent()) { throw new RuntimeException("Couldn't find Iceberg configuration for " + catalogName); } @@ -131,21 +120,6 @@ private IcebergCatalogWrapper createCatalogWrapper(String catalogName) { return createIcebergCatalogWrapper(icebergConfig.get()); } - private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider( - Map properties) { - String providerName = - (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER); - String className = - ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName); - LOG.info("Load Iceberg catalog provider: {}.", className); - try { - Class providerClz = Class.forName(className); - return (IcebergCatalogConfigProvider) providerClz.getDeclaredConstructor().newInstance(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) { try { catalogWrapper.close(); @@ -157,8 +131,5 @@ private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) { @Override public void close() throws Exception { icebergCatalogWrapperCache.invalidateAll(); - if (provider instanceof AutoCloseable) { - ((AutoCloseable) provider).close(); - } } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java index e2b23b28aa2..56c8684e3e5 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java @@ -82,11 +82,10 @@ public static Instant calculateNewTimestamp(Instant currentTimestamp, int hours) } public static NameIdentifier getGravitinoNameIdentifier( - String catalogName, TableIdentifier icebergIdentifier) { - // todo(fanng): use a more general way to get metalake + String metalakeName, String catalogName, TableIdentifier icebergIdentifier) { Stream catalogNS = Stream.concat( - Stream.of(IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE, catalogName), + Stream.of(metalakeName, catalogName), Arrays.stream(icebergIdentifier.namespace().levels())); String[] catalogNSTable = Stream.concat(catalogNS, Stream.of(icebergIdentifier.name())).toArray(String[]::new); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java index bdea94978ed..315ba620378 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java @@ -40,11 +40,15 @@ public class IcebergTableEventDispatcher implements IcebergTableOperationDispatc private IcebergTableOperationDispatcher icebergTableOperationDispatcher; private EventBus eventBus; + private String metalakeName; public IcebergTableEventDispatcher( - IcebergTableOperationDispatcher icebergTableOperationDispatcher, EventBus eventBus) { + IcebergTableOperationDispatcher icebergTableOperationDispatcher, + EventBus eventBus, + String metalakeName) { this.icebergTableOperationDispatcher = icebergTableOperationDispatcher; this.eventBus = eventBus; + this.metalakeName = metalakeName; } @Override @@ -52,7 +56,7 @@ public LoadTableResponse createTable( String catalogName, Namespace namespace, CreateTableRequest createTableRequest) { TableIdentifier tableIdentifier = TableIdentifier.of(namespace, createTableRequest.name()); NameIdentifier nameIdentifier = - IcebergRestUtils.getGravitinoNameIdentifier(catalogName, tableIdentifier); + IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier); eventBus.dispatchEvent( new IcebergCreateTablePreEvent( PrincipalUtils.getCurrentUserName(), nameIdentifier, createTableRequest)); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergCatalogConfigProvider.java similarity index 67% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergCatalogConfigProvider.java index 613e65dbd30..bd8e33a420c 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergCatalogConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -29,9 +29,6 @@ import org.apache.gravitino.client.GravitinoAdminClient; import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This provider proxy Gravitino lakehouse-iceberg catalogs. @@ -40,11 +37,7 @@ * *

The catalogName is iceberg_catalog */ -public class DynamicIcebergCatalogConfigProvider - implements IcebergCatalogConfigProvider, AutoCloseable { - public static final Logger LOG = - LoggerFactory.getLogger(DynamicIcebergCatalogConfigProvider.class); - +public class DynamicIcebergCatalogConfigProvider implements IcebergConfigProvider { private String gravitinoMetalake; private GravitinoAdminClient client; @@ -93,9 +86,14 @@ void setClient(GravitinoAdminClient client) { } @Override - public void close() throws Exception { + public void close() { if (client != null) { client.close(); } } + + @Override + public String getMetalakeName() { + return gravitinoMetalake; + } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java new file mode 100644 index 00000000000..55d9793fba4 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.iceberg.service.provider; + +import java.io.Closeable; +import java.util.Map; +import java.util.Optional; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.iceberg.common.IcebergConfig; + +/** + * {@code IcebergConfigProvider} is an interface defining how Iceberg REST catalog server gets + * Iceberg catalog configurations. + */ +public interface IcebergConfigProvider extends Closeable { + + /** + * Initialize {@code IcebergConfigProvider} with properties. + * + * @param properties The parameters for creating Provider which from configurations whose prefix + * is 'gravitino.iceberg-rest.' + */ + void initialize(Map properties); + + /** + * Get Iceberg configuration from catalog name. + * + * @param catalogName Iceberg catalog name. + * @return the configuration of Iceberg catalog. + */ + Optional getIcebergCatalogConfig(String catalogName); + + /** + * Get metalake name. + * + * @return the name of metalake. + */ + default String getMetalakeName() { + return IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergCatalogConfigProvider.java similarity index 71% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergCatalogConfigProvider.java index 13a234c1839..6c94b75cc65 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergCatalogConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import com.google.common.annotations.VisibleForTesting; import java.util.Map; @@ -24,7 +24,6 @@ import java.util.stream.Collectors; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; import org.apache.gravitino.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +38,7 @@ * gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive * gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ... */ -public class StaticIcebergCatalogConfigProvider implements IcebergCatalogConfigProvider { +public class StaticIcebergCatalogConfigProvider implements IcebergConfigProvider { public static final Logger LOG = LoggerFactory.getLogger(StaticIcebergCatalogConfigProvider.class); @@ -69,6 +68,9 @@ public Optional getIcebergCatalogConfig(String catalogName) { return Optional.ofNullable(catalogConfigs.get(catalogName)); } + @Override + public void close() {} + private Optional getCatalogName(String catalogConfigKey) { if (!catalogConfigKey.startsWith("catalog.")) { return Optional.empty(); diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java index 105ce7fd737..472e3c5a9e8 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java @@ -22,6 +22,8 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -39,7 +41,8 @@ public void testValidGetOps(String rawPrefix) { } Map config = Maps.newHashMap(); config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix); - IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config); + IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(config); + IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config, configProvider); IcebergCatalogWrapper ops = manager.getOps(rawPrefix); @@ -54,7 +57,8 @@ public void testValidGetOps(String rawPrefix) { @ValueSource(strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "default_catalog/"}) public void testInvalidGetOps(String rawPrefix) { Map config = Maps.newHashMap(); - IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config); + IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(config); + IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config, configProvider); Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> manager.getOps(rawPrefix)); } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java index aa0b160572d..79a317e52bd 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java @@ -34,14 +34,13 @@ public class TestIcebergRESTUtils { @Test void testGetGravitinoNameIdentifier() { + String metalakeName = "metalake"; String catalogName = "catalog"; TableIdentifier tableIdentifier = TableIdentifier.of("ns1", "ns2", "table"); NameIdentifier nameIdentifier = - IcebergRestUtils.getGravitinoNameIdentifier(catalogName, tableIdentifier); + IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier); Assertions.assertEquals( - NameIdentifier.of( - IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE, catalogName, "ns1", "ns2", "table"), - nameIdentifier); + NameIdentifier.of(metalakeName, catalogName, "ns1", "ns2", "table"), nameIdentifier); } @Test diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java similarity index 82% rename from iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java rename to iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java index 9b7e0dba0f1..241c41bbdf3 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import java.util.HashMap; import org.apache.gravitino.Catalog; @@ -30,7 +30,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -public class TestDynamicIcebergCatalogWrapperProvider { +public class TestDynamicIcebergConfigProvider { @Test public void testValidIcebergTableOps() { String hiveCatalogName = "hive_backend"; diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java similarity index 84% rename from iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java rename to iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java index 8384acb13b8..7b4fffb1fc4 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import com.google.common.collect.Maps; import java.util.Map; @@ -32,7 +32,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -public class TestStaticIcebergCatalogWrapperProvider { +public class TestStaticIcebergConfigProvider { @Test public void testValidIcebergTableOps() { diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java index 7d359926a85..361b086d987 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java @@ -23,11 +23,13 @@ import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; // Provide a custom catalogWrapper to do test like `registerTable` public class IcebergCatalogWrapperManagerForTest extends IcebergCatalogWrapperManager { - public IcebergCatalogWrapperManagerForTest(Map properties) { - super(properties); + public IcebergCatalogWrapperManagerForTest( + Map properties, IcebergConfigProvider configProvider) { + super(properties, configProvider); } @Override diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index b6e09db95d8..43b0223f012 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -27,7 +27,6 @@ import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.credential.CredentialConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider; import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; @@ -36,6 +35,9 @@ import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor; import org.apache.gravitino.iceberg.service.extension.DummyCredentialProvider; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory; +import org.apache.gravitino.iceberg.service.provider.StaticIcebergCatalogConfigProvider; import org.apache.gravitino.listener.EventBus; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.jackson.JacksonFeature; @@ -90,15 +92,19 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe catalogConf.put( CredentialConstants.CREDENTIAL_PROVIDER_TYPE, DummyCredentialProvider.DUMMY_CREDENTIAL_TYPE); + IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(catalogConf); + configProvider.initialize(catalogConf); // used to override register table interface IcebergCatalogWrapperManager icebergCatalogWrapperManager = - new IcebergCatalogWrapperManagerForTest(catalogConf); + new IcebergCatalogWrapperManagerForTest(catalogConf, configProvider); EventBus eventBus = new EventBus(Arrays.asList()); + IcebergTableOperationExecutor icebergTableOperationExecutor = new IcebergTableOperationExecutor(icebergCatalogWrapperManager); IcebergTableEventDispatcher icebergTableEventDispatcher = - new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus); + new IcebergTableEventDispatcher( + icebergTableOperationExecutor, eventBus, configProvider.getMetalakeName()); IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig()); resourceConfig.register( From 641f9e1ff167f2c4a8ecac0bd77e066487269621 Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 20 Oct 2024 17:33:27 +0800 Subject: [PATCH 5/9] fix comment --- .../org/apache/gravitino/iceberg/service/IcebergRestUtils.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java index 56c8684e3e5..af017b2de96 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java @@ -96,7 +96,8 @@ public static String getCatalogName(String rawPrefix) { String catalogName = normalizePrefix(rawPrefix); Preconditions.checkArgument( !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName), - String.format("%s is conflict with default catalog name, please replace it", catalogName)); + String.format( + "%s is conflicted with reserved catalog name, please replace it", catalogName)); if (StringUtils.isBlank(catalogName)) { return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG; } From fedb8dd47b425e3cd2b311261a6c75acb53e6af7 Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 20 Oct 2024 17:39:54 +0800 Subject: [PATCH 6/9] fix comment --- ...ConfigProvider.java => DynamicIcebergConfigProvider.java} | 2 +- ...gConfigProvider.java => StaticIcebergConfigProvider.java} | 5 ++--- .../service/provider/TestDynamicIcebergConfigProvider.java | 4 ++-- .../service/provider/TestStaticIcebergConfigProvider.java | 4 ++-- .../gravitino/iceberg/service/rest/IcebergRestTestUtil.java | 4 ++-- 5 files changed, 9 insertions(+), 10 deletions(-) rename iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/{DynamicIcebergCatalogConfigProvider.java => DynamicIcebergConfigProvider.java} (97%) rename iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/{StaticIcebergCatalogConfigProvider.java => StaticIcebergConfigProvider.java} (94%) diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java similarity index 97% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergCatalogConfigProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java index bd8e33a420c..0f35fae529a 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java @@ -37,7 +37,7 @@ * *

The catalogName is iceberg_catalog */ -public class DynamicIcebergCatalogConfigProvider implements IcebergConfigProvider { +public class DynamicIcebergConfigProvider implements IcebergConfigProvider { private String gravitinoMetalake; private GravitinoAdminClient client; diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java similarity index 94% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergCatalogConfigProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java index 6c94b75cc65..f8c4fb3505e 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java @@ -38,9 +38,8 @@ * gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive * gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ... */ -public class StaticIcebergCatalogConfigProvider implements IcebergConfigProvider { - public static final Logger LOG = - LoggerFactory.getLogger(StaticIcebergCatalogConfigProvider.class); +public class StaticIcebergConfigProvider implements IcebergConfigProvider { + public static final Logger LOG = LoggerFactory.getLogger(StaticIcebergConfigProvider.class); @VisibleForTesting Map catalogConfigs; diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java index 241c41bbdf3..4eb5da5afce 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java @@ -71,7 +71,7 @@ public void testValidIcebergTableOps() { } }); - DynamicIcebergCatalogConfigProvider provider = new DynamicIcebergCatalogConfigProvider(); + DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider(); GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class); Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake); provider.setClient(client); @@ -102,7 +102,7 @@ public void testInvalidIcebergTableOps() { GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class); Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake); - DynamicIcebergCatalogConfigProvider provider = new DynamicIcebergCatalogConfigProvider(); + DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider(); provider.setClient(client); Assertions.assertThrowsExactly( diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java index 7b4fffb1fc4..3a4766016e0 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java @@ -60,7 +60,7 @@ public void testValidIcebergTableOps() { config.put("catalog-backend", "memory"); config.put("warehouse", "/tmp/"); - StaticIcebergCatalogConfigProvider provider = new StaticIcebergCatalogConfigProvider(); + StaticIcebergConfigProvider provider = new StaticIcebergConfigProvider(); provider.initialize(config); IcebergConfig hiveIcebergConfig = provider.catalogConfigs.get(hiveCatalogName); @@ -106,7 +106,7 @@ public void testValidIcebergTableOps() { @ParameterizedTest @ValueSource(strings = {"", "not_match"}) public void testInvalidIcebergTableOps(String catalogName) { - StaticIcebergCatalogConfigProvider provider = new StaticIcebergCatalogConfigProvider(); + StaticIcebergConfigProvider provider = new StaticIcebergConfigProvider(); provider.initialize(Maps.newHashMap()); Optional config = provider.getIcebergCatalogConfig(catalogName); diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index 43b0223f012..1314a3bac44 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -37,7 +37,7 @@ import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory; -import org.apache.gravitino.iceberg.service.provider.StaticIcebergCatalogConfigProvider; +import org.apache.gravitino.iceberg.service.provider.StaticIcebergConfigProvider; import org.apache.gravitino.listener.EventBus; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.jackson.JacksonFeature; @@ -87,7 +87,7 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe String catalogConfigPrefix = "catalog." + PREFIX; catalogConf.put( IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER, - StaticIcebergCatalogConfigProvider.class.getName()); + StaticIcebergConfigProvider.class.getName()); catalogConf.put(String.format("%s.catalog-backend-name", catalogConfigPrefix), PREFIX); catalogConf.put( CredentialConstants.CREDENTIAL_PROVIDER_TYPE, From c11ad89a9e3a419d1ce8656319ab82a970ed52bb Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 20 Oct 2024 17:58:52 +0800 Subject: [PATCH 7/9] fix comment --- .../IcebergConfigProviderFactory.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java new file mode 100644 index 00000000000..b2b258cb14e --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.provider; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.iceberg.common.IcebergConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergConfigProviderFactory { + public static final Logger LOG = LoggerFactory.getLogger(IcebergConfigProviderFactory.class); + private static final ImmutableMap ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES = + ImmutableMap.of( + IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, + StaticIcebergConfigProvider.class.getCanonicalName(), + IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, + DynamicIcebergConfigProvider.class.getCanonicalName()); + + public static IcebergConfigProvider create(Map properties) { + String providerName = + (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER); + String className = + ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName); + LOG.info("Load Iceberg catalog provider: {}.", className); + try { + Class providerClz = Class.forName(className); + return (IcebergConfigProvider) providerClz.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} From 12baeeb769d864a3ed3b9977668133392ab66b2b Mon Sep 17 00:00:00 2001 From: fanng Date: Sun, 20 Oct 2024 18:20:52 +0800 Subject: [PATCH 8/9] fix comment --- .../iceberg/service/TestIcebergCatalogWrapperManager.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java index 472e3c5a9e8..85a7fdc04e9 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java @@ -42,6 +42,7 @@ public void testValidGetOps(String rawPrefix) { Map config = Maps.newHashMap(); config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix); IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(config); + configProvider.initialize(config); IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config, configProvider); IcebergCatalogWrapper ops = manager.getOps(rawPrefix); @@ -58,6 +59,7 @@ public void testValidGetOps(String rawPrefix) { public void testInvalidGetOps(String rawPrefix) { Map config = Maps.newHashMap(); IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(config); + configProvider.initialize(config); IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config, configProvider); Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> manager.getOps(rawPrefix)); From 9a3e988d4475fff51c4d0a40f0532ca678be8936 Mon Sep 17 00:00:00 2001 From: fanng Date: Mon, 21 Oct 2024 19:12:04 +0800 Subject: [PATCH 9/9] fix comment --- .../gravitino/iceberg/common/IcebergConfig.java | 16 +++++++++++----- .../apache/gravitino/iceberg/RESTService.java | 4 +++- .../provider/IcebergConfigProviderFactory.java | 1 + .../service/rest/IcebergTableOperations.java | 3 ++- 4 files changed, 17 insertions(+), 7 deletions(-) diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java index 281ad385eb1..2e7eb74e2f1 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java @@ -54,7 +54,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry CATALOG_BACKEND_IMPL = new ConfigBuilder(IcebergConstants.CATALOG_BACKEND_IMPL) .doc( - "The fully-qualified class name of a custom catalog implementation, only worked if `catalog-backend` is `custom`") + "The fully-qualified class name of a custom catalog implementation, " + + "only worked if `catalog-backend` is `custom`") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .create(); @@ -175,7 +176,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry ICEBERG_METRICS_STORE_RETAIN_DAYS = new ConfigBuilder(IcebergConstants.ICEBERG_METRICS_STORE_RETAIN_DAYS) .doc( - "The retain days of Iceberg metrics, the value not greater than 0 means retain forever") + "The retain days of Iceberg metrics, the value not greater than 0 means " + + "retain forever") .version(ConfigConstants.VERSION_0_4_0) .intConf() .createWithDefault(-1); @@ -205,7 +207,9 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry ICEBERG_REST_CATALOG_CONFIG_PROVIDER = new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER) .doc( - "Catalog provider class name, you can develop a class that implements `IcebergConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.") + "Catalog provider class name, you can develop a class that implements " + + "`IcebergConfigProvider` and add the corresponding jar file to the Iceberg " + + "REST service classpath directory.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME); @@ -213,7 +217,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry GRAVITINO_URI = new ConfigBuilder(IcebergConstants.GRAVITINO_URI) .doc( - "The uri of Gravitino server address, only worked if `catalog-provider` is `gravitino-based-provider`.") + "The uri of Gravitino server address, only worked if `catalog-provider` is " + + "`gravitino-based-provider`.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .create(); @@ -221,7 +226,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry GRAVITINO_METALAKE = new ConfigBuilder(IcebergConstants.GRAVITINO_METALAKE) .doc( - "The metalake name that `gravitino-based-provider` used to request to Gravitino, only worked if `catalog-provider` is `gravitino-based-provider`.") + "The metalake name that `gravitino-based-provider` used to request to Gravitino, " + + "only worked if `catalog-provider` is `gravitino-based-provider`.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .create(); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java index 036ba413630..b301204bd20 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java @@ -78,10 +78,11 @@ private void initServer(IcebergConfig icebergConfig) { metricsSystem.register(httpServerMetricsSource); Map configProperties = icebergConfig.getAllConfig(); - EventBus eventBus = GravitinoEnv.getInstance().eventBus(); this.configProvider = IcebergConfigProviderFactory.create(configProperties); configProvider.initialize(configProperties); String metalakeName = configProvider.getMetalakeName(); + + EventBus eventBus = GravitinoEnv.getInstance().eventBus(); this.icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(configProperties, configProvider); this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig); @@ -89,6 +90,7 @@ private void initServer(IcebergConfig icebergConfig) { new IcebergTableOperationExecutor(icebergCatalogWrapperManager); IcebergTableEventDispatcher icebergTableEventDispatcher = new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus, metalakeName); + config.register( new AbstractBinder() { @Override diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java index b2b258cb14e..f217590314e 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java @@ -28,6 +28,7 @@ public class IcebergConfigProviderFactory { public static final Logger LOG = LoggerFactory.getLogger(IcebergConfigProviderFactory.class); + private static final ImmutableMap ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES = ImmutableMap.of( IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java index ec11b0f870a..cebb748845e 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java @@ -118,7 +118,8 @@ public Response createTable( String catalogName = IcebergRestUtils.getCatalogName(prefix); Namespace icebergNS = RESTUtil.decodeNamespace(namespace); LOG.info( - "Create Iceberg table, catalog: {}, namespace: {}, create table request: {}, accessDelegation: {}, isCredentialVending: {}", + "Create Iceberg table, catalog: {}, namespace: {}, create table request: {}, " + + "accessDelegation: {}, isCredentialVending: {}", catalogName, icebergNS, createTableRequest,