From 66b234a25e87babbc396154d3b693533f92b2aa3 Mon Sep 17 00:00:00 2001 From: fanng Date: Sat, 12 Oct 2024 16:52:21 +0800 Subject: [PATCH] support iceberg pre event listener --- .../org/apache/gravitino/GravitinoEnv.java | 6 +- .../listener/api/event/CatalogEvent.java | 2 +- .../api/event/CatalogFailureEvent.java | 8 +- .../listener/api/event/FilesetEvent.java | 2 +- .../api/event/FilesetFailureEvent.java | 2 +- .../api/event/GravitinoFailureEvent.java | 31 ++++++++ .../api/event/GravitinoPostEvent.java | 31 ++++++++ .../listener/api/event/MetalakeEvent.java | 2 +- .../api/event/MetalakeFailureEvent.java | 8 +- .../listener/api/event/PartitionEvent.java | 2 +- .../api/event/PartitionFailureEvent.java | 8 +- .../listener/api/event/SchemaEvent.java | 2 +- .../api/event/SchemaFailureEvent.java | 2 +- .../listener/api/event/TableEvent.java | 2 +- .../listener/api/event/TableFailureEvent.java | 8 +- .../listener/api/event/TopicEvent.java | 2 +- .../listener/api/event/TopicFailureEvent.java | 8 +- .../ops/IcebergCatalogWrapperProvider.java | 2 +- .../apache/gravitino/iceberg/RESTService.java | 10 +++ .../iceberg/extension/IcebergEventLogger.java | 64 ++++++++++++++++ ...figBasedIcebergCatalogWrapperProvider.java | 2 +- ...inoBasedIcebergCatalogWrapperProvider.java | 2 +- .../server/GravitinoIcebergRESTServer.java | 1 + .../service/IcebergCatalogWrapperManager.java | 39 +++------- .../service/IcebergExceptionMapper.java | 3 + .../iceberg/service/IcebergRestUtils.java | 27 +++++++ .../iceberg/service/IcebergSerdeUtils.java | 35 +++++++++ .../IcebergTableEventDispatcher.java | 73 +++++++++++++++++++ .../IcebergTableOperationDispatcher.java | 40 ++++++++++ .../IcebergTableOperationProcessor.java | 46 ++++++++++++ .../service/rest/IcebergTableOperations.java | 20 +++-- .../event/IcebergCreateTableFailureEvent.java | 31 ++++++++ .../event/IcebergCreateTablePostEvent.java | 54 ++++++++++++++ .../api/event/IcebergCreateTablePreEvent.java | 40 ++++++++++ .../api/event/IcebergRESTFailureEvent.java | 31 ++++++++ .../api/event/IcebergRESTPostEvent.java | 31 ++++++++ .../api/event/IcebergRESTPreEvent.java | 31 ++++++++ .../api/event/IcebergTablePostEvent.java | 29 ++++++++ .../api/event/IcebergTablePreEvent.java | 31 ++++++++ ...figBasedIcebergCatalogWrapperProvider.java | 8 +- ...inoBasedIcebergCatalogWrapperProvider.java | 11 +-- ...dIcebergCatalogWrapperProviderForTest.java | 2 +- .../service/rest/IcebergRestTestUtil.java | 12 +++ .../gravitino/server/GravitinoServer.java | 2 +- 44 files changed, 721 insertions(+), 82 deletions(-) create mode 100644 core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoFailureEvent.java create mode 100644 core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoPostEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/extension/IcebergEventLogger.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergSerdeUtils.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationProcessor.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePostEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPostEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPreEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePostEvent.java create mode 100644 iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java index a27df871040..4208d620f5b 100644 --- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java +++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java @@ -307,10 +307,12 @@ public FutureGrantManager futureGrantManager() { return futureGrantManager; } - public void start() { - auxServiceManager.serviceStart(); + public void start(boolean isGravitinoServer) { metricsSystem.start(); eventListenerManager.start(); + if (isGravitinoServer) { + auxServiceManager.serviceStart(); + } } /** Shutdown the Gravitino environment. */ diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java index 094a19cda58..e1e5b6e322b 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogEvent.java @@ -28,7 +28,7 @@ * creation, deletion, or modification. */ @DeveloperApi -public abstract class CatalogEvent extends Event { +public abstract class CatalogEvent extends GravitinoPostEvent { /** * Constructs a new {@code CatalogEvent} with the specified user and catalog identifier. * diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java index c964e1cff1b..f36a666bcb6 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/CatalogFailureEvent.java @@ -24,12 +24,12 @@ /** * An abstract class representing events that are triggered when a catalog operation fails due to an - * exception. This class extends {@link FailureEvent} to provide a more specific context related to - * catalog operations, encapsulating details about the user who initiated the operation, the - * identifier of the catalog involved, and the exception that led to the failure. + * exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context + * related to catalog operations, encapsulating details about the user who initiated the operation, + * the identifier of the catalog involved, and the exception that led to the failure. */ @DeveloperApi -public abstract class CatalogFailureEvent extends FailureEvent { +public abstract class CatalogFailureEvent extends GravitinoFailureEvent { /** * Constructs a new {@code CatalogFailureEvent} instance, capturing information about the failed * catalog operation. diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java index 1f1e77e75e8..811488ae9dd 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetEvent.java @@ -33,7 +33,7 @@ * understanding of each event. */ @DeveloperApi -public abstract class FilesetEvent extends Event { +public abstract class FilesetEvent extends GravitinoPostEvent { /** * Constructs a new {@code FilesetEvent} with the specified user and fileset identifier. * diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java index e5e9831970c..587a5d46a43 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/FilesetFailureEvent.java @@ -31,7 +31,7 @@ * diagnose and respond to issues. */ @DeveloperApi -public abstract class FilesetFailureEvent extends FailureEvent { +public abstract class FilesetFailureEvent extends GravitinoFailureEvent { /** * Constructs a new {@code FilesetFailureEvent} instance, capturing information about the failed * fileset operation. diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoFailureEvent.java new file mode 100644 index 00000000000..3340ce6f525 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents a failure event in Gravitino server. */ +@DeveloperApi +public abstract class GravitinoFailureEvent extends FailureEvent { + protected GravitinoFailureEvent(String user, NameIdentifier identifier, Exception exception) { + super(user, identifier, exception); + } +} diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoPostEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoPostEvent.java new file mode 100644 index 00000000000..2a4e4ebfb06 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/GravitinoPostEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents a post event for Gravitino server. */ +@DeveloperApi +public abstract class GravitinoPostEvent extends Event { + protected GravitinoPostEvent(String user, NameIdentifier identifier) { + super(user, identifier); + } +} diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java index a8e1f1ddc85..4590ca3abe7 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeEvent.java @@ -29,7 +29,7 @@ * performing the operation and the identifier of the Metalake being operated on. */ @DeveloperApi -public abstract class MetalakeEvent extends Event { +public abstract class MetalakeEvent extends GravitinoPostEvent { /** * Constructs a new {@code MetalakeEvent} with the specified user and Metalake identifier. * diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java index 24d9f4043e3..dc41e51d2c2 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/MetalakeFailureEvent.java @@ -24,12 +24,12 @@ /** * An abstract class representing events that are triggered when a Metalake operation fails due to - * an exception. This class extends {@link FailureEvent} to provide a more specific context related - * to Metalake operations, encapsulating details about the user who initiated the operation, the - * identifier of the Metalake involved, and the exception that led to the failure. + * an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context + * related to Metalake operations, encapsulating details about the user who initiated the operation, + * the identifier of the Metalake involved, and the exception that led to the failure. */ @DeveloperApi -public abstract class MetalakeFailureEvent extends FailureEvent { +public abstract class MetalakeFailureEvent extends GravitinoFailureEvent { /** * Constructs a new {@code MetalakeFailureEvent} instance, capturing information about the failed * Metalake operation. diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java index ab6d1d404d8..7801eeff6cb 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionEvent.java @@ -29,7 +29,7 @@ * performing the operation and the identifier of the Partition being operated on. */ @DeveloperApi -public abstract class PartitionEvent extends Event { +public abstract class PartitionEvent extends GravitinoPostEvent { /** * Constructs a new {@code PartitionEvent} with the specified user and Partition identifier. * diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java index aa7c5197623..87ae262636f 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/PartitionFailureEvent.java @@ -24,12 +24,12 @@ /** * An abstract class representing events that are triggered when a Partition operation fails due to - * an exception. This class extends {@link FailureEvent} to provide a more specific context related - * to Partition operations, encapsulating details about the user who initiated the operation, the - * identifier of the Partition involved, and the exception that led to the failure. + * an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context + * related to Partition operations, encapsulating details about the user who initiated the + * operation, the identifier of the Partition involved, and the exception that led to the failure. */ @DeveloperApi -public abstract class PartitionFailureEvent extends FailureEvent { +public abstract class PartitionFailureEvent extends GravitinoFailureEvent { /** * Constructs a new {@code PartitionFailureEvent} instance, capturing information about the failed * Partition operation. diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java index 369dcf250e1..a1fb973de87 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaEvent.java @@ -24,7 +24,7 @@ /** Represents an abstract base class for events related to schema operations. */ @DeveloperApi -public abstract class SchemaEvent extends Event { +public abstract class SchemaEvent extends GravitinoPostEvent { protected SchemaEvent(String user, NameIdentifier identifier) { super(user, identifier); } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java index 006c29c8920..479c253489b 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/SchemaFailureEvent.java @@ -27,7 +27,7 @@ * exception. */ @DeveloperApi -public abstract class SchemaFailureEvent extends FailureEvent { +public abstract class SchemaFailureEvent extends GravitinoFailureEvent { protected SchemaFailureEvent(String user, NameIdentifier identifier, Exception exception) { super(user, identifier, exception); } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java index b665afba4b0..a476b8af671 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TableEvent.java @@ -32,7 +32,7 @@ * specific type of table operation being represented. */ @DeveloperApi -public abstract class TableEvent extends Event { +public abstract class TableEvent extends GravitinoPostEvent { /** * Constructs a new {@code TableEvent} with the specified user and table identifier. * diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java index a1293d523f4..fb9bb0de5a5 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TableFailureEvent.java @@ -24,16 +24,16 @@ /** * An abstract class representing events that are triggered when a table operation fails due to an - * exception. This class extends {@link FailureEvent} to provide a more specific context related to - * table operations, encapsulating details about the user who initiated the operation, the - * identifier of the table involved, and the exception that led to the failure. + * exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context + * related to table operations, encapsulating details about the user who initiated the operation, + * the identifier of the table involved, and the exception that led to the failure. * *

Implementations of this class can be used to convey detailed information about failures in * operations such as creating, updating, deleting, or querying tables, making it easier to diagnose * and respond to issues. */ @DeveloperApi -public abstract class TableFailureEvent extends FailureEvent { +public abstract class TableFailureEvent extends GravitinoFailureEvent { /** * Constructs a new {@code TableFailureEvent} instance, capturing information about the failed * table operation. diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java index 4218484cd10..3daf5159682 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicEvent.java @@ -32,7 +32,7 @@ * specific type of topic operation being represented. */ @DeveloperApi -public abstract class TopicEvent extends Event { +public abstract class TopicEvent extends GravitinoPostEvent { /** * Constructs a new {@code TopicEvent} with the specified user and topic identifier. * diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java index c8dd727b64b..873b85f615d 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/TopicFailureEvent.java @@ -24,16 +24,16 @@ /** * An abstract class representing events that are triggered when a topic operation fails due to an - * exception. This class extends {@link FailureEvent} to provide a more specific context related to - * topic operations, encapsulating details about the user who initiated the operation, the - * identifier of the topic involved, and the exception that led to the failure. + * exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context + * related to topic operations, encapsulating details about the user who initiated the operation, + * the identifier of the topic involved, and the exception that led to the failure. * *

Implementations of this class can be used to convey detailed information about failures in * operations such as creating, updating, deleting, or querying topics, making it easier to diagnose * and respond to issues. */ @DeveloperApi -public abstract class TopicFailureEvent extends FailureEvent { +public abstract class TopicFailureEvent extends GravitinoFailureEvent { /** * Constructs a new {@code TopicFailureEvent} instance, capturing information about the failed * topic operation. diff --git a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java index 758aa46aa08..f400a412098 100644 --- a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapperProvider.java @@ -36,5 +36,5 @@ public interface IcebergCatalogWrapperProvider { * @param catalogName a param send by clients. * @return the instance of IcebergCatalogWrapper. */ - IcebergCatalogWrapper getIcebergTableOps(String catalogName); + IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName); } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java index 0592cfd9421..6f3d1a85482 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/RESTService.java @@ -28,7 +28,11 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationProcessor; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.listener.EventBus; import org.apache.gravitino.metrics.MetricsSystem; import org.apache.gravitino.metrics.source.MetricsSource; import org.apache.gravitino.server.web.HttpServerMetricsSource; @@ -70,14 +74,20 @@ private void initServer(IcebergConfig icebergConfig) { new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server); metricsSystem.register(httpServerMetricsSource); + EventBus eventBus = GravitinoEnv.getInstance().eventBus(); icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig()); icebergMetricsManager = new IcebergMetricsManager(icebergConfig); + IcebergTableOperationProcessor icebergTableOperationProcessor = + new IcebergTableOperationProcessor(icebergCatalogWrapperManager); + IcebergTableEventDispatcher icebergTableEventDispatcher = + new IcebergTableEventDispatcher(icebergTableOperationProcessor, eventBus); config.register( new AbstractBinder() { @Override protected void configure() { bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1); bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1); + bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1); } }); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/extension/IcebergEventLogger.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/extension/IcebergEventLogger.java new file mode 100644 index 00000000000..944dd545e94 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/extension/IcebergEventLogger.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.extension; + +import java.util.Map; +import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergEventLogger implements EventListenerPlugin { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergEventLogger.class); + + @Override + public void init(Map properties) throws RuntimeException {} + + @Override + public void start() throws RuntimeException {} + + @Override + public void stop() throws RuntimeException {} + + @Override + public void onPostEvent(Event event) { + LOG.info( + "Process post event {}, user: {}, identifier: {}", + event.getClass().getSimpleName(), + event.user(), + event.identifier()); + } + + @Override + public void onPreEvent(PreEvent event) { + LOG.info( + "Process pre event {}, user: {}, identifier: {}", + event.getClass().getSimpleName(), + event.user(), + event.identifier()); + } + + @Override + public Mode mode() { + return Mode.SYNC; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java index 522bca39fe3..5ae65d37f58 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/ConfigBasedIcebergCatalogWrapperProvider.java @@ -68,7 +68,7 @@ public void initialize(Map properties) { } @Override - public IcebergCatalogWrapper getIcebergTableOps(String catalogName) { + public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) { IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName); if (icebergConfig == null) { String errorMsg = String.format("%s can not match any catalog", catalogName); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java index a38fd9cf302..aee1200b8cc 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/provider/GravitinoBasedIcebergCatalogWrapperProvider.java @@ -66,7 +66,7 @@ public void initialize(Map properties) { } @Override - public IcebergCatalogWrapper getIcebergTableOps(String catalogName) { + public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) { Preconditions.checkArgument( StringUtils.isNotBlank(catalogName), "blank catalogName is illegal"); Preconditions.checkArgument( diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java index 622f0d21acd..efec7b7559f 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/server/GravitinoIcebergRESTServer.java @@ -52,6 +52,7 @@ private void initialize() { } private void start() { + gravitinoEnv.start(false); icebergRESTService.serviceStart(); } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java index 17342acf71f..9e51f1ae77e 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergCatalogWrapperManager.java @@ -21,14 +21,11 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Scheduler; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.StringUtils; -import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.iceberg.common.IcebergConfig; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper; import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider; @@ -83,24 +80,17 @@ public IcebergCatalogWrapperManager(Map properties) { * @return the instance of IcebergCatalogWrapper. */ public IcebergCatalogWrapper getOps(String rawPrefix) { - String catalogName = getCatalogName(rawPrefix); - IcebergCatalogWrapper tableOps = - icebergTableOpsCache.get(catalogName, k -> provider.getIcebergTableOps(catalogName)); - // Reload conf to reset UserGroupInformation or icebergTableOps will always use - // Simple auth. - tableOps.reloadHadoopConf(); - return tableOps; + String catalogName = IcebergRestUtils.getCatalogName(rawPrefix); + return getCatalogWrapper(catalogName); } - private String getCatalogName(String rawPrefix) { - String prefix = shelling(rawPrefix); - Preconditions.checkArgument( - !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix), - String.format("%s is conflict with reserved key, please replace it", prefix)); - if (StringUtils.isBlank(prefix)) { - return IcebergConstants.GRAVITINO_DEFAULT_CATALOG; - } - return prefix; + public IcebergCatalogWrapper getCatalogWrapper(String catalogName) { + IcebergCatalogWrapper catalogWrapper = + icebergTableOpsCache.get(catalogName, k -> provider.getIcebergCatalogWrapper(catalogName)); + // Reload conf to reset UserGroupInformation or icebergTableOps will always use + // Simple auth. + catalogWrapper.reloadHadoopConf(); + return catalogWrapper; } private IcebergCatalogWrapperProvider createProvider(Map properties) { @@ -116,17 +106,6 @@ private IcebergCatalogWrapperProvider createProvider(Map propert } } - private String shelling(String rawPrefix) { - if (StringUtils.isBlank(rawPrefix)) { - return rawPrefix; - } else { - // rawPrefix is a string matching ([^/]*/) which end with / - Preconditions.checkArgument( - rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); - return rawPrefix.substring(0, rawPrefix.length() - 1); - } - } - private void closeIcebergTableOps(IcebergCatalogWrapper ops) { try { ops.close(); diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java index f880f7f7a9f..ed7d0a2f98d 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java @@ -24,6 +24,7 @@ import javax.ws.rs.core.Response.Status; import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; +import org.apache.gravitino.exceptions.IllegalNameIdentifierException; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; @@ -51,8 +52,10 @@ public class IcebergExceptionMapper implements ExceptionMapper { ImmutableMap., Integer>builder() .put(IllegalArgumentException.class, 400) .put(ValidationException.class, 400) + .put(IllegalNameIdentifierException.class, 400) .put(NamespaceNotEmptyException.class, 400) .put(NotAuthorizedException.class, 401) + .put(org.apache.gravitino.exceptions.ForbiddenException.class, 403) .put(ForbiddenException.class, 403) .put(NoSuchNamespaceException.class, 404) .put(NoSuchTableException.class, 404) diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java index fb0e8005c16..07f4ea8db9b 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergRestUtils.java @@ -18,12 +18,15 @@ */ package org.apache.gravitino.iceberg.service; +import com.google.common.base.Preconditions; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.iceberg.rest.responses.ErrorResponse; public class IcebergRestUtils { @@ -71,4 +74,28 @@ public static Instant calculateNewTimestamp(Instant currentTimestamp, int hours) } return nextHourDateTime.atZone(ZoneId.systemDefault()).toInstant(); } + + public static String getCatalogName(String rawPrefix) { + String prefix = shelling(rawPrefix); + Preconditions.checkArgument( + !IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix), + String.format("%s is conflict with reserved key, please replace it", prefix)); + if (StringUtils.isBlank(prefix)) { + return IcebergConstants.GRAVITINO_DEFAULT_CATALOG; + } + return prefix; + } + + // remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to + // 'iceberg_catalog' + private static String shelling(String rawPrefix) { + if (StringUtils.isBlank(rawPrefix)) { + return rawPrefix; + } else { + // rawPrefix is a string matching ([^/]*/) which end with / + Preconditions.checkArgument( + rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix)); + return rawPrefix.substring(0, rawPrefix.length() - 1); + } + } } diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergSerdeUtils.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergSerdeUtils.java new file mode 100644 index 00000000000..ae35407f03e --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergSerdeUtils.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; + +public class IcebergSerdeUtils { + public static T cloneIcebergRESTObject(Object message, Class className) { + ObjectMapper icebergObjectMapper = IcebergObjectMapper.getInstance(); + try { + byte[] values = icebergObjectMapper.writeValueAsBytes(message); + return icebergObjectMapper.readValue(values, className); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java new file mode 100644 index 00000000000..9e65d876da1 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableEventDispatcher.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.listener.EventBus; +import org.apache.gravitino.listener.api.event.IcebergCreateTableFailureEvent; +import org.apache.gravitino.listener.api.event.IcebergCreateTablePostEvent; +import org.apache.gravitino.listener.api.event.IcebergCreateTablePreEvent; +import org.apache.gravitino.utils.PrincipalUtils; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * {@code IcebergTableEventDispatcher} is a decorator for {@link IcebergTableOperationProcessor} + * that not only delegates table operations to the underlying dispatcher but also dispatches + * corresponding events to an {@link org.apache.gravitino.listener.EventBus}. + */ +public class IcebergTableEventDispatcher implements IcebergTableOperationDispatcher { + + private IcebergTableOperationDispatcher icebergTableOperationDispatcher; + private EventBus eventBus; + + public IcebergTableEventDispatcher( + IcebergTableOperationDispatcher icebergTableOperationDispatcher, EventBus eventBus) { + this.icebergTableOperationDispatcher = icebergTableOperationDispatcher; + this.eventBus = eventBus; + } + + @Override + public LoadTableResponse createTable( + String catalogName, String namespace, CreateTableRequest createTableRequest) { + NameIdentifier tableIdentifier = + NameIdentifier.of(catalogName, namespace, createTableRequest.name()); + eventBus.dispatchEvent( + new IcebergCreateTablePreEvent( + PrincipalUtils.getCurrentUserName(), tableIdentifier, createTableRequest)); + LoadTableResponse loadTableResponse; + try { + loadTableResponse = + icebergTableOperationDispatcher.createTable(catalogName, namespace, createTableRequest); + } catch (Exception e) { + eventBus.dispatchEvent( + new IcebergCreateTableFailureEvent( + PrincipalUtils.getCurrentUserName(), tableIdentifier, e)); + throw e; + } + eventBus.dispatchEvent( + new IcebergCreateTablePostEvent( + PrincipalUtils.getCurrentUserName(), + NameIdentifier.of(namespace, createTableRequest.name()), + createTableRequest, + loadTableResponse)); + return loadTableResponse; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java new file mode 100644 index 00000000000..c4ab97e5c79 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationDispatcher.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * The {@code IcebergTableOperationDispatcher} interface defines the public API for managing Iceberg + * tables. + */ +public interface IcebergTableOperationDispatcher { + /** + * Creates a new Iceberg table in the specified namespace with the given prefix. + * + * @param catalogName The catalog name when creating the table. + * @param namespace The namespace within which the table should be created. + * @param createTableRequest The request object containing the details for creating the table. + * @return A {@link LoadTableResponse} object containing the result of the operation. + */ + LoadTableResponse createTable( + String catalogName, String namespace, CreateTableRequest createTableRequest); +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationProcessor.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationProcessor.java new file mode 100644 index 00000000000..d96d6bcac61 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/dispatcher/IcebergTableOperationProcessor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.dispatcher; + +import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** + * The {@code IcebergTableOperationProcessor} locates the corresponding {@code + * IcebergCatalogWrapper} based on the prefix in order to perform the actual table operations. + */ +public class IcebergTableOperationProcessor implements IcebergTableOperationDispatcher { + + private IcebergCatalogWrapperManager icebergCatalogWrapperManager; + + public IcebergTableOperationProcessor(IcebergCatalogWrapperManager icebergCatalogWrapperManager) { + this.icebergCatalogWrapperManager = icebergCatalogWrapperManager; + } + + @Override + public LoadTableResponse createTable( + String catalogName, String namespace, CreateTableRequest createTableRequest) { + return icebergCatalogWrapperManager + .getCatalogWrapper(catalogName) + .createTable(RESTUtil.decodeNamespace(namespace), createTableRequest); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java index 0c383e52063..150dd8be59d 100644 --- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java @@ -40,6 +40,7 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergObjectMapper; import org.apache.gravitino.iceberg.service.IcebergRestUtils; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; import org.apache.gravitino.metrics.MetricNames; import org.apache.iceberg.catalog.TableIdentifier; @@ -47,6 +48,7 @@ import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.ReportMetricsRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +63,7 @@ public class IcebergTableOperations { private IcebergMetricsManager icebergMetricsManager; private ObjectMapper icebergObjectMapper; + private IcebergTableOperationDispatcher tableOperationDispatcher; @SuppressWarnings("UnusedVariable") @Context @@ -69,10 +72,12 @@ public class IcebergTableOperations { @Inject public IcebergTableOperations( IcebergCatalogWrapperManager icebergCatalogWrapperManager, - IcebergMetricsManager icebergMetricsManager) { + IcebergMetricsManager icebergMetricsManager, + IcebergTableOperationDispatcher tableOperationDispatcher) { this.icebergCatalogWrapperManager = icebergCatalogWrapperManager; - this.icebergObjectMapper = IcebergObjectMapper.getInstance(); this.icebergMetricsManager = icebergMetricsManager; + this.tableOperationDispatcher = tableOperationDispatcher; + this.icebergObjectMapper = IcebergObjectMapper.getInstance(); } @GET @@ -94,13 +99,14 @@ public Response createTable( @PathParam("namespace") String namespace, CreateTableRequest createTableRequest) { LOG.info( - "Create Iceberg table, namespace: {}, create table request: {}", + "Create Iceberg table, prefix: {}, namespace: {}, create table request: {}", + prefix, namespace, createTableRequest); - return IcebergRestUtils.ok( - icebergCatalogWrapperManager - .getOps(prefix) - .createTable(RESTUtil.decodeNamespace(namespace), createTableRequest)); + String catalogName = IcebergRestUtils.getCatalogName(prefix); + LoadTableResponse loadTableResponse = + tableOperationDispatcher.createTable(catalogName, namespace, createTableRequest); + return IcebergRestUtils.ok(loadTableResponse); } @POST diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java new file mode 100644 index 00000000000..ab21c33b2e2 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTableFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represent a failure event when creating Iceberg table failed. */ +@DeveloperApi +public class IcebergCreateTableFailureEvent extends IcebergRESTFailureEvent { + public IcebergCreateTableFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePostEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePostEvent.java new file mode 100644 index 00000000000..30893e7c2d7 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePostEvent.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.gravitino.iceberg.service.IcebergSerdeUtils; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; + +/** Represent an event after creating Iceberg table successfully. */ +@DeveloperApi +public class IcebergCreateTablePostEvent extends IcebergTablePostEvent { + + private CreateTableRequest createTableRequest; + private LoadTableResponse loadTableResponse; + + public IcebergCreateTablePostEvent( + String user, + NameIdentifier resourceIdentifier, + CreateTableRequest createTableRequest, + LoadTableResponse loadTableResponse) { + super(user, resourceIdentifier); + this.createTableRequest = + IcebergSerdeUtils.cloneIcebergRESTObject(createTableRequest, CreateTableRequest.class); + this.loadTableResponse = + IcebergSerdeUtils.cloneIcebergRESTObject(loadTableResponse, LoadTableResponse.class); + } + + public CreateTableRequest createTableRequest() { + return createTableRequest; + } + + public LoadTableResponse loadTableResponse() { + return loadTableResponse; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java new file mode 100644 index 00000000000..81937e501bc --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergCreateTablePreEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.iceberg.rest.requests.CreateTableRequest; + +/** Represent a pre event before creating Iceberg table. */ +@DeveloperApi +public class IcebergCreateTablePreEvent extends IcebergTablePreEvent { + private CreateTableRequest createTableRequest; + + public IcebergCreateTablePreEvent( + String user, NameIdentifier resourceIdentifier, CreateTableRequest createTableRequest) { + super(user, resourceIdentifier); + this.createTableRequest = createTableRequest; + } + + public CreateTableRequest createTableRequest() { + return createTableRequest; + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java new file mode 100644 index 00000000000..e74d34b9e4c --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTFailureEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract failure event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergRESTFailureEvent extends FailureEvent { + protected IcebergRESTFailureEvent(String user, NameIdentifier nameIdentifier, Exception e) { + super(user, nameIdentifier, e); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPostEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPostEvent.java new file mode 100644 index 00000000000..204fa93d73c --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPostEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract post event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergRESTPostEvent extends Event { + protected IcebergRESTPostEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPreEvent.java new file mode 100644 index 00000000000..cf60297ee9b --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergRESTPreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract pre event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergRESTPreEvent extends PreEvent { + protected IcebergRESTPreEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePostEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePostEvent.java new file mode 100644 index 00000000000..cbc35f92e76 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePostEvent.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; + +/** Represents an abstract table post event in Gravitino Iceberg REST server. */ +public abstract class IcebergTablePostEvent extends IcebergRESTPostEvent { + protected IcebergTablePostEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java new file mode 100644 index 00000000000..cbda329d093 --- /dev/null +++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/listener/api/event/IcebergTablePreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents an abstract table pre event in Gravitino Iceberg REST server. */ +@DeveloperApi +public abstract class IcebergTablePreEvent extends IcebergRESTPreEvent { + protected IcebergTablePreEvent(String user, NameIdentifier resourceIdentifier) { + super(user, resourceIdentifier); + } +} diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java index 99e83f2e41d..2c48ffd7b11 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestConfigBasedIcebergCatalogWrapperProvider.java @@ -65,9 +65,9 @@ public void testValidIcebergTableOps() { IcebergConfig hiveIcebergConfig = provider.catalogConfigs.get(hiveCatalogName); IcebergConfig jdbcIcebergConfig = provider.catalogConfigs.get(jdbcCatalogName); IcebergConfig defaultIcebergConfig = provider.catalogConfigs.get(defaultCatalogName); - IcebergCatalogWrapper hiveOps = provider.getIcebergTableOps(hiveCatalogName); - IcebergCatalogWrapper jdbcOps = provider.getIcebergTableOps(jdbcCatalogName); - IcebergCatalogWrapper defaultOps = provider.getIcebergTableOps(defaultCatalogName); + IcebergCatalogWrapper hiveOps = provider.getIcebergCatalogWrapper(hiveCatalogName); + IcebergCatalogWrapper jdbcOps = provider.getIcebergCatalogWrapper(jdbcCatalogName); + IcebergCatalogWrapper defaultOps = provider.getIcebergCatalogWrapper(defaultCatalogName); Assertions.assertEquals( hiveCatalogName, hiveIcebergConfig.get(IcebergConfig.CATALOG_BACKEND_NAME)); @@ -107,6 +107,6 @@ public void testInvalidIcebergTableOps(String catalogName) { provider.initialize(Maps.newHashMap()); Assertions.assertThrowsExactly( - RuntimeException.class, () -> provider.getIcebergTableOps(catalogName)); + RuntimeException.class, () -> provider.getIcebergCatalogWrapper(catalogName)); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java index 8acac4ffd6b..7b9d5bf31c4 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/provider/TestGravitinoBasedIcebergCatalogWrapperProvider.java @@ -77,8 +77,8 @@ public void testValidIcebergTableOps() { Mockito.when(client.loadMetalake(Mockito.any())).thenReturn(gravitinoMetalake); provider.setClient(client); - IcebergCatalogWrapper hiveOps = provider.getIcebergTableOps(hiveCatalogName); - IcebergCatalogWrapper jdbcOps = provider.getIcebergTableOps(jdbcCatalogName); + IcebergCatalogWrapper hiveOps = provider.getIcebergCatalogWrapper(hiveCatalogName); + IcebergCatalogWrapper jdbcOps = provider.getIcebergCatalogWrapper(jdbcCatalogName); Assertions.assertEquals(hiveCatalogName, hiveOps.getCatalog().name()); Assertions.assertEquals(jdbcCatalogName, jdbcOps.getCatalog().name()); @@ -106,11 +106,12 @@ public void testInvalidIcebergTableOps() { provider.setClient(client); Assertions.assertThrowsExactly( - IllegalArgumentException.class, () -> provider.getIcebergTableOps(invalidCatalogName)); + IllegalArgumentException.class, + () -> provider.getIcebergCatalogWrapper(invalidCatalogName)); Assertions.assertThrowsExactly( - IllegalArgumentException.class, () -> provider.getIcebergTableOps("")); + IllegalArgumentException.class, () -> provider.getIcebergCatalogWrapper("")); Assertions.assertThrowsExactly( IllegalArgumentException.class, - () -> provider.getIcebergTableOps(IcebergConstants.GRAVITINO_DEFAULT_CATALOG)); + () -> provider.getIcebergCatalogWrapper(IcebergConstants.GRAVITINO_DEFAULT_CATALOG)); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java index 222391bcc04..94dcb19d173 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/ConfigBasedIcebergCatalogWrapperProviderForTest.java @@ -24,7 +24,7 @@ public class ConfigBasedIcebergCatalogWrapperProviderForTest extends ConfigBasedIcebergCatalogWrapperProvider { @Override - public IcebergCatalogWrapper getIcebergTableOps(String prefix) { + public IcebergCatalogWrapper getIcebergCatalogWrapper(String prefix) { return new IcebergCatalogWrapperForTest(); } } diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java index 4fc645132e1..ccc92d157b1 100644 --- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java +++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/IcebergRestTestUtil.java @@ -20,6 +20,7 @@ package org.apache.gravitino.iceberg.service.rest; import com.google.common.collect.Maps; +import java.util.Arrays; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; @@ -28,7 +29,11 @@ import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager; import org.apache.gravitino.iceberg.service.IcebergExceptionMapper; import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher; +import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationProcessor; import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager; +import org.apache.gravitino.listener.EventBus; import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.jackson.JacksonFeature; import org.glassfish.jersey.logging.LoggingFeature; @@ -81,6 +86,12 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe IcebergCatalogWrapperManager icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(catalogConf); + EventBus eventBus = new EventBus(Arrays.asList()); + IcebergTableOperationProcessor icebergTableOperationProcessor = + new IcebergTableOperationProcessor(icebergCatalogWrapperManager); + IcebergTableEventDispatcher icebergTableEventDispatcher = + new IcebergTableEventDispatcher(icebergTableOperationProcessor, eventBus); + IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig()); resourceConfig.register( new AbstractBinder() { @@ -88,6 +99,7 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe protected void configure() { bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(2); bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(2); + bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(2); } }); } diff --git a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java index e383c65b7a4..f1b32206a6c 100644 --- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java +++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java @@ -144,7 +144,7 @@ protected void configure() { } public void start() throws Exception { - gravitinoEnv.start(); + gravitinoEnv.start(true); server.start(); }