Skip to content

Commit

Permalink
[#5112] feat(core): support pre event for event listener systems (#5110)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
support pre event for event listener systems
1. add new `PreEvent` to represent Pre event and only `SYNC` event
listeners could process `PreEvent`
2. keep `Event` as post event to keep compatibility.
3. `EventBus` dispatch event to corresponding event listeners.

### Why are the changes needed?
Fix: #5112 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add UT
  • Loading branch information
FANNG1 authored Oct 15, 2024
1 parent 53bd227 commit e9acd15
Show file tree
Hide file tree
Showing 16 changed files with 455 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.event.BaseEvent;
import org.apache.gravitino.listener.api.event.Event;
import org.apache.gravitino.listener.api.event.PreEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,7 +46,7 @@ public class AsyncQueueListener implements EventListenerPlugin {
private static final String NAME_PREFIX = "async-queue-listener-";

private final List<EventListenerPlugin> eventListeners;
private final BlockingQueue<Event> queue;
private final BlockingQueue<BaseEvent> queue;
private final Thread asyncProcessor;
private final int dispatcherJoinSeconds;
private final AtomicBoolean stopped = new AtomicBoolean(false);
Expand All @@ -68,20 +70,13 @@ public AsyncQueueListener(
}

@Override
public void onPostEvent(Event event) {
if (stopped.get()) {
LOG.warn(
"{} drop event: {}, since AsyncQueueListener is stopped",
asyncQueueListenerName,
event.getClass().getSimpleName());
return;
}

if (queue.offer(event)) {
return;
}
public void onPreEvent(PreEvent event) {
enqueueEvent(event);
}

logDropEventsIfNecessary();
@Override
public void onPostEvent(Event event) {
enqueueEvent(event);
}

@Override
Expand Down Expand Up @@ -117,8 +112,14 @@ List<EventListenerPlugin> getEventListeners() {
private void processEvents() {
while (!Thread.currentThread().isInterrupted()) {
try {
Event event = queue.take();
this.eventListeners.forEach(listener -> listener.onPostEvent(event));
BaseEvent baseEvent = queue.take();
if (baseEvent instanceof PreEvent) {
this.eventListeners.forEach(listener -> listener.onPreEvent((PreEvent) baseEvent));
} else if (baseEvent instanceof Event) {
this.eventListeners.forEach(listener -> listener.onPostEvent((Event) baseEvent));
} else {
LOG.warn("Unknown event type: {}", baseEvent.getClass().getSimpleName());
}
} catch (InterruptedException e) {
LOG.warn("{} event dispatcher thread is interrupted.", asyncQueueListenerName);
break;
Expand Down Expand Up @@ -154,4 +155,20 @@ private void logDropEventsIfNecessary() {
}
}
}

private void enqueueEvent(BaseEvent baseEvent) {
if (stopped.get()) {
LOG.warn(
"{} drop event: {}, since AsyncQueueListener is stopped",
asyncQueueListenerName,
baseEvent.getClass().getSimpleName());
return;
}

if (queue.offer(baseEvent)) {
return;
}

logDropEventsIfNecessary();
}
}
37 changes: 27 additions & 10 deletions core/src/main/java/org/apache/gravitino/listener/EventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.event.BaseEvent;
import org.apache.gravitino.listener.api.event.Event;
import org.apache.gravitino.listener.api.event.PreEvent;

/**
* The {@code EventBus} class serves as a mechanism to dispatch events to registered listeners. It
Expand All @@ -34,26 +37,32 @@ public class EventBus {
// EventListenerPluginWrapper,
// which are meant for synchronous event listening, or AsyncQueueListener, designed for
// asynchronous event processing.
private final List<EventListenerPlugin> postEventListeners;
private final List<EventListenerPlugin> eventListeners;

/**
* Constructs an EventBus with a predefined list of event listeners.
*
* @param postEventListeners A list of {@link EventListenerPlugin} instances that are to be
* registered with this EventBus for event dispatch.
* @param eventListeners A list of {@link EventListenerPlugin} instances that are to be registered
* with this EventBus for event dispatch.
*/
public EventBus(List<EventListenerPlugin> postEventListeners) {
this.postEventListeners = postEventListeners;
public EventBus(List<EventListenerPlugin> eventListeners) {
this.eventListeners = eventListeners;
}

/**
* Dispatches an event to all registered listeners. Each listener processes the event based on its
* implementation, which could be either synchronous or asynchronous.
*
* @param event The event to be dispatched to all registered listeners.
* @param baseEvent The event to be dispatched to all registered listeners.
*/
public void dispatchEvent(Event event) {
postEventListeners.forEach(postEventListener -> postEventListener.onPostEvent(event));
public void dispatchEvent(BaseEvent baseEvent) {
if (baseEvent instanceof PreEvent) {
dispatchPreEvent((PreEvent) baseEvent);
} else if (baseEvent instanceof Event) {
dispatchPostEvent((Event) baseEvent);
} else {
throw new RuntimeException("Unknown event type:" + baseEvent.getClass().getSimpleName());
}
}

/**
Expand All @@ -64,7 +73,15 @@ public void dispatchEvent(Event event) {
* EventBus.
*/
@VisibleForTesting
List<EventListenerPlugin> getPostEventListeners() {
return postEventListeners;
List<EventListenerPlugin> getEventListeners() {
return eventListeners;
}

private void dispatchPostEvent(Event postEvent) {
eventListeners.forEach(eventListener -> eventListener.onPostEvent(postEvent));
}

private void dispatchPreEvent(PreEvent preEvent) throws ForbiddenException {
eventListeners.forEach(eventListener -> eventListener.onPreEvent(preEvent));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.listener.api.EventListenerPlugin;
import org.apache.gravitino.listener.api.event.BaseEvent;
import org.apache.gravitino.listener.api.event.Event;
import org.apache.gravitino.listener.api.event.PreEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,16 +70,40 @@ public void onPostEvent(Event event) {
try {
userEventListener.onPostEvent(event);
} catch (Exception e) {
LOG.warn(
"Event listener {} process event {} failed,",
listenerName,
event.getClass().getSimpleName(),
e);
printExceptionInEventProcess(listenerName, event, e);
}
}

@Override
public void onPreEvent(PreEvent preEvent) {
try {
userEventListener.onPreEvent(preEvent);
} catch (ForbiddenException e) {
if (Mode.SYNC.equals(mode())) {
LOG.warn(
"Event listener {} process pre event {} throws ForbiddenException, will skip the "
+ "operation.",
listenerName,
preEvent.getClass().getSimpleName(),
e);
throw e;
}
printExceptionInEventProcess(listenerName, preEvent, e);
} catch (Exception e) {
printExceptionInEventProcess(listenerName, preEvent, e);
}
}

@VisibleForTesting
EventListenerPlugin getUserEventListener() {
return userEventListener;
}

private void printExceptionInEventProcess(String listenerName, BaseEvent baseEvent, Exception e) {
LOG.warn(
"Event listener {} process event {} failed,",
listenerName,
baseEvent.getClass().getSimpleName(),
e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

import java.util.Map;
import org.apache.gravitino.annotation.DeveloperApi;
import org.apache.gravitino.exceptions.ForbiddenException;
import org.apache.gravitino.listener.api.event.Event;
import org.apache.gravitino.listener.api.event.PreEvent;

/**
* Defines an interface for event listeners that manage the lifecycle and state of a plugin,
Expand Down Expand Up @@ -95,17 +97,29 @@ enum Mode {
void stop() throws RuntimeException;

/**
* Handles events generated after the completion of an operation. Implementers are responsible for
* processing these events, which may involve additional logic to respond to the operation
* outcomes.
* Handle post-events generated after the completion of an operation.
*
* <p>This method provides a hook for post-operation event processing, allowing plugins to react
* or adapt based on the event details.
* <p>This method provides a hook for post-operation event processing, you couldn't change the
* resource in the event.
*
* @param event The event to be processed.
* @throws RuntimeException Indicates issues encountered during event processing.
* @param postEvent The post event to be processed.
* @throws RuntimeException Indicates issues encountered during event processing, this has no
* affect to the operation.
*/
void onPostEvent(Event event) throws RuntimeException;
default void onPostEvent(Event postEvent) throws RuntimeException {}

/**
* Handle pre-events generated before the operation.
*
* <p>This method handles pre-operation events in SYNC or ASYNC mode, any changes to resources in
* the event will affect the subsequent operations.
*
* @param preEvent The pre event to be processed.
* @throws ForbiddenException The subsequent operation will be skipped if and only if the event
* listener throwing {@code org.apache.gravitino.exceptions.ForbiddenException} and the event
* listener is SYNC mode, the exception will be ignored and logged only in other conditions.
*/
default void onPreEvent(PreEvent preEvent) throws ForbiddenException {}

/**
* Specifies the default operational mode for event processing by the plugin. The default
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import javax.annotation.Nullable;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/**
* The abstract base class for all events. It encapsulates common information such as the user who
* generated the event and the identifier for the resource associated with the event. Subclasses
* should provide specific details related to their individual event types.
*/
@DeveloperApi
public abstract class BaseEvent {
private final String user;
@Nullable private final NameIdentifier identifier;
private final long eventTime;

/**
* Constructs an Event instance with the specified user and resource identifier details.
*
* @param user The user associated with this event. It provides context about who triggered the
* event.
* @param identifier The resource identifier associated with this event. This may refer to various
* types of resources such as a metalake, catalog, schema, or table, etc.
*/
protected BaseEvent(String user, NameIdentifier identifier) {
this.user = user;
this.identifier = identifier;
this.eventTime = System.currentTimeMillis();
}

/**
* Retrieves the user associated with this event.
*
* @return A string representing the user associated with this event.
*/
public String user() {
return user;
}

/**
* Retrieves the resource identifier associated with this event.
*
* <p>For list operations within a namespace, the identifier is the identifier corresponds to that
* namespace. For metalake list operation, identifier is null.
*
* @return A NameIdentifier object that represents the resource, like a metalake, catalog, schema,
* table, etc., associated with the event.
*/
@Nullable
public NameIdentifier identifier() {
return identifier;
}

/**
* Returns the timestamp when the event was created.
*
* @return The event creation time in milliseconds since epoch.
*/
public long eventTime() {
return eventTime;
}
}
Loading

0 comments on commit e9acd15

Please sign in to comment.