diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index a27df871040..4208d620f5b 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -307,10 +307,12 @@ public FutureGrantManager futureGrantManager() {
return futureGrantManager;
}
- public void start() {
- auxServiceManager.serviceStart();
+ public void start(boolean isGravitinoServer) {
metricsSystem.start();
eventListenerManager.start();
+ if (isGravitinoServer) {
+ auxServiceManager.serviceStart();
+ }
}
/** Shutdown the Gravitino environment. */
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java
index 094a19cda58..e1e5b6e322b 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java
@@ -28,7 +28,7 @@
* creation, deletion, or modification.
*/
@DeveloperApi
-public abstract class CatalogEvent extends Event {
+public abstract class CatalogEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code CatalogEvent} with the specified user and catalog identifier.
*
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java
index c964e1cff1b..f36a666bcb6 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java
@@ -24,12 +24,12 @@
/**
* An abstract class representing events that are triggered when a catalog operation fails due to an
- * exception. This class extends {@link FailureEvent} to provide a more specific context related to
- * catalog operations, encapsulating details about the user who initiated the operation, the
- * identifier of the catalog involved, and the exception that led to the failure.
+ * exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
+ * related to catalog operations, encapsulating details about the user who initiated the operation,
+ * the identifier of the catalog involved, and the exception that led to the failure.
*/
@DeveloperApi
-public abstract class CatalogFailureEvent extends FailureEvent {
+public abstract class CatalogFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code CatalogFailureEvent} instance, capturing information about the failed
* catalog operation.
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java
index 1f1e77e75e8..811488ae9dd 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java
@@ -33,7 +33,7 @@
* understanding of each event.
*/
@DeveloperApi
-public abstract class FilesetEvent extends Event {
+public abstract class FilesetEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code FilesetEvent} with the specified user and fileset identifier.
*
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java
index e5e9831970c..587a5d46a43 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java
@@ -31,7 +31,7 @@
* diagnose and respond to issues.
*/
@DeveloperApi
-public abstract class FilesetFailureEvent extends FailureEvent {
+public abstract class FilesetFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code FilesetFailureEvent} instance, capturing information about the failed
* fileset operation.
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoFailureEvent.java
new file mode 100644
index 00000000000..3340ce6f525
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoFailureEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents a failure event in Gravitino server. */
+@DeveloperApi
+public abstract class GravitinoFailureEvent extends FailureEvent {
+ protected GravitinoFailureEvent(String user, NameIdentifier identifier, Exception exception) {
+ super(user, identifier, exception);
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoPostEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoPostEvent.java
new file mode 100644
index 00000000000..2a4e4ebfb06
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoPostEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents a post event for Gravitino server. */
+@DeveloperApi
+public abstract class GravitinoPostEvent extends Event {
+ protected GravitinoPostEvent(String user, NameIdentifier identifier) {
+ super(user, identifier);
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java
index a8e1f1ddc85..4590ca3abe7 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java
@@ -29,7 +29,7 @@
* performing the operation and the identifier of the Metalake being operated on.
*/
@DeveloperApi
-public abstract class MetalakeEvent extends Event {
+public abstract class MetalakeEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code MetalakeEvent} with the specified user and Metalake identifier.
*
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java
index 24d9f4043e3..dc41e51d2c2 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java
@@ -24,12 +24,12 @@
/**
* An abstract class representing events that are triggered when a Metalake operation fails due to
- * an exception. This class extends {@link FailureEvent} to provide a more specific context related
- * to Metalake operations, encapsulating details about the user who initiated the operation, the
- * identifier of the Metalake involved, and the exception that led to the failure.
+ * an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
+ * related to Metalake operations, encapsulating details about the user who initiated the operation,
+ * the identifier of the Metalake involved, and the exception that led to the failure.
*/
@DeveloperApi
-public abstract class MetalakeFailureEvent extends FailureEvent {
+public abstract class MetalakeFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code MetalakeFailureEvent} instance, capturing information about the failed
* Metalake operation.
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java
index ab6d1d404d8..7801eeff6cb 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java
@@ -29,7 +29,7 @@
* performing the operation and the identifier of the Partition being operated on.
*/
@DeveloperApi
-public abstract class PartitionEvent extends Event {
+public abstract class PartitionEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code PartitionEvent} with the specified user and Partition identifier.
*
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java
index aa7c5197623..87ae262636f 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java
@@ -24,12 +24,12 @@
/**
* An abstract class representing events that are triggered when a Partition operation fails due to
- * an exception. This class extends {@link FailureEvent} to provide a more specific context related
- * to Partition operations, encapsulating details about the user who initiated the operation, the
- * identifier of the Partition involved, and the exception that led to the failure.
+ * an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
+ * related to Partition operations, encapsulating details about the user who initiated the
+ * operation, the identifier of the Partition involved, and the exception that led to the failure.
*/
@DeveloperApi
-public abstract class PartitionFailureEvent extends FailureEvent {
+public abstract class PartitionFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code PartitionFailureEvent} instance, capturing information about the failed
* Partition operation.
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java
index 369dcf250e1..a1fb973de87 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java
@@ -24,7 +24,7 @@
/** Represents an abstract base class for events related to schema operations. */
@DeveloperApi
-public abstract class SchemaEvent extends Event {
+public abstract class SchemaEvent extends GravitinoPostEvent {
protected SchemaEvent(String user, NameIdentifier identifier) {
super(user, identifier);
}
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java
index 006c29c8920..479c253489b 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java
@@ -27,7 +27,7 @@
* exception.
*/
@DeveloperApi
-public abstract class SchemaFailureEvent extends FailureEvent {
+public abstract class SchemaFailureEvent extends GravitinoFailureEvent {
protected SchemaFailureEvent(String user, NameIdentifier identifier, Exception exception) {
super(user, identifier, exception);
}
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java
index b665afba4b0..a476b8af671 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java
@@ -32,7 +32,7 @@
* specific type of table operation being represented.
*/
@DeveloperApi
-public abstract class TableEvent extends Event {
+public abstract class TableEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code TableEvent} with the specified user and table identifier.
*
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java
index a1293d523f4..fb9bb0de5a5 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java
@@ -24,16 +24,16 @@
/**
* An abstract class representing events that are triggered when a table operation fails due to an
- * exception. This class extends {@link FailureEvent} to provide a more specific context related to
- * table operations, encapsulating details about the user who initiated the operation, the
- * identifier of the table involved, and the exception that led to the failure.
+ * exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
+ * related to table operations, encapsulating details about the user who initiated the operation,
+ * the identifier of the table involved, and the exception that led to the failure.
*
*
Implementations of this class can be used to convey detailed information about failures in
* operations such as creating, updating, deleting, or querying tables, making it easier to diagnose
* and respond to issues.
*/
@DeveloperApi
-public abstract class TableFailureEvent extends FailureEvent {
+public abstract class TableFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code TableFailureEvent} instance, capturing information about the failed
* table operation.
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java
index 4218484cd10..3daf5159682 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java
@@ -32,7 +32,7 @@
* specific type of topic operation being represented.
*/
@DeveloperApi
-public abstract class TopicEvent extends Event {
+public abstract class TopicEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code TopicEvent} with the specified user and topic identifier.
*
diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java
index c8dd727b64b..873b85f615d 100644
--- a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java
+++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java
@@ -24,16 +24,16 @@
/**
* An abstract class representing events that are triggered when a topic operation fails due to an
- * exception. This class extends {@link FailureEvent} to provide a more specific context related to
- * topic operations, encapsulating details about the user who initiated the operation, the
- * identifier of the topic involved, and the exception that led to the failure.
+ * exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
+ * related to topic operations, encapsulating details about the user who initiated the operation,
+ * the identifier of the topic involved, and the exception that led to the failure.
*
*
Implementations of this class can be used to convey detailed information about failures in
* operations such as creating, updating, deleting, or querying topics, making it easier to diagnose
* and respond to issues.
*/
@DeveloperApi
-public abstract class TopicFailureEvent extends FailureEvent {
+public abstract class TopicFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code TopicFailureEvent} instance, capturing information about the failed
* topic operation.
diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java
index 758aa46aa08..f400a412098 100644
--- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java
+++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java
@@ -36,5 +36,5 @@ public interface IcebergCatalogWrapperProvider {
* @param catalogName a param send by clients.
* @return the instance of IcebergCatalogWrapper.
*/
- IcebergCatalogWrapper getIcebergTableOps(String catalogName);
+ IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName);
}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
index 0592cfd9421..6f3d1a85482 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java
@@ -28,7 +28,11 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
+import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
+import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
+import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationProcessor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
+import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
import org.apache.gravitino.server.web.HttpServerMetricsSource;
@@ -70,14 +74,20 @@ private void initServer(IcebergConfig icebergConfig) {
new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server);
metricsSystem.register(httpServerMetricsSource);
+ EventBus eventBus = GravitinoEnv.getInstance().eventBus();
icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig());
icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
+ IcebergTableOperationProcessor icebergTableOperationProcessor =
+ new IcebergTableOperationProcessor(icebergCatalogWrapperManager);
+ IcebergTableEventDispatcher icebergTableEventDispatcher =
+ new IcebergTableEventDispatcher(icebergTableOperationProcessor, eventBus);
config.register(
new AbstractBinder() {
@Override
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
+ bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
}
});
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/extension/IcebergEventLogger.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/extension/IcebergEventLogger.java
new file mode 100644
index 00000000000..944dd545e94
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/extension/IcebergEventLogger.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.extension;
+
+import java.util.Map;
+import org.apache.gravitino.listener.api.EventListenerPlugin;
+import org.apache.gravitino.listener.api.event.Event;
+import org.apache.gravitino.listener.api.event.PreEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IcebergEventLogger implements EventListenerPlugin {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergEventLogger.class);
+
+ @Override
+ public void init(Map properties) throws RuntimeException {}
+
+ @Override
+ public void start() throws RuntimeException {}
+
+ @Override
+ public void stop() throws RuntimeException {}
+
+ @Override
+ public void onPostEvent(Event event) {
+ LOG.info(
+ "Process post event {}, user: {}, identifier: {}",
+ event.getClass().getSimpleName(),
+ event.user(),
+ event.identifier());
+ }
+
+ @Override
+ public void onPreEvent(PreEvent event) {
+ LOG.info(
+ "Process pre event {}, user: {}, identifier: {}",
+ event.getClass().getSimpleName(),
+ event.user(),
+ event.identifier());
+ }
+
+ @Override
+ public Mode mode() {
+ return Mode.SYNC;
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java
index 522bca39fe3..5ae65d37f58 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java
@@ -68,7 +68,7 @@ public void initialize(Map properties) {
}
@Override
- public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
+ public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) {
IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName);
if (icebergConfig == null) {
String errorMsg = String.format("%s can not match any catalog", catalogName);
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java
index a38fd9cf302..aee1200b8cc 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java
@@ -66,7 +66,7 @@ public void initialize(Map properties) {
}
@Override
- public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
+ public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) {
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
Preconditions.checkArgument(
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java
index 622f0d21acd..efec7b7559f 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java
@@ -52,6 +52,7 @@ private void initialize() {
}
private void start() {
+ gravitinoEnv.start(false);
icebergRESTService.serviceStart();
}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
index 17342acf71f..9e51f1ae77e 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java
@@ -21,14 +21,11 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider;
@@ -83,24 +80,17 @@ public IcebergCatalogWrapperManager(Map properties) {
* @return the instance of IcebergCatalogWrapper.
*/
public IcebergCatalogWrapper getOps(String rawPrefix) {
- String catalogName = getCatalogName(rawPrefix);
- IcebergCatalogWrapper tableOps =
- icebergTableOpsCache.get(catalogName, k -> provider.getIcebergTableOps(catalogName));
- // Reload conf to reset UserGroupInformation or icebergTableOps will always use
- // Simple auth.
- tableOps.reloadHadoopConf();
- return tableOps;
+ String catalogName = IcebergRestUtils.getCatalogName(rawPrefix);
+ return getCatalogWrapper(catalogName);
}
- private String getCatalogName(String rawPrefix) {
- String prefix = shelling(rawPrefix);
- Preconditions.checkArgument(
- !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
- String.format("%s is conflict with reserved key, please replace it", prefix));
- if (StringUtils.isBlank(prefix)) {
- return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
- }
- return prefix;
+ public IcebergCatalogWrapper getCatalogWrapper(String catalogName) {
+ IcebergCatalogWrapper catalogWrapper =
+ icebergTableOpsCache.get(catalogName, k -> provider.getIcebergCatalogWrapper(catalogName));
+ // Reload conf to reset UserGroupInformation or icebergTableOps will always use
+ // Simple auth.
+ catalogWrapper.reloadHadoopConf();
+ return catalogWrapper;
}
private IcebergCatalogWrapperProvider createProvider(Map properties) {
@@ -116,17 +106,6 @@ private IcebergCatalogWrapperProvider createProvider(Map propert
}
}
- private String shelling(String rawPrefix) {
- if (StringUtils.isBlank(rawPrefix)) {
- return rawPrefix;
- } else {
- // rawPrefix is a string matching ([^/]*/) which end with /
- Preconditions.checkArgument(
- rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix));
- return rawPrefix.substring(0, rawPrefix.length() - 1);
- }
- }
-
private void closeIcebergTableOps(IcebergCatalogWrapper ops) {
try {
ops.close();
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
index f880f7f7a9f..ed7d0a2f98d 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
@@ -24,6 +24,7 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
+import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
@@ -51,8 +52,10 @@ public class IcebergExceptionMapper implements ExceptionMapper {
ImmutableMap., Integer>builder()
.put(IllegalArgumentException.class, 400)
.put(ValidationException.class, 400)
+ .put(IllegalNameIdentifierException.class, 400)
.put(NamespaceNotEmptyException.class, 400)
.put(NotAuthorizedException.class, 401)
+ .put(org.apache.gravitino.exceptions.ForbiddenException.class, 403)
.put(ForbiddenException.class, 403)
.put(NoSuchNamespaceException.class, 404)
.put(NoSuchTableException.class, 404)
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java
index fb0e8005c16..07f4ea8db9b 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java
@@ -18,12 +18,15 @@
*/
package org.apache.gravitino.iceberg.service;
+import com.google.common.base.Preconditions;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.iceberg.rest.responses.ErrorResponse;
public class IcebergRestUtils {
@@ -71,4 +74,28 @@ public static Instant calculateNewTimestamp(Instant currentTimestamp, int hours)
}
return nextHourDateTime.atZone(ZoneId.systemDefault()).toInstant();
}
+
+ public static String getCatalogName(String rawPrefix) {
+ String prefix = shelling(rawPrefix);
+ Preconditions.checkArgument(
+ !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
+ String.format("%s is conflict with reserved key, please replace it", prefix));
+ if (StringUtils.isBlank(prefix)) {
+ return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
+ }
+ return prefix;
+ }
+
+ // remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to
+ // 'iceberg_catalog'
+ private static String shelling(String rawPrefix) {
+ if (StringUtils.isBlank(rawPrefix)) {
+ return rawPrefix;
+ } else {
+ // rawPrefix is a string matching ([^/]*/) which end with /
+ Preconditions.checkArgument(
+ rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix));
+ return rawPrefix.substring(0, rawPrefix.length() - 1);
+ }
+ }
}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergSerdeUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergSerdeUtils.java
new file mode 100644
index 00000000000..ae35407f03e
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergSerdeUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+
+public class IcebergSerdeUtils {
+ public static T cloneIcebergRESTObject(Object message, Class className) {
+ ObjectMapper icebergObjectMapper = IcebergObjectMapper.getInstance();
+ try {
+ byte[] values = icebergObjectMapper.writeValueAsBytes(message);
+ return icebergObjectMapper.readValue(values, className);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
new file mode 100644
index 00000000000..9e65d876da1
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.listener.EventBus;
+import org.apache.gravitino.listener.api.event.IcebergCreateTableFailureEvent;
+import org.apache.gravitino.listener.api.event.IcebergCreateTablePostEvent;
+import org.apache.gravitino.listener.api.event.IcebergCreateTablePreEvent;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+/**
+ * {@code IcebergTableEventDispatcher} is a decorator for {@link IcebergTableOperationProcessor}
+ * that not only delegates table operations to the underlying dispatcher but also dispatches
+ * corresponding events to an {@link org.apache.gravitino.listener.EventBus}.
+ */
+public class IcebergTableEventDispatcher implements IcebergTableOperationDispatcher {
+
+ private IcebergTableOperationDispatcher icebergTableOperationDispatcher;
+ private EventBus eventBus;
+
+ public IcebergTableEventDispatcher(
+ IcebergTableOperationDispatcher icebergTableOperationDispatcher, EventBus eventBus) {
+ this.icebergTableOperationDispatcher = icebergTableOperationDispatcher;
+ this.eventBus = eventBus;
+ }
+
+ @Override
+ public LoadTableResponse createTable(
+ String catalogName, String namespace, CreateTableRequest createTableRequest) {
+ NameIdentifier tableIdentifier =
+ NameIdentifier.of(catalogName, namespace, createTableRequest.name());
+ eventBus.dispatchEvent(
+ new IcebergCreateTablePreEvent(
+ PrincipalUtils.getCurrentUserName(), tableIdentifier, createTableRequest));
+ LoadTableResponse loadTableResponse;
+ try {
+ loadTableResponse =
+ icebergTableOperationDispatcher.createTable(catalogName, namespace, createTableRequest);
+ } catch (Exception e) {
+ eventBus.dispatchEvent(
+ new IcebergCreateTableFailureEvent(
+ PrincipalUtils.getCurrentUserName(), tableIdentifier, e));
+ throw e;
+ }
+ eventBus.dispatchEvent(
+ new IcebergCreateTablePostEvent(
+ PrincipalUtils.getCurrentUserName(),
+ NameIdentifier.of(namespace, createTableRequest.name()),
+ createTableRequest,
+ loadTableResponse));
+ return loadTableResponse;
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
new file mode 100644
index 00000000000..c4ab97e5c79
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+/**
+ * The {@code IcebergTableOperationDispatcher} interface defines the public API for managing Iceberg
+ * tables.
+ */
+public interface IcebergTableOperationDispatcher {
+ /**
+ * Creates a new Iceberg table in the specified namespace with the given prefix.
+ *
+ * @param catalogName The catalog name when creating the table.
+ * @param namespace The namespace within which the table should be created.
+ * @param createTableRequest The request object containing the details for creating the table.
+ * @return A {@link LoadTableResponse} object containing the result of the operation.
+ */
+ LoadTableResponse createTable(
+ String catalogName, String namespace, CreateTableRequest createTableRequest);
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationProcessor.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationProcessor.java
new file mode 100644
index 00000000000..d96d6bcac61
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.dispatcher;
+
+import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+/**
+ * The {@code IcebergTableOperationProcessor} locates the corresponding {@code
+ * IcebergCatalogWrapper} based on the prefix in order to perform the actual table operations.
+ */
+public class IcebergTableOperationProcessor implements IcebergTableOperationDispatcher {
+
+ private IcebergCatalogWrapperManager icebergCatalogWrapperManager;
+
+ public IcebergTableOperationProcessor(IcebergCatalogWrapperManager icebergCatalogWrapperManager) {
+ this.icebergCatalogWrapperManager = icebergCatalogWrapperManager;
+ }
+
+ @Override
+ public LoadTableResponse createTable(
+ String catalogName, String namespace, CreateTableRequest createTableRequest) {
+ return icebergCatalogWrapperManager
+ .getCatalogWrapper(catalogName)
+ .createTable(RESTUtil.decodeNamespace(namespace), createTableRequest);
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
index 0c383e52063..150dd8be59d 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
@@ -40,6 +40,7 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergObjectMapper;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
+import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -47,6 +48,7 @@
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.ReportMetricsRequest;
import org.apache.iceberg.rest.requests.UpdateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +63,7 @@ public class IcebergTableOperations {
private IcebergMetricsManager icebergMetricsManager;
private ObjectMapper icebergObjectMapper;
+ private IcebergTableOperationDispatcher tableOperationDispatcher;
@SuppressWarnings("UnusedVariable")
@Context
@@ -69,10 +72,12 @@ public class IcebergTableOperations {
@Inject
public IcebergTableOperations(
IcebergCatalogWrapperManager icebergCatalogWrapperManager,
- IcebergMetricsManager icebergMetricsManager) {
+ IcebergMetricsManager icebergMetricsManager,
+ IcebergTableOperationDispatcher tableOperationDispatcher) {
this.icebergCatalogWrapperManager = icebergCatalogWrapperManager;
- this.icebergObjectMapper = IcebergObjectMapper.getInstance();
this.icebergMetricsManager = icebergMetricsManager;
+ this.tableOperationDispatcher = tableOperationDispatcher;
+ this.icebergObjectMapper = IcebergObjectMapper.getInstance();
}
@GET
@@ -94,13 +99,14 @@ public Response createTable(
@PathParam("namespace") String namespace,
CreateTableRequest createTableRequest) {
LOG.info(
- "Create Iceberg table, namespace: {}, create table request: {}",
+ "Create Iceberg table, prefix: {}, namespace: {}, create table request: {}",
+ prefix,
namespace,
createTableRequest);
- return IcebergRestUtils.ok(
- icebergCatalogWrapperManager
- .getOps(prefix)
- .createTable(RESTUtil.decodeNamespace(namespace), createTableRequest));
+ String catalogName = IcebergRestUtils.getCatalogName(prefix);
+ LoadTableResponse loadTableResponse =
+ tableOperationDispatcher.createTable(catalogName, namespace, createTableRequest);
+ return IcebergRestUtils.ok(loadTableResponse);
}
@POST
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java
new file mode 100644
index 00000000000..ab21c33b2e2
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represent a failure event when creating Iceberg table failed. */
+@DeveloperApi
+public class IcebergCreateTableFailureEvent extends IcebergRESTFailureEvent {
+ public IcebergCreateTableFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) {
+ super(user, nameIdentifier, e);
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePostEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePostEvent.java
new file mode 100644
index 00000000000..30893e7c2d7
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePostEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.iceberg.service.IcebergSerdeUtils;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+
+/** Represent an event after creating Iceberg table successfully. */
+@DeveloperApi
+public class IcebergCreateTablePostEvent extends IcebergTablePostEvent {
+
+ private CreateTableRequest createTableRequest;
+ private LoadTableResponse loadTableResponse;
+
+ public IcebergCreateTablePostEvent(
+ String user,
+ NameIdentifier resourceIdentifier,
+ CreateTableRequest createTableRequest,
+ LoadTableResponse loadTableResponse) {
+ super(user, resourceIdentifier);
+ this.createTableRequest =
+ IcebergSerdeUtils.cloneIcebergRESTObject(createTableRequest, CreateTableRequest.class);
+ this.loadTableResponse =
+ IcebergSerdeUtils.cloneIcebergRESTObject(loadTableResponse, LoadTableResponse.class);
+ }
+
+ public CreateTableRequest createTableRequest() {
+ return createTableRequest;
+ }
+
+ public LoadTableResponse loadTableResponse() {
+ return loadTableResponse;
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java
new file mode 100644
index 00000000000..81937e501bc
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+
+/** Represent a pre event before creating Iceberg table. */
+@DeveloperApi
+public class IcebergCreateTablePreEvent extends IcebergTablePreEvent {
+ private CreateTableRequest createTableRequest;
+
+ public IcebergCreateTablePreEvent(
+ String user, NameIdentifier resourceIdentifier, CreateTableRequest createTableRequest) {
+ super(user, resourceIdentifier);
+ this.createTableRequest = createTableRequest;
+ }
+
+ public CreateTableRequest createTableRequest() {
+ return createTableRequest;
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java
new file mode 100644
index 00000000000..e74d34b9e4c
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an abstract failure event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergRESTFailureEvent extends FailureEvent {
+ protected IcebergRESTFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) {
+ super(user, nameIdentifier, e);
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPostEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPostEvent.java
new file mode 100644
index 00000000000..204fa93d73c
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPostEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an abstract post event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergRESTPostEvent extends Event {
+ protected IcebergRESTPostEvent(String user, NameIdentifier resourceIdentifier) {
+ super(user, resourceIdentifier);
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPreEvent.java
new file mode 100644
index 00000000000..cf60297ee9b
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPreEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an abstract pre event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergRESTPreEvent extends PreEvent {
+ protected IcebergRESTPreEvent(String user, NameIdentifier resourceIdentifier) {
+ super(user, resourceIdentifier);
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePostEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePostEvent.java
new file mode 100644
index 00000000000..cbc35f92e76
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePostEvent.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+
+/** Represents an abstract table post event in Gravitino Iceberg REST server. */
+public abstract class IcebergTablePostEvent extends IcebergRESTPostEvent {
+ protected IcebergTablePostEvent(String user, NameIdentifier resourceIdentifier) {
+ super(user, resourceIdentifier);
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java
new file mode 100644
index 00000000000..cbda329d093
--- /dev/null
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.listener.api.event;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents an abstract table pre event in Gravitino Iceberg REST server. */
+@DeveloperApi
+public abstract class IcebergTablePreEvent extends IcebergRESTPreEvent {
+ protected IcebergTablePreEvent(String user, NameIdentifier resourceIdentifier) {
+ super(user, resourceIdentifier);
+ }
+}
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java
index 99e83f2e41d..2c48ffd7b11 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java
@@ -65,9 +65,9 @@ public void testValidIcebergTableOps() {
IcebergConfig hiveIcebergConfig = provider.catalogConfigs.get(hiveCatalogName);
IcebergConfig jdbcIcebergConfig = provider.catalogConfigs.get(jdbcCatalogName);
IcebergConfig defaultIcebergConfig = provider.catalogConfigs.get(defaultCatalogName);
- IcebergCatalogWrapper hiveOps = provider.getIcebergTableOps(hiveCatalogName);
- IcebergCatalogWrapper jdbcOps = provider.getIcebergTableOps(jdbcCatalogName);
- IcebergCatalogWrapper defaultOps = provider.getIcebergTableOps(defaultCatalogName);
+ IcebergCatalogWrapper hiveOps = provider.getIcebergCatalogWrapper(hiveCatalogName);
+ IcebergCatalogWrapper jdbcOps = provider.getIcebergCatalogWrapper(jdbcCatalogName);
+ IcebergCatalogWrapper defaultOps = provider.getIcebergCatalogWrapper(defaultCatalogName);
Assertions.assertEquals(
hiveCatalogName, hiveIcebergConfig.get(IcebergConfig.CATALOG_BACKEND_NAME));
@@ -107,6 +107,6 @@ public void testInvalidIcebergTableOps(String catalogName) {
provider.initialize(Maps.newHashMap());
Assertions.assertThrowsExactly(
- RuntimeException.class, () -> provider.getIcebergTableOps(catalogName));
+ RuntimeException.class, () -> provider.getIcebergCatalogWrapper(catalogName));
}
}
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java
index 8acac4ffd6b..7b9d5bf31c4 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java
@@ -77,8 +77,8 @@ public void testValidIcebergTableOps() {
Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake);
provider.setClient(client);
- IcebergCatalogWrapper hiveOps = provider.getIcebergTableOps(hiveCatalogName);
- IcebergCatalogWrapper jdbcOps = provider.getIcebergTableOps(jdbcCatalogName);
+ IcebergCatalogWrapper hiveOps = provider.getIcebergCatalogWrapper(hiveCatalogName);
+ IcebergCatalogWrapper jdbcOps = provider.getIcebergCatalogWrapper(jdbcCatalogName);
Assertions.assertEquals(hiveCatalogName, hiveOps.getCatalog().name());
Assertions.assertEquals(jdbcCatalogName, jdbcOps.getCatalog().name());
@@ -106,11 +106,12 @@ public void testInvalidIcebergTableOps() {
provider.setClient(client);
Assertions.assertThrowsExactly(
- IllegalArgumentException.class, () -> provider.getIcebergTableOps(invalidCatalogName));
+ IllegalArgumentException.class,
+ () -> provider.getIcebergCatalogWrapper(invalidCatalogName));
Assertions.assertThrowsExactly(
- IllegalArgumentException.class, () -> provider.getIcebergTableOps(""));
+ IllegalArgumentException.class, () -> provider.getIcebergCatalogWrapper(""));
Assertions.assertThrowsExactly(
IllegalArgumentException.class,
- () -> provider.getIcebergTableOps(IcebergConstants.GRAVITINO_DEFAULT_CATALOG));
+ () -> provider.getIcebergCatalogWrapper(IcebergConstants.GRAVITINO_DEFAULT_CATALOG));
}
}
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java
index 222391bcc04..94dcb19d173 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java
@@ -24,7 +24,7 @@
public class ConfigBasedIcebergCatalogWrapperProviderForTest
extends ConfigBasedIcebergCatalogWrapperProvider {
@Override
- public IcebergCatalogWrapper getIcebergTableOps(String prefix) {
+ public IcebergCatalogWrapper getIcebergCatalogWrapper(String prefix) {
return new IcebergCatalogWrapperForTest();
}
}
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
index 4fc645132e1..ccc92d157b1 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java
@@ -20,6 +20,7 @@
package org.apache.gravitino.iceberg.service.rest;
import com.google.common.collect.Maps;
+import java.util.Arrays;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -28,7 +29,11 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
+import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
+import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
+import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationProcessor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
+import org.apache.gravitino.listener.EventBus;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.logging.LoggingFeature;
@@ -81,6 +86,12 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe
IcebergCatalogWrapperManager icebergCatalogWrapperManager =
new IcebergCatalogWrapperManager(catalogConf);
+ EventBus eventBus = new EventBus(Arrays.asList());
+ IcebergTableOperationProcessor icebergTableOperationProcessor =
+ new IcebergTableOperationProcessor(icebergCatalogWrapperManager);
+ IcebergTableEventDispatcher icebergTableEventDispatcher =
+ new IcebergTableEventDispatcher(icebergTableOperationProcessor, eventBus);
+
IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig());
resourceConfig.register(
new AbstractBinder() {
@@ -88,6 +99,7 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(2);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(2);
+ bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(2);
}
});
}
diff --git a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
index e383c65b7a4..f1b32206a6c 100644
--- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
+++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java
@@ -144,7 +144,7 @@ protected void configure() {
}
public void start() throws Exception {
- gravitinoEnv.start();
+ gravitinoEnv.start(true);
server.start();
}