Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SIP] support Iceberg event listener #5147

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,12 @@ public FutureGrantManager futureGrantManager() {
return futureGrantManager;
}

public void start() {
auxServiceManager.serviceStart();
public void start(boolean isGravitinoServer) {
metricsSystem.start();
eventListenerManager.start();
if (isGravitinoServer) {
auxServiceManager.serviceStart();
}
}

/** Shutdown the Gravitino environment. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* creation, deletion, or modification.
*/
@DeveloperApi
public abstract class CatalogEvent extends Event {
public abstract class CatalogEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code CatalogEvent} with the specified user and catalog identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

/**
* An abstract class representing events that are triggered when a catalog operation fails due to an
* exception. This class extends {@link FailureEvent} to provide a more specific context related to
* catalog operations, encapsulating details about the user who initiated the operation, the
* identifier of the catalog involved, and the exception that led to the failure.
* exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to catalog operations, encapsulating details about the user who initiated the operation,
* the identifier of the catalog involved, and the exception that led to the failure.
*/
@DeveloperApi
public abstract class CatalogFailureEvent extends FailureEvent {
public abstract class CatalogFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code CatalogFailureEvent} instance, capturing information about the failed
* catalog operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* understanding of each event.
*/
@DeveloperApi
public abstract class FilesetEvent extends Event {
public abstract class FilesetEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code FilesetEvent} with the specified user and fileset identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* diagnose and respond to issues.
*/
@DeveloperApi
public abstract class FilesetFailureEvent extends FailureEvent {
public abstract class FilesetFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code FilesetFailureEvent} instance, capturing information about the failed
* fileset operation.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/** Represents a failure event in Gravitino server. */
@DeveloperApi
public abstract class GravitinoFailureEvent extends FailureEvent {
protected GravitinoFailureEvent(String user, NameIdentifier identifier, Exception exception) {
super(user, identifier, exception);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/** Represents a post event for Gravitino server. */
@DeveloperApi
public abstract class GravitinoPostEvent extends Event {
protected GravitinoPostEvent(String user, NameIdentifier identifier) {
super(user, identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* performing the operation and the identifier of the Metalake being operated on.
*/
@DeveloperApi
public abstract class MetalakeEvent extends Event {
public abstract class MetalakeEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code MetalakeEvent} with the specified user and Metalake identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

/**
* An abstract class representing events that are triggered when a Metalake operation fails due to
* an exception. This class extends {@link FailureEvent} to provide a more specific context related
* to Metalake operations, encapsulating details about the user who initiated the operation, the
* identifier of the Metalake involved, and the exception that led to the failure.
* an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to Metalake operations, encapsulating details about the user who initiated the operation,
* the identifier of the Metalake involved, and the exception that led to the failure.
*/
@DeveloperApi
public abstract class MetalakeFailureEvent extends FailureEvent {
public abstract class MetalakeFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code MetalakeFailureEvent} instance, capturing information about the failed
* Metalake operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* performing the operation and the identifier of the Partition being operated on.
*/
@DeveloperApi
public abstract class PartitionEvent extends Event {
public abstract class PartitionEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code PartitionEvent} with the specified user and Partition identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@

/**
* An abstract class representing events that are triggered when a Partition operation fails due to
* an exception. This class extends {@link FailureEvent} to provide a more specific context related
* to Partition operations, encapsulating details about the user who initiated the operation, the
* identifier of the Partition involved, and the exception that led to the failure.
* an exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to Partition operations, encapsulating details about the user who initiated the
* operation, the identifier of the Partition involved, and the exception that led to the failure.
*/
@DeveloperApi
public abstract class PartitionFailureEvent extends FailureEvent {
public abstract class PartitionFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code PartitionFailureEvent} instance, capturing information about the failed
* Partition operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/** Represents an abstract base class for events related to schema operations. */
@DeveloperApi
public abstract class SchemaEvent extends Event {
public abstract class SchemaEvent extends GravitinoPostEvent {
protected SchemaEvent(String user, NameIdentifier identifier) {
super(user, identifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* exception.
*/
@DeveloperApi
public abstract class SchemaFailureEvent extends FailureEvent {
public abstract class SchemaFailureEvent extends GravitinoFailureEvent {
protected SchemaFailureEvent(String user, NameIdentifier identifier, Exception exception) {
super(user, identifier, exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* specific type of table operation being represented.
*/
@DeveloperApi
public abstract class TableEvent extends Event {
public abstract class TableEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code TableEvent} with the specified user and table identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@

/**
* An abstract class representing events that are triggered when a table operation fails due to an
* exception. This class extends {@link FailureEvent} to provide a more specific context related to
* table operations, encapsulating details about the user who initiated the operation, the
* identifier of the table involved, and the exception that led to the failure.
* exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to table operations, encapsulating details about the user who initiated the operation,
* the identifier of the table involved, and the exception that led to the failure.
*
* <p>Implementations of this class can be used to convey detailed information about failures in
* operations such as creating, updating, deleting, or querying tables, making it easier to diagnose
* and respond to issues.
*/
@DeveloperApi
public abstract class TableFailureEvent extends FailureEvent {
public abstract class TableFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code TableFailureEvent} instance, capturing information about the failed
* table operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* specific type of topic operation being represented.
*/
@DeveloperApi
public abstract class TopicEvent extends Event {
public abstract class TopicEvent extends GravitinoPostEvent {
/**
* Constructs a new {@code TopicEvent} with the specified user and topic identifier.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@

/**
* An abstract class representing events that are triggered when a topic operation fails due to an
* exception. This class extends {@link FailureEvent} to provide a more specific context related to
* topic operations, encapsulating details about the user who initiated the operation, the
* identifier of the topic involved, and the exception that led to the failure.
* exception. This class extends {@link GravitinoFailureEvent} to provide a more specific context
* related to topic operations, encapsulating details about the user who initiated the operation,
* the identifier of the topic involved, and the exception that led to the failure.
*
* <p>Implementations of this class can be used to convey detailed information about failures in
* operations such as creating, updating, deleting, or querying topics, making it easier to diagnose
* and respond to issues.
*/
@DeveloperApi
public abstract class TopicFailureEvent extends FailureEvent {
public abstract class TopicFailureEvent extends GravitinoFailureEvent {
/**
* Constructs a new {@code TopicFailureEvent} instance, capturing information about the failed
* topic operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ public interface IcebergCatalogWrapperProvider {
* @param catalogName a param send by clients.
* @return the instance of IcebergCatalogWrapper.
*/
IcebergCatalogWrapper getIcebergTableOps(String catalogName);
IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationProcessor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
import org.apache.gravitino.server.web.HttpServerMetricsSource;
Expand Down Expand Up @@ -70,14 +74,20 @@ private void initServer(IcebergConfig icebergConfig) {
new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server);
metricsSystem.register(httpServerMetricsSource);

EventBus eventBus = GravitinoEnv.getInstance().eventBus();
icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig());
icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
IcebergTableOperationProcessor icebergTableOperationProcessor =
new IcebergTableOperationProcessor(icebergCatalogWrapperManager);
IcebergTableEventDispatcher icebergTableEventDispatcher =
new IcebergTableEventDispatcher(icebergTableOperationProcessor, eventBus);
config.register(
new AbstractBinder() {
@Override
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
}
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.iceberg.extension;

import java.util.Map;
import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.event.Event;
import org.apache.gravitino.listener.api.event.PreEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergEventLogger implements EventListenerPlugin {

private static final Logger LOG = LoggerFactory.getLogger(IcebergEventLogger.class);

@Override
public void init(Map<String, String> properties) throws RuntimeException {}

@Override
public void start() throws RuntimeException {}

@Override
public void stop() throws RuntimeException {}

@Override
public void onPostEvent(Event event) {
LOG.info(
"Process post event {}, user: {}, identifier: {}",
event.getClass().getSimpleName(),
event.user(),
event.identifier());
}

@Override
public void onPreEvent(PreEvent event) {
LOG.info(
"Process pre event {}, user: {}, identifier: {}",
event.getClass().getSimpleName(),
event.user(),
event.identifier());
}

@Override
public Mode mode() {
return Mode.SYNC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void initialize(Map<String, String> properties) {
}

@Override
public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) {
IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName);
if (icebergConfig == null) {
String errorMsg = String.format("%s can not match any catalog", catalogName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void initialize(Map<String, String> properties) {
}

@Override
public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
public IcebergCatalogWrapper getIcebergCatalogWrapper(String catalogName) {
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ private void initialize() {
}

private void start() {
gravitinoEnv.start(false);
icebergRESTService.serviceStart();
}

Expand Down
Loading
Loading