Documentation: https://mdapena.github.io/pyventus
Source Code: https://github.com/mdapena/pyventus
Pyventus is a powerful Python package for event-driven programming. It offers a comprehensive suite of tools to easily define, emit, and orchestrate events. With Pyventus, you can build scalable, extensible, and loosely-coupled event-driven applications.
Pyventus offers several key features, such as:
- Sync and Async Support ─ Whether your code is synchronous or asynchronous, Pyventus allows you to define event handlers as either sync or async callbacks and emit events from both scopes.
- Customization ─ Whether you choose official emitters or custom ones, Pyventus allows you to customize the behavior and capabilities of the event emitters to perfectly align with your unique requirements.
- An Intuitive API ─ Pyventus provides a user-friendly API for defining events, emitters, and handlers. Its design simplifies the process of working with events, enabling you to organize your code around discrete events and their associated actions.
- Runtime Flexibility ─ Pyventus' runtime flexibility allows you to switch seamlessly between different built-in or custom event emitter implementations on the fly, providing a dynamic and adaptable environment for event-driven programming.
- Reliable Event Handling ─ Pyventus allows you to define handlers to customize how events are processed upon completion. Attach success and failure logic to take targeted actions based on the outcome of each event execution.
- Scalability and Maintainability ─ By adopting an event-driven approach with Pyventus, you can create scalable and maintainable code thanks to the loose coupling between its components that enables extensibility and modularity.
- Comprehensive Documentation ─ Pyventus provides a comprehensive documentation suite that includes API references, usage examples, and tutorials to effectively leverage all the features and capabilities of the package.
Pyventus is published as a Python package and can be installed using pip, ideally in a virtual environment for proper dependency isolation. To get started, open up a terminal and install Pyventus with the following command:
pip install pyventus
By default, Pyventus relies only on the Python standard library and requires Python 3.10 or higher, with no additional dependencies. The package also ships optional integrations that add support for Redis Queue, Celery, and FastAPI. For more information, please refer to the Optional Dependencies section.
Experience the power of Pyventus through a simple Hello, World! example that illustrates the core concepts and basic usage of the package. By following this example, you'll learn how to subscribe to events and emit them within your application.
    from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter


    @EventLinker.on("GreetEvent")
    def handle_greet_event():
        print("Hello, World!")


    event_emitter: EventEmitter = AsyncIOEventEmitter()
    event_emitter.emit("GreetEvent")
You can also work with async functions and contexts...
    from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter


    @EventLinker.on("GreetEvent")
    async def handle_greet_event():
        print("Hello, World!")


    event_emitter: EventEmitter = AsyncIOEventEmitter()
    event_emitter.emit("GreetEvent")
As we can see from the Hello, World! example, Pyventus follows a simple and intuitive workflow for defining and emitting events. Let's recap the essential steps involved:
- Importing Necessary Modules: We first imported the required modules from Pyventus, which encompassed the EventLinker class, the EventEmitter interface, and the AsyncIOEventEmitter implementation.
- Linking Events to Callbacks: Next, we used the @EventLinker.on() decorator to define and link the string event GreetEvent to the function handle_greet_event(), which will print 'Hello, World!' to the console whenever the GreetEvent is emitted.
- Instantiating an Event Emitter: After that, and in order to trigger our event, we needed to create an instance of the event emitter class. While AsyncIOEventEmitter was utilized, any built-in or custom implementation could be employed.
- Triggering the Event: Finally, by using the emit() method of the event emitter instance, we were able to trigger the GreetEvent, which resulted in the execution of the handle_greet_event() callback.
Having gained a clear understanding of the workflow showcased in the Hello, World! example, you are now well-equipped to explore more intricate event-driven scenarios and fully harness the capabilities of Pyventus in your own projects. For a deep dive into the package's functionalities, you can refer to the Pyventus Tutorials or API.
To showcase Pyventus' event-driven capabilities in a real-world scenario, we will explore a practical example of implementing a voltage sensor using an event-driven architecture (crucial for such scenarios). The purpose of this example is to create an efficient voltage sensor that can seamlessly handle real-time data and respond appropriately to specific voltage conditions.
Example ─ Monitoring Voltage Levels Across Devices (Context)
A common aspect found in many systems is the need to monitor and respond to changes in sensor data. Whether it's pressure sensors, temperature sensors, or other types, capturing and reacting to sensor data is crucial for effective system operation. In our practical example, we will focus on a specific scenario: building a sensor system that monitors voltage levels across devices. The goal of our voltage sensor is to detect potential issues, such as low or high voltage conditions, and respond appropriately in real-time.
To accomplish our goal, we will define a VoltageSensor class to read voltage levels and emit events based on predefined thresholds. We will create event handlers to respond to these events, performing actions such as activating eco-mode for low voltage or implementing high-voltage protection. Additionally, a shared event handler will provide general notifications for out-of-range voltage situations. The code example below illustrates the implementation of this system.
    import asyncio
    import random

    from pyventus import EventEmitter, EventLinker, AsyncIOEventEmitter


    class VoltageSensor:

        def __init__(self, name: str, low: float, high: float, event_emitter: EventEmitter) -> None:
            # Initialize the VoltageSensor object with the provided parameters
            self._name: str = name
            self._low: float = low
            self._high: float = high
            self._event_emitter: EventEmitter = event_emitter

        async def __call__(self) -> None:
            # Start voltage readings for the sensor
            print(f"Starting voltage readings for: {self._name}")
            print(f"Low: {self._low:.3g}v | High: {self._high:.3g}v\n-----------\n")

            while True:
                # Simulate sensor readings
                voltage: float = random.uniform(0, 5)
                print("\tSensor Reading:", "\033[32m", f"{voltage:.3g}v", "\033[0m")

                # Emit events based on voltage readings
                if voltage < self._low:
                    self._event_emitter.emit("LowVoltageEvent", sensor=self._name, voltage=voltage)
                elif voltage > self._high:
                    self._event_emitter.emit("HighVoltageEvent", sensor=self._name, voltage=voltage)

                await asyncio.sleep(1)


    @EventLinker.on("LowVoltageEvent")
    def handle_low_voltage_event(sensor: str, voltage: float):
        print(f"🪫 Turning on eco-mode for '{sensor}'. ({voltage:.3g}v)\n")
        # Perform action for low voltage...


    @EventLinker.on("HighVoltageEvent")
    async def handle_high_voltage_event(sensor: str, voltage: float):
        print(f"⚡ Starting high-voltage protection for '{sensor}'. ({voltage:.3g}v)\n")
        # Perform action for high voltage...


    @EventLinker.on("LowVoltageEvent", "HighVoltageEvent")
    def handle_voltage_event(sensor: str, voltage: float):
        print(f"\033[31m\nSensor '{sensor}' out of range.\033[0m (Voltage: {voltage:.3g})")
        # Perform notification for out of range voltage...


    async def main():
        # Initialize the sensor and run the sensor readings
        sensor = VoltageSensor(name="PressureSensor", low=0.5, high=3.9, event_emitter=AsyncIOEventEmitter())
        await asyncio.gather(sensor(), )  # Add new sensors inside the 'gather' for multi-device monitoring


    asyncio.run(main())
As we can see from this practical example, Pyventus enables us to easily build an event-driven system for voltage sensors that is flexible, efficient, and highly responsive. With its intuitive API and support for both synchronous and asynchronous operations, we were able to effectively monitor voltage levels, detect anomalies, and trigger appropriate actions in real-time.
Pyventus is designed from the ground up to seamlessly support both synchronous and asynchronous programming models. Its unified sync/async API allows you to define event callbacks and emit events across sync and async contexts.
    @EventLinker.on("MyEvent")
    def sync_event_callback():
        pass  # Synchronous event handling


    @EventLinker.on("MyEvent")
    async def async_event_callback():
        pass  # Asynchronous event handling
You can optimize the execution of your callbacks based on their workload...
By default, event handlers in Pyventus are executed concurrently during an event emission, running their sync and async callbacks as defined. However, if you have a sync callback that involves I/O or non-CPU bound operations, you can enable the force_async parameter to offload it to a thread pool, ensuring optimal performance and responsiveness. The force_async parameter utilizes the asyncio.to_thread() function to execute sync callbacks asynchronously.
    import time

    from pyventus import EventLinker


    @EventLinker.on("BlockingIO", force_async=True)
    def blocking_io():
        print(f"start blocking_io at {time.strftime('%X')}")
        # Note that time.sleep() can be replaced with any blocking
        # IO-bound operation, such as file operations.
        time.sleep(1)
        print(f"blocking_io complete at {time.strftime('%X')}")
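To make the effect of offloading concrete, here is a minimal standard-library sketch of what asyncio.to_thread() does for a blocking function. It does not use Pyventus at all; the names blocking_io_task and run_offloaded are hypothetical and exist only for this illustration.

```python
import asyncio
import time


def blocking_io_task(delay: float) -> str:
    # Stands in for any blocking, IO-bound call (file access, a slow driver, ...).
    time.sleep(delay)
    return "done"


async def run_offloaded(n: int, delay: float) -> tuple[list[str], float]:
    start = time.perf_counter()
    # Each call runs in a worker thread, so the event loop stays free
    # and the waits overlap instead of adding up.
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io_task, delay) for _ in range(n))
    )
    return list(results), time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(run_offloaded(n=3, delay=0.3))
    print(results, f"{elapsed:.2f}s")
```

Calling blocking_io_task() directly inside a coroutine would instead stall the event loop for the full duration of every call, which is the situation force_async is meant to avoid.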
    # Emitting an event within a sync function
    def sync_function(event_emitter: EventEmitter):
        event_emitter.emit("MyEvent")


    # Emitting an event within an async function
    async def async_function(event_emitter: EventEmitter):
        event_emitter.emit("MyEvent")
Event propagation within different contexts...
While Pyventus provides a base EventEmitter class with a unified sync/async API, the specific propagation behavior when emitting events may vary depending on the concrete EventEmitter used. For example, the AsyncIOEventEmitter implementation leverages the AsyncIO event loop to schedule callbacks added from asynchronous contexts without blocking. But alternative emitters could structure propagation differently to suit their needs.
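The idea of context-dependent propagation can be sketched with the standard library alone. The LoopAwareEmitter class below is hypothetical and is not the Pyventus implementation; it only assumes one plausible strategy: schedule handlers on the running loop when called from async code, and run a loop to completion when called from sync code.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[], Awaitable[None]]


class LoopAwareEmitter:
    """Hypothetical emitter for illustration; NOT the Pyventus implementation."""

    def __init__(self) -> None:
        self._handlers: list[Handler] = []
        self._tasks: set[asyncio.Task] = set()

    def on(self, handler: Handler) -> Handler:
        # Register an async handler; usable as a decorator.
        self._handlers.append(handler)
        return handler

    async def _run_all(self) -> None:
        await asyncio.gather(*(handler() for handler in self._handlers))

    def emit(self) -> str:
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Sync context: no loop is running, so run one to completion.
            asyncio.run(self._run_all())
            return "ran-to-completion"
        # Async context: schedule on the running loop and return immediately.
        task = loop.create_task(self._run_all())
        self._tasks.add(task)  # keep a reference so the task is not garbage-collected
        task.add_done_callback(self._tasks.discard)
        return "scheduled"


if __name__ == "__main__":
    emitter = LoopAwareEmitter()

    @emitter.on
    async def greet() -> None:
        print("Hello, World!")

    print(emitter.emit())  # called from sync code

    async def main() -> None:
        print(emitter.emit())  # called while a loop is running
        await asyncio.sleep(0.01)  # give the scheduled task time to run

    asyncio.run(main())
```

The same emit() call therefore blocks until completion in one context and returns before the handlers run in the other, which is the kind of difference to keep in mind when swapping emitters.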
At its core, Pyventus utilizes a modular event emitter design that allows you to switch seamlessly between different built-in or custom event emitter implementations on the fly. Whether you opt for official emitters or decide to create your custom ones, Pyventus allows you to tailor the behavior and capabilities of the event emitters to perfectly align with your unique requirements.
By leveraging the principle of dependency inversion and using the base EventEmitter as a dependency, you can change the concrete implementation on the fly. Let's demonstrate this using the AsyncIO Event Emitter and the Executor Event Emitter:
    from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter, ExecutorEventEmitter


    @EventLinker.on("GreetEvent")
    def handle_greet_event(name: str = "World"):
        print(f"Hello, {name}!")


    if __name__ == "__main__":
        def main(event_emitter: EventEmitter) -> None:
            event_emitter.emit("GreetEvent", name=type(event_emitter).__name__)

        main(event_emitter=AsyncIOEventEmitter())
        with ExecutorEventEmitter() as executor_event_emitter:
            main(event_emitter=executor_event_emitter)
To illustrate Pyventus' customization capabilities, we will define and implement a custom event emitter class for the FastAPI framework. This class will efficiently handle the execution of event emissions through its background tasks workflow.
    from fastapi import BackgroundTasks

    from pyventus import EventEmitter, EventLinker


    class FastAPIEventEmitter(EventEmitter):
        """A custom event emitter that uses the FastAPI background tasks."""

        def __init__(self, background_tasks: BackgroundTasks):
            super().__init__(event_linker=EventLinker, debug=False)
            self._background_tasks = background_tasks

        def _process(self, event_emission: EventEmitter.EventEmission) -> None:
            self._background_tasks.add_task(event_emission)  # Process the event emission as a background task
Official FastAPIEventEmitter Integration.
In case you're interested in integrating Pyventus with FastAPI, you can refer to the official Pyventus FastAPI Event Emitter implementation.
In addition to string events, Pyventus also supports Event Objects, which provide a structured way to define events and encapsulate relevant data payloads.
    from dataclasses import dataclass
    from typing import Any

    from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter


    @dataclass  # Define a Python dataclass representing the event and its payload.
    class OrderCreatedEvent:
        order_id: int
        payload: dict[str, Any]


    @EventLinker.on(OrderCreatedEvent)  # Subscribe event handlers to the event.
    def handle_order_created_event(event: OrderCreatedEvent):
        # Pyventus will automatically pass the Event Object
        # as the first positional argument.
        print(f"Event Object: {event}")


    event_emitter: EventEmitter = AsyncIOEventEmitter()
    event_emitter.emit(
        event=OrderCreatedEvent(  # Emit an instance of the event!
            order_id=6452879,
            payload={},
        ),
    )
Furthermore, Pyventus provides support for Global Events, which are particularly useful for implementing cross-cutting concerns such as logging, monitoring, or analytics. By subscribing event handlers to ... or Ellipsis, you can capture all events that may occur within that EventLinker context.
    @EventLinker.on(...)
    def handle_any_event(*args, **kwargs):
        print(f"Perform logging...\nArgs: {args}\tKwargs: {kwargs}")


    event_emitter: EventEmitter = AsyncIOEventEmitter()
    event_emitter.emit("GreetEvent", name="Pyventus")
With Pyventus, you can customize how events are handled upon completion, whether they succeed or encounter errors. This customization is achieved by using either the EventLinker's on() or once() decorator within a with statement block. Inside this block, you can define not only the event callbacks but also the overall workflow of the event. Now, let's explore this simple yet powerful Pythonic syntax of Pyventus through an example.
    from pyventus import EventLinker, EventEmitter, AsyncIOEventEmitter

    # Create an event linker for the "DivisionEvent"
    with EventLinker.on("DivisionEvent") as linker:
        @linker.on_event
        def divide(a: float, b: float) -> float:
            return a / b

        @linker.on_success
        def handle_success(result: float) -> None:
            print(f"Division result: {result:.3g}")

        @linker.on_failure
        def handle_failure(e: Exception) -> None:
            print(f"Oops, something went wrong: {e}")

    event_emitter: EventEmitter = AsyncIOEventEmitter()  # Create an event emitter
    event_emitter.emit("DivisionEvent", a=1, b=0)  # Example: Division by zero
    event_emitter.emit("DivisionEvent", a=1, b=2)  # Example: Valid division
As we have seen from the example, Pyventus offers a reliable and Pythonic solution for customizing event handling. By utilizing the EventLinker and its decorators within a with statement block, we were able to define the DivisionEvent and specify the callbacks for division, success, and failure cases.
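For intuition about how a result or an exception can be routed to the matching callback, the following standard-library sketch shows one possible dispatch routine. The run_with_outcome helper is hypothetical and is not part of the Pyventus API; re-raising when no failure callback is given is a choice made for this sketch only.

```python
from typing import Any, Callable, Optional


def run_with_outcome(
    callback: Callable[..., Any],
    on_success: Optional[Callable[[Any], None]] = None,
    on_failure: Optional[Callable[[Exception], None]] = None,
    **kwargs: Any,
) -> None:
    """Run `callback` and route its outcome to the matching handler."""
    try:
        result = callback(**kwargs)
    except Exception as exc:
        if on_failure is None:
            raise  # sketch-only choice: surface unhandled errors
        on_failure(exc)
        return
    if on_success is not None:
        on_success(result)


def divide(a: float, b: float) -> float:
    return a / b


if __name__ == "__main__":
    def report_success(result: float) -> None:
        print(f"Division result: {result:.3g}")

    def report_failure(exc: Exception) -> None:
        print(f"Oops, something went wrong: {exc}")

    run_with_outcome(divide, report_success, report_failure, a=1, b=0)
    run_with_outcome(divide, report_success, report_failure, a=1, b=2)
```

Keeping the success and failure logic outside the main callback lets divide() stay a plain function with no error handling of its own.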
Pyventus continuously adapts to support developers across technological and programming domains. Its aim is to remain at the forefront of event-driven design. Future development may introduce new official event emitters, expanding compatibility with different technologies through seamless integration.
Current default emitters provide reliable out-of-the-box capabilities for common use cases. They efficiently handle core event operations and lay the foundation for building event-driven applications.
Driving Innovation Through Collaboration
Pyventus is an open source project that welcomes community involvement. If you wish to contribute additional event emitters, improvements, or bug fixes, please check the Contributing section for guidelines on collaborating. Together, we can further the possibilities of event-driven development.
Pyventus is distributed as open source software and is released under the MIT License.
You can view the full text of the license in the LICENSE file located in the Pyventus repository.