diff --git a/bundles/build.gradle.kts b/bundles/build.gradle.kts index 043fbfec673..fa6eb7d5ef9 100644 --- a/bundles/build.gradle.kts +++ b/bundles/build.gradle.kts @@ -19,4 +19,4 @@ tasks.all { enabled = false -} \ No newline at end of file +} diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java index 004bde0bd7e..52e665579da 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java @@ -77,5 +77,6 @@ public class IcebergConstants { public static final String GRAVITINO_METALAKE = "gravitino-metalake"; - public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog"; + public static final String ICEBERG_REST_DEFAULT_METALAKE = "gravitino"; + public static final String ICEBERG_REST_DEFAULT_CATALOG = "default_catalog"; } diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java index 77d76b47476..0a5bd864965 100644 --- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java +++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java @@ -78,6 +78,8 @@ public class GravitinoEnv { private static final Logger LOG = LoggerFactory.getLogger(GravitinoEnv.class); private Config config; + // Iceberg REST server use base components while Gravitino Server use full components. + private boolean manageFullComponents = true; private EntityStore entityStore; @@ -130,21 +132,30 @@ public static GravitinoEnv getInstance() { } /** - * Initialize the Gravitino environment. + * Initialize base components, used for Iceberg REST server. * * @param config The configuration object to initialize the environment. - * @param isGravitinoServer A boolean flag indicating whether the initialization is for the - * Gravitino server. If true, server-specific components will be initialized in addition to - * the base components. */ - public void initialize(Config config, boolean isGravitinoServer) { - LOG.info("Initializing Gravitino Environment..."); + public void initializeBaseComponents(Config config) { + LOG.info("Initializing Gravitino base environment..."); this.config = config; + this.manageFullComponents = false; initBaseComponents(); - if (isGravitinoServer) { - initGravitinoServerComponents(); - } - LOG.info("Gravitino Environment is initialized."); + LOG.info("Gravitino base environment is initialized."); + } + + /** + * Initialize all components, used for Gravitino server. + * + * @param config The configuration object to initialize the environment. + */ + public void initializeFullComponents(Config config) { + LOG.info("Initializing Gravitino full environment..."); + this.config = config; + this.manageFullComponents = true; + initBaseComponents(); + initGravitinoServerComponents(); + LOG.info("Gravitino full environment is initialized."); } /** @@ -308,9 +319,11 @@ public FutureGrantManager futureGrantManager() { } public void start() { - auxServiceManager.serviceStart(); metricsSystem.start(); eventListenerManager.start(); + if (manageFullComponents) { + auxServiceManager.serviceStart(); + } } /** Shutdown the Gravitino environment. */ diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java index 638b4172ce4..2e7eb74e2f1 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java @@ -54,7 +54,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry CATALOG_BACKEND_IMPL = new ConfigBuilder(IcebergConstants.CATALOG_BACKEND_IMPL) .doc( - "The fully-qualified class name of a custom catalog implementation, only worked if `catalog-backend` is `custom`") + "The fully-qualified class name of a custom catalog implementation, " + + "only worked if `catalog-backend` is `custom`") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .create(); @@ -175,7 +176,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry ICEBERG_METRICS_STORE_RETAIN_DAYS = new ConfigBuilder(IcebergConstants.ICEBERG_METRICS_STORE_RETAIN_DAYS) .doc( - "The retain days of Iceberg metrics, the value not greater than 0 means retain forever") + "The retain days of Iceberg metrics, the value not greater than 0 means " + + "retain forever") .version(ConfigConstants.VERSION_0_4_0) .intConf() .createWithDefault(-1); @@ -205,7 +207,9 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry ICEBERG_REST_CATALOG_CONFIG_PROVIDER = new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER) .doc( - "Catalog provider class name, you can develop a class that implements `IcebergCatalogConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.") + "Catalog provider class name, you can develop a class that implements " + + "`IcebergConfigProvider` and add the corresponding jar file to the Iceberg " + + "REST service classpath directory.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME); @@ -213,7 +217,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry GRAVITINO_URI = new ConfigBuilder(IcebergConstants.GRAVITINO_URI) .doc( - "The uri of Gravitino server address, only worked if `catalog-provider` is `gravitino-based-provider`.") + "The uri of Gravitino server address, only worked if `catalog-provider` is " + + "`gravitino-based-provider`.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .create(); @@ -221,7 +226,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { public static final ConfigEntry GRAVITINO_METALAKE = new ConfigBuilder(IcebergConstants.GRAVITINO_METALAKE) .doc( - "The metalake name that `gravitino-based-provider` used to request to Gravitino, only worked if `catalog-provider` is `gravitino-based-provider`.") + "The metalake name that `gravitino-based-provider` used to request to Gravitino, " + + "only worked if `catalog-provider` is `gravitino-based-provider`.") .version(ConfigConstants.VERSION_0_7_0) .stringConf() .create(); diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java deleted file mode 100644 index fc0d488a11d..00000000000 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogConfigProvider.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.gravitino.iceberg.common.ops; - -import java.util.Map; -import java.util.Optional; -import org.apache.gravitino.iceberg.common.IcebergConfig; - -/** - * {@code IcebergCatalogConfigProvider} is an interface defining how Iceberg REST catalog server - * gets Iceberg catalog configurations. - */ -public interface IcebergCatalogConfigProvider { - - /** - * @param properties The parameters for creating Provider which from configurations whose prefix - * is 'gravitino.iceberg-rest.' - */ - void initialize(Map properties); - - /** - * @param catalogName Iceberg catalog name. - * @return the configuration of Iceberg catalog. - */ - Optional getIcebergCatalogConfig(String catalogName); -} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java index 0592cfd9421..b301204bd20 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java @@ -28,7 +28,13 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory; +import org.apache.gravitino.listener.EventBus; import org.apache.gravitino.metrics.MetricsSystem; import org.apache.gravitino.metrics.source.MetricsSource; import org.apache.gravitino.server.web.HttpServerMetricsSource; @@ -54,6 +60,7 @@ public class RESTService implements GravitinoAuxiliaryService { private IcebergCatalogWrapperManager icebergCatalogWrapperManager; private IcebergMetricsManager icebergMetricsManager; + private IcebergConfigProvider configProvider; private void initServer(IcebergConfig icebergConfig) { JettyServerConfig serverConfig = JettyServerConfig.fromConfig(icebergConfig); @@ -70,14 +77,27 @@ private void initServer(IcebergConfig icebergConfig) { new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server); metricsSystem.register(httpServerMetricsSource); - icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig()); - icebergMetricsManager = new IcebergMetricsManager(icebergConfig); + Map configProperties = icebergConfig.getAllConfig(); + this.configProvider = IcebergConfigProviderFactory.create(configProperties); + configProvider.initialize(configProperties); + String metalakeName = configProvider.getMetalakeName(); + + EventBus eventBus = GravitinoEnv.getInstance().eventBus(); + this.icebergCatalogWrapperManager = + new IcebergCatalogWrapperManager(configProperties, configProvider); + this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig); + IcebergTableOperationExecutor icebergTableOperationExecutor = + new IcebergTableOperationExecutor(icebergCatalogWrapperManager); + IcebergTableEventDispatcher icebergTableEventDispatcher = + new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus, metalakeName); + config.register( new AbstractBinder() { @Override protected void configure() { bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1); bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1); + bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1); } }); @@ -118,6 +138,9 @@ public void serviceStop() throws Exception { server.stop(); LOG.info("Iceberg REST service stopped"); } + if (configProvider != null) { + configProvider.close(); + } if (icebergCatalogWrapperManager != null) { icebergCatalogWrapperManager.close(); } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java index 622f0d21acd..35fe05ce3a4 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java @@ -45,13 +45,14 @@ public GravitinoIcebergRESTServer(Config config) { } private void initialize() { - gravitinoEnv.initialize(serverConfig, false); + gravitinoEnv.initializeBaseComponents(serverConfig); icebergRESTService.serviceInit( serverConfig.getConfigsWithPrefix(IcebergConfig.ICEBERG_CONFIG_PREFIX)); ServerAuthenticator.getInstance().initialize(serverConfig); } private void start() { + gravitinoEnv.start(); icebergRESTService.serviceStart(); } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java index 823f42ddb16..cefc62bc268 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java @@ -22,46 +22,34 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; import java.util.Optional; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; -import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.credential.CredentialProvider; import org.apache.gravitino.credential.CredentialProviderFactory; import org.apache.gravitino.credential.CredentialProviderManager; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; -import org.apache.gravitino.iceberg.provider.DynamicIcebergCatalogConfigProvider; -import org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class IcebergCatalogWrapperManager implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapperManager.class); - private static final ImmutableMap ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES = - ImmutableMap.of( - IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, - StaticIcebergCatalogConfigProvider.class.getCanonicalName(), - IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, - DynamicIcebergCatalogConfigProvider.class.getCanonicalName()); - private final Cache icebergCatalogWrapperCache; - private final IcebergCatalogConfigProvider provider; + private final IcebergConfigProvider configProvider; private CredentialProviderManager credentialProviderManager; - public IcebergCatalogWrapperManager(Map properties) { + public IcebergCatalogWrapperManager( + Map properties, IcebergConfigProvider configProvider) { this.credentialProviderManager = new CredentialProviderManager(); - this.provider = createIcebergCatalogConfigProvider(properties); - this.provider.initialize(properties); + this.configProvider = configProvider; this.icebergCatalogWrapperCache = Caffeine.newBuilder() .expireAfterWrite( @@ -92,17 +80,21 @@ public IcebergCatalogWrapperManager(Map properties) { * @return the instance of IcebergCatalogWrapper. */ public IcebergCatalogWrapper getOps(String rawPrefix) { - String catalogName = getCatalogName(rawPrefix); - IcebergCatalogWrapper tableOps = + String catalogName = IcebergRestUtils.getCatalogName(rawPrefix); + return getCatalogWrapper(catalogName); + } + + public IcebergCatalogWrapper getCatalogWrapper(String catalogName) { + IcebergCatalogWrapper catalogWrapper = icebergCatalogWrapperCache.get(catalogName, k -> createCatalogWrapper(catalogName)); // Reload conf to reset UserGroupInformation or icebergTableOps will always use // Simple auth. - tableOps.reloadHadoopConf(); - return tableOps; + catalogWrapper.reloadHadoopConf(); + return catalogWrapper; } public CredentialProvider getCredentialProvider(String prefix) { - String catalogName = getCatalogName(prefix); + String catalogName = IcebergRestUtils.getCatalogName(prefix); return credentialProviderManager.getCredentialProvider(catalogName); } @@ -112,7 +104,7 @@ protected IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig iceber } private IcebergCatalogWrapper createCatalogWrapper(String catalogName) { - Optional icebergConfig = provider.getIcebergCatalogConfig(catalogName); + Optional icebergConfig = configProvider.getIcebergCatalogConfig(catalogName); if (!icebergConfig.isPresent()) { throw new RuntimeException("Couldn't find Iceberg configuration for " + catalogName); } @@ -128,43 +120,6 @@ private IcebergCatalogWrapper createCatalogWrapper(String catalogName) { return createIcebergCatalogWrapper(icebergConfig.get()); } - private String getCatalogName(String rawPrefix) { - String prefix = shelling(rawPrefix); - Preconditions.checkArgument( - !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix), - String.format("%s is conflict with reserved key, please replace it", prefix)); - if (StringUtils.isBlank(prefix)) { - return IcebergConstants.GRAVITINO_DEFAULT_CATALOG; - } - return prefix; - } - - private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider( - Map properties) { - String providerName = - (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER); - String className = - ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName); - LOG.info("Load Iceberg catalog provider: {}.", className); - try { - Class providerClz = Class.forName(className); - return (IcebergCatalogConfigProvider) providerClz.getDeclaredConstructor().newInstance(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private String shelling(String rawPrefix) { - if (StringUtils.isBlank(rawPrefix)) { - return rawPrefix; - } else { - // rawPrefix is a string matching ([^/]*/) which end with / - Preconditions.checkArgument( - rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); - return rawPrefix.substring(0, rawPrefix.length() - 1); - } - } - private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) { try { catalogWrapper.close(); @@ -176,8 +131,5 @@ private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) { @Override public void close() throws Exception { icebergCatalogWrapperCache.invalidateAll(); - if (provider instanceof AutoCloseable) { - ((AutoCloseable) provider).close(); - } } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java index f880f7f7a9f..ed7d0a2f98d 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java @@ -24,6 +24,7 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; +import org.apache.gravitino.exceptions.IllegalNameIdentifierException; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; @@ -51,8 +52,10 @@ public class IcebergExceptionMapper implements ExceptionMapper { ImmutableMap., Integer>builder() .put(IllegalArgumentException.class, 400) .put(ValidationException.class, 400) + .put(IllegalNameIdentifierException.class, 400) .put(NamespaceNotEmptyException.class, 400) .put(NotAuthorizedException.class, 401) + .put(org.apache.gravitino.exceptions.ForbiddenException.class, 403) .put(ForbiddenException.class, 403) .put(NoSuchNamespaceException.class, 404) .put(NoSuchTableException.class, 404) diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java index fb0e8005c16..af017b2de96 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java @@ -18,12 +18,21 @@ */ package org.apache.gravitino.iceberg.service; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; +import java.util.Arrays; +import java.util.stream.Stream; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.rest.responses.ErrorResponse; public class IcebergRestUtils { @@ -71,4 +80,50 @@ public static Instant calculateNewTimestamp(Instant currentTimestamp, int hours) } return nextHourDateTime.atZone(ZoneId.systemDefault()).toInstant(); } + + public static NameIdentifier getGravitinoNameIdentifier( + String metalakeName, String catalogName, TableIdentifier icebergIdentifier) { + Stream catalogNS = + Stream.concat( + Stream.of(metalakeName, catalogName), + Arrays.stream(icebergIdentifier.namespace().levels())); + String[] catalogNSTable = + Stream.concat(catalogNS, Stream.of(icebergIdentifier.name())).toArray(String[]::new); + return NameIdentifier.of(catalogNSTable); + } + + public static String getCatalogName(String rawPrefix) { + String catalogName = normalizePrefix(rawPrefix); + Preconditions.checkArgument( + !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName), + String.format( + "%s is conflicted with reserved catalog name, please replace it", catalogName)); + if (StringUtils.isBlank(catalogName)) { + return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG; + } + return catalogName; + } + + public static T cloneIcebergRESTObject(Object message, Class className) { + ObjectMapper icebergObjectMapper = IcebergObjectMapper.getInstance(); + try { + byte[] values = icebergObjectMapper.writeValueAsBytes(message); + return icebergObjectMapper.readValue(values, className); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to + // 'iceberg_catalog' + private static String normalizePrefix(String rawPrefix) { + if (StringUtils.isBlank(rawPrefix)) { + return rawPrefix; + } else { + // rawPrefix is a string matching ([^/]*/) which end with / + Preconditions.checkArgument( + rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); + return rawPrefix.substring(0, rawPrefix.length() - 1); + } + } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java new file mode 100644 index 00000000000..315ba620378 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.iceberg.service.IcebergRestUtils; +import org.apache.gravitino.listener.EventBus; +import org.apache.gravitino.listener.api.event.IcebergCreateTableEvent; +import org.apache.gravitino.listener.api.event.IcebergCreateTableFailureEvent; +import org.apache.gravitino.listener.api.event.IcebergCreateTablePreEvent; +import org.apache.gravitino.utils.PrincipalUtils; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * {@code IcebergTableEventDispatcher} is a decorator for {@link IcebergTableOperationExecutor} that + * not only delegates table operations to the underlying dispatcher but also dispatches + * corresponding events to an {@link org.apache.gravitino.listener.EventBus}. + */ +public class IcebergTableEventDispatcher implements IcebergTableOperationDispatcher { + + private IcebergTableOperationDispatcher icebergTableOperationDispatcher; + private EventBus eventBus; + private String metalakeName; + + public IcebergTableEventDispatcher( + IcebergTableOperationDispatcher icebergTableOperationDispatcher, + EventBus eventBus, + String metalakeName) { + this.icebergTableOperationDispatcher = icebergTableOperationDispatcher; + this.eventBus = eventBus; + this.metalakeName = metalakeName; + } + + @Override + public LoadTableResponse createTable( + String catalogName, Namespace namespace, CreateTableRequest createTableRequest) { + TableIdentifier tableIdentifier = TableIdentifier.of(namespace, createTableRequest.name()); + NameIdentifier nameIdentifier = + IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier); + eventBus.dispatchEvent( + new IcebergCreateTablePreEvent( + PrincipalUtils.getCurrentUserName(), nameIdentifier, createTableRequest)); + LoadTableResponse loadTableResponse; + try { + loadTableResponse = + icebergTableOperationDispatcher.createTable(catalogName, namespace, createTableRequest); + } catch (Exception e) { + eventBus.dispatchEvent( + new IcebergCreateTableFailureEvent( + PrincipalUtils.getCurrentUserName(), nameIdentifier, e)); + throw e; + } + eventBus.dispatchEvent( + new IcebergCreateTableEvent( + PrincipalUtils.getCurrentUserName(), + nameIdentifier, + createTableRequest, + loadTableResponse)); + return loadTableResponse; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java new file mode 100644 index 00000000000..948e4866217 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * The {@code IcebergTableOperationDispatcher} interface defines the public API for managing Iceberg + * tables. + */ +public interface IcebergTableOperationDispatcher { + /** + * Creates a new Iceberg table. + * + * @param catalogName The catalog name when creating the table. + * @param namespace The namespace within which the table should be created. + * @param createTableRequest The request object containing the details for creating the table. + * @return A {@link LoadTableResponse} object containing the result of the operation. + */ + LoadTableResponse createTable( + String catalogName, Namespace namespace, CreateTableRequest createTableRequest); +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java new file mode 100644 index 00000000000..9a51d7b7a00 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationExecutor.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +public class IcebergTableOperationExecutor implements IcebergTableOperationDispatcher { + + private IcebergCatalogWrapperManager icebergCatalogWrapperManager; + + public IcebergTableOperationExecutor(IcebergCatalogWrapperManager icebergCatalogWrapperManager) { + this.icebergCatalogWrapperManager = icebergCatalogWrapperManager; + } + + @Override + public LoadTableResponse createTable( + String catalogName, Namespace namespace, CreateTableRequest createTableRequest) { + return icebergCatalogWrapperManager + .getCatalogWrapper(catalogName) + .createTable(namespace, createTableRequest); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java similarity index 62% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java index 4965f4bc132..0f35fae529a 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/DynamicIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/DynamicIcebergConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -29,9 +29,6 @@ import org.apache.gravitino.client.GravitinoAdminClient; import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This provider proxy Gravitino lakehouse-iceberg catalogs. @@ -40,11 +37,7 @@ * *

The catalogName is iceberg_catalog */ -public class DynamicIcebergCatalogConfigProvider - implements IcebergCatalogConfigProvider, AutoCloseable { - public static final Logger LOG = - LoggerFactory.getLogger(DynamicIcebergCatalogConfigProvider.class); - +public class DynamicIcebergConfigProvider implements IcebergConfigProvider { private String gravitinoMetalake; private GravitinoAdminClient client; @@ -68,8 +61,8 @@ public Optional getIcebergCatalogConfig(String catalogName) { Preconditions.checkArgument( StringUtils.isNotBlank(catalogName), "blank catalogName is illegal"); Preconditions.checkArgument( - !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName), - IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in gravitino-based-provider"); + !IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName), + IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG + " is illegal in gravitino-based-provider"); Catalog catalog; try { @@ -93,9 +86,14 @@ void setClient(GravitinoAdminClient client) { } @Override - public void close() throws Exception { + public void close() { if (client != null) { client.close(); } } + + @Override + public String getMetalakeName() { + return gravitinoMetalake; + } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java new file mode 100644 index 00000000000..55d9793fba4 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProvider.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.iceberg.service.provider; + +import java.io.Closeable; +import java.util.Map; +import java.util.Optional; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.iceberg.common.IcebergConfig; + +/** + * {@code IcebergConfigProvider} is an interface defining how Iceberg REST catalog server gets + * Iceberg catalog configurations. + */ +public interface IcebergConfigProvider extends Closeable { + + /** + * Initialize {@code IcebergConfigProvider} with properties. + * + * @param properties The parameters for creating Provider which from configurations whose prefix + * is 'gravitino.iceberg-rest.' + */ + void initialize(Map properties); + + /** + * Get Iceberg configuration from catalog name. + * + * @param catalogName Iceberg catalog name. + * @return the configuration of Iceberg catalog. + */ + Optional getIcebergCatalogConfig(String catalogName); + + /** + * Get metalake name. + * + * @return the name of metalake. + */ + default String getMetalakeName() { + return IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java new file mode 100644 index 00000000000..f217590314e --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/IcebergConfigProviderFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.provider; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.gravitino.iceberg.common.IcebergConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergConfigProviderFactory { + public static final Logger LOG = LoggerFactory.getLogger(IcebergConfigProviderFactory.class); + + private static final ImmutableMap ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES = + ImmutableMap.of( + IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, + StaticIcebergConfigProvider.class.getCanonicalName(), + IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME, + DynamicIcebergConfigProvider.class.getCanonicalName()); + + public static IcebergConfigProvider create(Map properties) { + String providerName = + (new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER); + String className = + ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName); + LOG.info("Load Iceberg catalog provider: {}.", className); + try { + Class providerClz = Class.forName(className); + return (IcebergConfigProvider) providerClz.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java similarity index 64% rename from iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java rename to iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java index aa7f1032134..f8c4fb3505e 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/StaticIcebergCatalogConfigProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/provider/StaticIcebergConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import com.google.common.annotations.VisibleForTesting; import java.util.Map; @@ -24,7 +24,6 @@ import java.util.stream.Collectors; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider; import org.apache.gravitino.utils.MapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +38,8 @@ * gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive * gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ... */ -public class StaticIcebergCatalogConfigProvider implements IcebergCatalogConfigProvider { - public static final Logger LOG = - LoggerFactory.getLogger(StaticIcebergCatalogConfigProvider.class); +public class StaticIcebergConfigProvider implements IcebergConfigProvider { + public static final Logger LOG = LoggerFactory.getLogger(StaticIcebergConfigProvider.class); @VisibleForTesting Map catalogConfigs; @@ -61,7 +59,7 @@ public void initialize(Map properties) { MapUtils.getPrefixMap( properties, String.format("catalog.%s.", catalogName))))); this.catalogConfigs.put( - IcebergConstants.GRAVITINO_DEFAULT_CATALOG, new IcebergConfig(properties)); + IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG, new IcebergConfig(properties)); } @Override @@ -69,6 +67,9 @@ public Optional getIcebergCatalogConfig(String catalogName) { return Optional.ofNullable(catalogConfigs.get(catalogName)); } + @Override + public void close() {} + private Optional getCatalogName(String catalogConfigKey) { if (!catalogConfigKey.startsWith("catalog.")) { return Optional.empty(); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java index 33023343ef3..cebb748845e 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java @@ -50,8 +50,10 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergObjectMapper; import org.apache.gravitino.iceberg.service.IcebergRestUtils; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; import org.apache.gravitino.metrics.MetricNames; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.ServiceUnavailableException; import org.apache.iceberg.rest.RESTUtil; @@ -76,6 +78,7 @@ public class IcebergTableOperations { private IcebergMetricsManager icebergMetricsManager; private ObjectMapper icebergObjectMapper; + private IcebergTableOperationDispatcher tableOperationDispatcher; @SuppressWarnings("UnusedVariable") @Context @@ -84,10 +87,12 @@ public class IcebergTableOperations { @Inject public IcebergTableOperations( IcebergCatalogWrapperManager icebergCatalogWrapperManager, - IcebergMetricsManager icebergMetricsManager) { + IcebergMetricsManager icebergMetricsManager, + IcebergTableOperationDispatcher tableOperationDispatcher) { this.icebergCatalogWrapperManager = icebergCatalogWrapperManager; - this.icebergObjectMapper = IcebergObjectMapper.getInstance(); this.icebergMetricsManager = icebergMetricsManager; + this.tableOperationDispatcher = tableOperationDispatcher; + this.icebergObjectMapper = IcebergObjectMapper.getInstance(); } @GET @@ -110,16 +115,18 @@ public Response createTable( CreateTableRequest createTableRequest, @HeaderParam(X_ICEBERG_ACCESS_DELEGATION) String accessDelegation) { boolean isCredentialVending = isCredentialVending(accessDelegation); + String catalogName = IcebergRestUtils.getCatalogName(prefix); + Namespace icebergNS = RESTUtil.decodeNamespace(namespace); LOG.info( - "Create Iceberg table, namespace: {}, create table request: {}, accessDelegation: {}, isCredentialVending: {}", - namespace, + "Create Iceberg table, catalog: {}, namespace: {}, create table request: {}, " + + "accessDelegation: {}, isCredentialVending: {}", + catalogName, + icebergNS, createTableRequest, accessDelegation, isCredentialVending); LoadTableResponse loadTableResponse = - icebergCatalogWrapperManager - .getOps(prefix) - .createTable(RESTUtil.decodeNamespace(namespace), createTableRequest); + tableOperationDispatcher.createTable(catalogName, icebergNS, createTableRequest); if (isCredentialVending) { return IcebergRestUtils.ok(injectCredentialConfig(prefix, loadTableResponse)); } else { diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java new file mode 100644 index 00000000000..1ce2d8f770e --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableEvent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.gravitino.iceberg.service.IcebergRestUtils; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** Represent an event after creating Iceberg table successfully. */ +@DeveloperApi +public class IcebergCreateTableEvent extends IcebergTableEvent { + + private CreateTableRequest createTableRequest; + private LoadTableResponse loadTableResponse; + + public IcebergCreateTableEvent( + String user, + NameIdentifier resourceIdentifier, + CreateTableRequest createTableRequest, + LoadTableResponse loadTableResponse) { + super(user, resourceIdentifier); + this.createTableRequest = + IcebergRestUtils.cloneIcebergRESTObject(createTableRequest, CreateTableRequest.class); + this.loadTableResponse = + IcebergRestUtils.cloneIcebergRESTObject(loadTableResponse, LoadTableResponse.class); + } + + public CreateTableRequest createTableRequest() { + return createTableRequest; + } + + public LoadTableResponse loadTableResponse() { + return loadTableResponse; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java new file mode 100644 index 00000000000..24f74da4fdf --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represent a failure event when creating Iceberg table failed. */ +@DeveloperApi +public class IcebergCreateTableFailureEvent extends IcebergTableFailureEvent { + public IcebergCreateTableFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java new file mode 100644 index 00000000000..81937e501bc --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.iceberg.rest.requests.CreateTableRequest; + +/** Represent a pre event before creating Iceberg table. */ +@DeveloperApi +public class IcebergCreateTablePreEvent extends IcebergTablePreEvent { + private CreateTableRequest createTableRequest; + + public IcebergCreateTablePreEvent( + String user, NameIdentifier resourceIdentifier, CreateTableRequest createTableRequest) { + super(user, resourceIdentifier); + this.createTableRequest = createTableRequest; + } + + public CreateTableRequest createTableRequest() { + return createTableRequest; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java new file mode 100644 index 00000000000..50ec078637d --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract post event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergEvent extends Event { + protected IcebergEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java new file mode 100644 index 00000000000..e166bfdc919 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract failure event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergFailureEvent extends FailureEvent { + protected IcebergFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java new file mode 100644 index 00000000000..e57edde6542 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergPreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract pre event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergPreEvent extends PreEvent { + protected IcebergPreEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java new file mode 100644 index 00000000000..e94a1938c1c --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; + +/** Represents an abstract table post event in Gravitino Iceberg REST server. */ +public abstract class IcebergTableEvent extends IcebergEvent { + protected IcebergTableEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java new file mode 100644 index 00000000000..f052b0060e4 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTableFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represent a failure event when do Iceberg table operation failed. */ +@DeveloperApi +public class IcebergTableFailureEvent extends IcebergFailureEvent { + protected IcebergTableFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java new file mode 100644 index 00000000000..486c6f094a4 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract table pre event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergTablePreEvent extends IcebergPreEvent { + protected IcebergTablePreEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java index 328bafa627d..85a7fdc04e9 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergCatalogWrapperManager.java @@ -22,6 +22,8 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -39,7 +41,9 @@ public void testValidGetOps(String rawPrefix) { } Map config = Maps.newHashMap(); config.put(String.format("catalog.%s.catalog-backend-name", prefix), prefix); - IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config); + IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(config); + configProvider.initialize(config); + IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config, configProvider); IcebergCatalogWrapper ops = manager.getOps(rawPrefix); @@ -51,11 +55,12 @@ public void testValidGetOps(String rawPrefix) { } @ParameterizedTest - @ValueSource( - strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "__gravitino_default_catalog/"}) + @ValueSource(strings = {"hello", "\\\n\t\\\'", "\u0024", "\100", "[_~", "default_catalog/"}) public void testInvalidGetOps(String rawPrefix) { Map config = Maps.newHashMap(); - IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config); + IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(config); + configProvider.initialize(config); + IcebergCatalogWrapperManager manager = new IcebergCatalogWrapperManager(config, configProvider); Assertions.assertThrowsExactly(IllegalArgumentException.class, () -> manager.getOps(rawPrefix)); } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java new file mode 100644 index 00000000000..79a317e52bd --- /dev/null +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergRESTUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestIcebergRESTUtils { + + @Test + void testGetGravitinoNameIdentifier() { + String metalakeName = "metalake"; + String catalogName = "catalog"; + TableIdentifier tableIdentifier = TableIdentifier.of("ns1", "ns2", "table"); + NameIdentifier nameIdentifier = + IcebergRestUtils.getGravitinoNameIdentifier(metalakeName, catalogName, tableIdentifier); + Assertions.assertEquals( + NameIdentifier.of(metalakeName, catalogName, "ns1", "ns2", "table"), nameIdentifier); + } + + @Test + void testGetCatalogName() { + String prefix = "catalog/"; + Assertions.assertEquals("catalog", IcebergRestUtils.getCatalogName(prefix)); + Assertions.assertEquals( + IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG, IcebergRestUtils.getCatalogName("")); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> IcebergRestUtils.getCatalogName(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG + "/")); + } + + @Test + void testSerdeIcebergRESTObject() { + Schema tableSchema = + new Schema( + NestedField.of(1, false, "foo1", StringType.get()), + NestedField.of(2, true, "foo2", IntegerType.get())); + CreateTableRequest createTableRequest = + CreateTableRequest.builder().withName("table").withSchema(tableSchema).build(); + CreateTableRequest clonedIcebergRESTObject = + IcebergRestUtils.cloneIcebergRESTObject(createTableRequest, CreateTableRequest.class); + Assertions.assertEquals(createTableRequest.name(), clonedIcebergRESTObject.name()); + Assertions.assertEquals( + createTableRequest.schema().columns().size(), + clonedIcebergRESTObject.schema().columns().size()); + for (int i = 0; i < createTableRequest.schema().columns().size(); i++) { + NestedField field = createTableRequest.schema().columns().get(i); + NestedField clonedField = clonedIcebergRESTObject.schema().columns().get(i); + Assertions.assertEquals(field, clonedField); + } + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java similarity index 78% rename from iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java rename to iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java index f9ffbb42747..4eb5da5afce 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestDynamicIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestDynamicIcebergConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import java.util.HashMap; import org.apache.gravitino.Catalog; @@ -30,7 +30,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -public class TestDynamicIcebergCatalogWrapperProvider { +public class TestDynamicIcebergConfigProvider { @Test public void testValidIcebergTableOps() { String hiveCatalogName = "hive_backend"; @@ -71,7 +71,7 @@ public void testValidIcebergTableOps() { } }); - DynamicIcebergCatalogConfigProvider provider = new DynamicIcebergCatalogConfigProvider(); + DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider(); GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class); Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake); provider.setClient(client); @@ -102,7 +102,7 @@ public void testInvalidIcebergTableOps() { GravitinoAdminClient client = Mockito.mock(GravitinoAdminClient.class); Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake); - DynamicIcebergCatalogConfigProvider provider = new DynamicIcebergCatalogConfigProvider(); + DynamicIcebergConfigProvider provider = new DynamicIcebergConfigProvider(); provider.setClient(client); Assertions.assertThrowsExactly( @@ -111,6 +111,6 @@ public void testInvalidIcebergTableOps() { IllegalArgumentException.class, () -> provider.getIcebergCatalogConfig("")); Assertions.assertThrowsExactly( IllegalArgumentException.class, - () -> provider.getIcebergCatalogConfig(IcebergConstants.GRAVITINO_DEFAULT_CATALOG)); + () -> provider.getIcebergCatalogConfig(IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG)); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java similarity index 79% rename from iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java rename to iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java index 69f5b5ad257..3a4766016e0 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestStaticIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/provider/TestStaticIcebergConfigProvider.java @@ -1,22 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.gravitino.iceberg.provider; +package org.apache.gravitino.iceberg.service.provider; import com.google.common.collect.Maps; import java.util.Map; @@ -32,13 +32,13 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -public class TestStaticIcebergCatalogWrapperProvider { +public class TestStaticIcebergConfigProvider { @Test public void testValidIcebergTableOps() { String hiveCatalogName = "hive_backend"; String jdbcCatalogName = "jdbc_backend"; - String defaultCatalogName = IcebergConstants.GRAVITINO_DEFAULT_CATALOG; + String defaultCatalogName = IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG; Map config = Maps.newHashMap(); // hive backend catalog @@ -60,7 +60,7 @@ public void testValidIcebergTableOps() { config.put("catalog-backend", "memory"); config.put("warehouse", "/tmp/"); - StaticIcebergCatalogConfigProvider provider = new StaticIcebergCatalogConfigProvider(); + StaticIcebergConfigProvider provider = new StaticIcebergConfigProvider(); provider.initialize(config); IcebergConfig hiveIcebergConfig = provider.catalogConfigs.get(hiveCatalogName); @@ -106,7 +106,7 @@ public void testValidIcebergTableOps() { @ParameterizedTest @ValueSource(strings = {"", "not_match"}) public void testInvalidIcebergTableOps(String catalogName) { - StaticIcebergCatalogConfigProvider provider = new StaticIcebergCatalogConfigProvider(); + StaticIcebergConfigProvider provider = new StaticIcebergConfigProvider(); provider.initialize(Maps.newHashMap()); Optional config = provider.getIcebergCatalogConfig(catalogName); diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java index 7d359926a85..361b086d987 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergCatalogWrapperManagerForTest.java @@ -23,11 +23,13 @@ import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; // Provide a custom catalogWrapper to do test like `registerTable` public class IcebergCatalogWrapperManagerForTest extends IcebergCatalogWrapperManager { - public IcebergCatalogWrapperManagerForTest(Map properties) { - super(properties); + public IcebergCatalogWrapperManagerForTest( + Map properties, IcebergConfigProvider configProvider) { + super(properties, configProvider); } @Override diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index 1a085a251d9..1314a3bac44 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -20,18 +20,25 @@ package org.apache.gravitino.iceberg.service.rest; import com.google.common.collect.Maps; +import java.util.Arrays; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.credential.CredentialConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; -import org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider; import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor; import org.apache.gravitino.iceberg.service.extension.DummyCredentialProvider; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider; +import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory; +import org.apache.gravitino.iceberg.service.provider.StaticIcebergConfigProvider; +import org.apache.gravitino.listener.EventBus; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.jackson.JacksonFeature; import org.glassfish.jersey.logging.LoggingFeature; @@ -80,14 +87,24 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe String catalogConfigPrefix = "catalog." + PREFIX; catalogConf.put( IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER, - StaticIcebergCatalogConfigProvider.class.getName()); + StaticIcebergConfigProvider.class.getName()); catalogConf.put(String.format("%s.catalog-backend-name", catalogConfigPrefix), PREFIX); catalogConf.put( CredentialConstants.CREDENTIAL_PROVIDER_TYPE, DummyCredentialProvider.DUMMY_CREDENTIAL_TYPE); + IcebergConfigProvider configProvider = IcebergConfigProviderFactory.create(catalogConf); + configProvider.initialize(catalogConf); // used to override register table interface IcebergCatalogWrapperManager icebergCatalogWrapperManager = - new IcebergCatalogWrapperManagerForTest(catalogConf); + new IcebergCatalogWrapperManagerForTest(catalogConf, configProvider); + + EventBus eventBus = new EventBus(Arrays.asList()); + + IcebergTableOperationExecutor icebergTableOperationExecutor = + new IcebergTableOperationExecutor(icebergCatalogWrapperManager); + IcebergTableEventDispatcher icebergTableEventDispatcher = + new IcebergTableEventDispatcher( + icebergTableOperationExecutor, eventBus, configProvider.getMetalakeName()); IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig()); resourceConfig.register( @@ -96,6 +113,7 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe protected void configure() { bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(2); bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(2); + bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(2); } }); } diff --git a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java index e383c65b7a4..36c112f00a2 100644 --- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java +++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java @@ -80,7 +80,7 @@ public GravitinoServer(ServerConfig config, GravitinoEnv gravitinoEnv) { } public void initialize() { - gravitinoEnv.initialize(serverConfig, true); + gravitinoEnv.initializeFullComponents(serverConfig); JettyServerConfig jettyServerConfig = JettyServerConfig.fromConfig(serverConfig, WEBSERVER_CONF_PREFIX);