Skip to content

Commit

Permalink
support iceberg pre event listener
Browse files Browse the repository at this point in the history
  • Loading branch information
FANNG1 committed Oct 16, 2024
1 parent f0e7f36 commit 66b234a
Show file tree
Hide file tree
Showing 44 changed files with 721 additions and 82 deletions.
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,12 @@ public FutureGrantManager futureGrantManager() {
return futureGrantManager;
}

public void start() {
auxServiceManager.serviceStart();
public void start(boolean isGravitinoServer) {
metricsSystem.start();
eventListenerManager.start();
if (isGravitinoServer) {
auxServiceManager.serviceStart();
}
}

/** Shutdown the Gravitino environment. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* creation, deletion, or modification.
*/
@DeveloperApi
public abstract class CatalogEvent extends Event {
public abstract class CatalogEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code CatalogEvent} with the specified user and catalog identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

/**
* An abstract class representing events that are triggered when a catalog operation fails due to an
* exception. This class extends {@link FailureEvent} to provide a more specific context related to
* catalog operations, encapsulating details about the user who initiated the operation, the
* identifier of the catalog involved, and the exception that led to the failure.
* exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to catalog operations, encapsulating details about the user who initiated the operation,
* the identifier of the catalog involved, and the exception that led to the failure.
*/
@DeveloperApi
public abstract class CatalogFailureEvent extends FailureEvent {
public abstract class CatalogFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code CatalogFailureEvent} instance, capturing information about the failed
* catalog operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* understanding of each event.
*/
@DeveloperApi
public abstract class FilesetEvent extends Event {
public abstract class FilesetEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code FilesetEvent} with the specified user and fileset identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* diagnose and respond to issues.
*/
@DeveloperApi
public abstract class FilesetFailureEvent extends FailureEvent {
public abstract class FilesetFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code FilesetFailureEvent} instance, capturing information about the failed
* fileset operation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/** Represents a failure event in Gravitino server. */
@DeveloperApi
public abstract class GravitinoFailureEvent extends FailureEvent {
protected GravitinoFailureEvent(String user, NameIdentifier identifier, Exception exception) {
super(user, identifier, exception);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/** Represents a post event for Gravitino server. */
@DeveloperApi
public abstract class GravitinoPostEvent extends Event {
protected GravitinoPostEvent(String user, NameIdentifier identifier) {
super(user, identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* performing the operation and the identifier of the Metalake being operated on.
*/
@DeveloperApi
public abstract class MetalakeEvent extends Event {
public abstract class MetalakeEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code MetalakeEvent} with the specified user and Metalake identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

/**
* An abstract class representing events that are triggered when a Metalake operation fails due to
* an exception. This class extends {@link FailureEvent} to provide a more specific context related
* to Metalake operations, encapsulating details about the user who initiated the operation, the
* identifier of the Metalake involved, and the exception that led to the failure.
* an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to Metalake operations, encapsulating details about the user who initiated the operation,
* the identifier of the Metalake involved, and the exception that led to the failure.
*/
@DeveloperApi
public abstract class MetalakeFailureEvent extends FailureEvent {
public abstract class MetalakeFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code MetalakeFailureEvent} instance, capturing information about the failed
* Metalake operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* performing the operation and the identifier of the Partition being operated on.
*/
@DeveloperApi
public abstract class PartitionEvent extends Event {
public abstract class PartitionEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code PartitionEvent} with the specified user and Partition identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

/**
* An abstract class representing events that are triggered when a Partition operation fails due to
* an exception. This class extends {@link FailureEvent} to provide a more specific context related
* to Partition operations, encapsulating details about the user who initiated the operation, the
* identifier of the Partition involved, and the exception that led to the failure.
* an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to Partition operations, encapsulating details about the user who initiated the
* operation, the identifier of the Partition involved, and the exception that led to the failure.
*/
@DeveloperApi
public abstract class PartitionFailureEvent extends FailureEvent {
public abstract class PartitionFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code PartitionFailureEvent} instance, capturing information about the failed
* Partition operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** Represents an abstract base class for events related to schema operations. */
@DeveloperApi
public abstract class SchemaEvent extends Event {
public abstract class SchemaEvent extends GravitinoPostEvent {
protected SchemaEvent(String user, NameIdentifier identifier) {
super(user, identifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* exception.
*/
@DeveloperApi
public abstract class SchemaFailureEvent extends FailureEvent {
public abstract class SchemaFailureEvent extends GravitinoFailureEvent {
protected SchemaFailureEvent(String user, NameIdentifier identifier, Exception exception) {
super(user, identifier, exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* specific type of table operation being represented.
*/
@DeveloperApi
public abstract class TableEvent extends Event {
public abstract class TableEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code TableEvent} with the specified user and table identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@

/**
* An abstract class representing events that are triggered when a table operation fails due to an
* exception. This class extends {@link FailureEvent} to provide a more specific context related to
* table operations, encapsulating details about the user who initiated the operation, the
* identifier of the table involved, and the exception that led to the failure.
* exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to table operations, encapsulating details about the user who initiated the operation,
* the identifier of the table involved, and the exception that led to the failure.
*
* <p>Implementations of this class can be used to convey detailed information about failures in
* operations such as creating, updating, deleting, or querying tables, making it easier to diagnose
* and respond to issues.
*/
@DeveloperApi
public abstract class TableFailureEvent extends FailureEvent {
public abstract class TableFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code TableFailureEvent} instance, capturing information about the failed
* table operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* specific type of topic operation being represented.
*/
@DeveloperApi
public abstract class TopicEvent extends Event {
public abstract class TopicEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code TopicEvent} with the specified user and topic identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@

/**
* An abstract class representing events that are triggered when a topic operation fails due to an
* exception. This class extends {@link FailureEvent} to provide a more specific context related to
* topic operations, encapsulating details about the user who initiated the operation, the
* identifier of the topic involved, and the exception that led to the failure.
* exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to topic operations, encapsulating details about the user who initiated the operation,
* the identifier of the topic involved, and the exception that led to the failure.
*
* <p>Implementations of this class can be used to convey detailed information about failures in
* operations such as creating, updating, deleting, or querying topics, making it easier to diagnose
* and respond to issues.
*/
@DeveloperApi
public abstract class TopicFailureEvent extends FailureEvent {
public abstract class TopicFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code TopicFailureEvent} instance, capturing information about the failed
* topic operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ public interface IcebergCatalogWrapperProvider {
* @param catalogName a param send by clients.
* @return the instance of IcebergCatalogWrapper.
*/
IcebergCatalogWrapper getIcebergTableOps(String catalogName);
IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationProcessor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
import org.apache.gravitino.server.web.HttpServerMetricsSource;
Expand Down Expand Up @@ -70,14 +74,20 @@ private void initServer(IcebergConfig icebergConfig) {
new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server);
metricsSystem.register(httpServerMetricsSource);

EventBus eventBus = GravitinoEnv.getInstance().eventBus();
icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig());
icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
IcebergTableOperationProcessor icebergTableOperationProcessor =
new IcebergTableOperationProcessor(icebergCatalogWrapperManager);
IcebergTableEventDispatcher icebergTableEventDispatcher =
new IcebergTableEventDispatcher(icebergTableOperationProcessor, eventBus);
config.register(
new AbstractBinder() {
@Override
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.iceberg.extension;

import java.util.Map;
import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.event.Event;
import org.apache.gravitino.listener.api.event.PreEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergEventLogger implements EventListenerPlugin {

private static final Logger LOG = LoggerFactory.getLogger(IcebergEventLogger.class);

@Override
public void init(Map<String, String> properties) throws RuntimeException {}

@Override
public void start() throws RuntimeException {}

@Override
public void stop() throws RuntimeException {}

@Override
public void onPostEvent(Event event) {
LOG.info(
"Process post event {}, user: {}, identifier: {}",
event.getClass().getSimpleName(),
event.user(),
event.identifier());
}

@Override
public void onPreEvent(PreEvent event) {
LOG.info(
"Process pre event {}, user: {}, identifier: {}",
event.getClass().getSimpleName(),
event.user(),
event.identifier());
}

@Override
public Mode mode() {
return Mode.SYNC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void initialize(Map<String, String> properties) {
}

@Override
public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) {
IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName);
if (icebergConfig == null) {
String errorMsg = String.format("%s can not match any catalog", catalogName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void initialize(Map<String, String> properties) {
}

@Override
public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) {
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private void initialize() {
}

private void start() {
gravitinoEnv.start(false);
icebergRESTService.serviceStart();
}

Expand Down
Loading

0 comments on commit 66b234a

Please sign in to comment.