585 complete kafka part of faststream docs (#775)

* Init asyncapi customization page
* Add tests and code snippets
* Add image examples in the customization docs
* Remove empty test_ file
* Add details to batch publishing
* Add details to Kafka guides
* Finish kafka publishing with a key guide
* Finish batch publishing guide
* Finish general kafka publisher documentation
* Reference publisher examples in tests
* Fix referencing wrong test in publish_raw
* Fix wrong import

Co-authored-by: Davor Runje <[email protected]>
1 parent ec0bc25 · commit b5df079
Showing 14 changed files with 482 additions and 53 deletions.
# Publishing

The **FastStream** KafkaBroker supports all regular [publishing use cases](../getting-started/publishing/index.md){.internal-link}, and you can use them without any changes.

In the following chapters, we will demonstrate how to use a KafkaBroker publisher in specific use cases, such as publishing batches or publishing with a key. However, if you wish to further customize the publishing logic, you should take a closer look at specific KafkaBroker parameters.
## Basic Kafka Publishing

The `KafkaBroker` uses the unified `publish` method (from a `producer` object) to send messages.

In this case, you can use Python primitives and `pydantic.BaseModel` to define the content of the message you want to publish to the Kafka broker.

You can specify the topic to send the message to by its name.
1. Create your KafkaBroker instance

    ```python linenums="1"
    broker = KafkaBroker("localhost:9092")
    ```

2. Publish a message using the `publish` method

    ```python linenums="1"
    msg = Data(data=0.5)

    await broker.publish(
        model_to_json(msg),
        "input_data",
        headers={"content-type": "application/json"},
    )
    ```

This is the most basic way of using the KafkaBroker to publish a message.
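The snippets above rely on a `Data` model and a `model_to_json` helper that are not shown here. The following is a minimal, self-contained sketch of the same flow; the `Data` model, the `model_to_json` helper, and the surrounding `main` coroutine are illustrative assumptions rather than part of the original example.

```python
import asyncio

from pydantic import BaseModel, NonNegativeFloat

from faststream.kafka import KafkaBroker


class Data(BaseModel):
    # Hypothetical payload model matching the snippets above
    data: NonNegativeFloat


def model_to_json(model: BaseModel) -> bytes:
    # Assumed helper: serialize a pydantic (v2) model to JSON bytes
    return model.model_dump_json().encode()


broker = KafkaBroker("localhost:9092")


async def main() -> None:
    # Connect before publishing and close the connection afterwards
    await broker.connect()
    try:
        await broker.publish(
            model_to_json(Data(data=0.5)),
            "input_data",
            headers={"content-type": "application/json"},
        )
    finally:
        await broker.close()


if __name__ == "__main__":
    asyncio.run(main())
```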
## Creating a publisher object

The simplest way to use a KafkaBroker for publishing has a significant limitation: your publishers won't be documented in the AsyncAPI documentation. This might be acceptable for sending occasional one-off messages. However, if you're building a comprehensive service, it's recommended to create publisher objects. These objects can then be parsed and documented in your service's AsyncAPI documentation. Let's go ahead and create those publisher objects!

1. Create your KafkaBroker instance

    ```python linenums="1"
    broker = KafkaBroker("localhost:9092")
    ```

2. Create a publisher instance

    ```python linenums="1"
    prepared_publisher = broker.publisher("input_data")
    ```

3. Publish a message using the `publish` method of the prepared publisher

    ```python linenums="1"
    msg = Data(data=0.5)

    await prepared_publisher.publish(
        model_to_json(msg),
        headers={"content-type": "application/json"},
    )
    ```

Now, when you wrap your broker into a FastStream object, the publisher will be exported to the AsyncAPI documentation.
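For illustration, a minimal sketch of that wrapping might look like this; the module layout is an assumption, not part of the original example.

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

# Declaring the publisher on the broker is what makes it discoverable
prepared_publisher = broker.publisher("input_data")

# Wrapping the broker in a FastStream app exposes the publisher in the AsyncAPI schema
app = FastStream(broker)
```

With this saved as, say, `main.py`, the schema could then be inspected with the FastStream CLI (for example, `faststream docs serve main:app`), assuming the CLI is installed in your environment.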
## Decorating your publishing functions

To publish messages effectively in the Kafka context, consider utilizing the Publisher Decorator. This approach offers an AsyncAPI representation and is ideal for rapidly developing applications.

The Publisher Decorator creates a structured DataPipeline unit with both input and output components. The sequence in which you apply the Subscriber and Publisher decorators does not affect their functionality. However, note that these decorators can only be applied to functions that are also decorated as a Subscriber.

This method relies on the return type annotation of the handler function to properly interpret the function's return value before sending it. Hence, it's important to ensure accuracy in defining the return type.

Let's start by examining the entire application that utilizes the Publisher Decorator and then proceed to walk through it step by step.
```python linenums="1"
from pydantic import BaseModel, Field, NonNegativeFloat

from faststream import FastStream
from faststream.kafka import KafkaBroker


class Data(BaseModel):
    data: NonNegativeFloat = Field(
        ..., examples=[0.5], description="Float data example"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


to_output_data = broker.publisher("output_data")


@to_output_data
@broker.subscriber("input_data")
async def on_input_data(msg: Data) -> Data:
    return Data(data=msg.data + 1.0)
```
1. **Initialize the KafkaBroker instance:** Start by initializing a KafkaBroker instance with the necessary configuration, including the Kafka broker address.

    ```python linenums="1"
    broker = KafkaBroker("localhost:9092")
    ```

2. **Prepare your publisher object to use later as a decorator:**

    ```python linenums="1"
    to_output_data = broker.publisher("output_data")
    ```

3. **Create your processing logic:** Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic.

    ```python linenums="1"
    async def on_input_data(msg: Data) -> Data:
        return Data(data=msg.data + 1.0)
    ```

4. **Decorate your processing function:** To connect your processing function to the desired Kafka topics, decorate it with the `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now, after you start your application, your processing function will be called whenever a new message is available in the subscribed topic, and its return value will be produced to the topic defined in the publisher decorator.

    ```python linenums="1"
    @to_output_data
    @broker.subscriber("input_data")
    async def on_input_data(msg: Data) -> Data:
        return Data(data=msg.data + 1.0)
    ```
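If you want to verify this round trip locally, FastStream ships an in-memory test broker. The following is a minimal sketch reusing `broker`, `Data`, and `on_input_data` from the example above; the test name and the pytest-asyncio setup are assumptions, and the exact mock assertion helpers may differ between FastStream versions.

```python
import pytest

from faststream.kafka import TestKafkaBroker


@pytest.mark.asyncio
async def test_on_input_data():
    # TestKafkaBroker replaces the real Kafka connection with an in-memory one
    async with TestKafkaBroker(broker) as test_broker:
        await test_broker.publish(Data(data=0.5), "input_data")

        # The handler should have been called once with the published payload
        on_input_data.mock.assert_called_once_with(dict(Data(data=0.5)))
```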
# Batch Subscriber

If you want to consume data in batches, the `@broker.subscriber(...)` decorator makes it possible. By defining your consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let's walk through how to achieve this.

## Using the Subscriber with Batching

To consume messages in batches, follow these steps:

### Step 1: Define Your Subscriber

In your FastStream application, define the subscriber using the `@broker.subscriber(...)` decorator. Ensure that you configure the `msg` object as a list and set the `batch` parameter to `True`. This configuration tells the subscriber to handle message consumption in batches.
```python linenums="1"
@broker.subscriber("test_batch", batch=True)
```

### Step 2: Implement Your Consuming Function

Create a consuming function that accepts the list of messages. The `@broker.subscriber(...)` decorator will take care of collecting and grouping messages into batches based on the partition.

```python linenums="1"
@broker.subscriber("test_batch", batch=True)
async def handle_batch(msg: List[HelloWorld], logger: Logger):
    logger.info(msg)
```
## Example of Consuming in Batches

Let's illustrate how to consume messages in batches from the **test_batch** topic with a practical example:

```python linenums="1"
from typing import List

from pydantic import BaseModel, Field

from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


class HelloWorld(BaseModel):
    msg: str = Field(
        ...,
        examples=["Hello"],
        description="Demo hello world message",
    )


@broker.subscriber("test_batch", batch=True)
async def handle_batch(msg: List[HelloWorld], logger: Logger):
    logger.info(msg)
```
In this example, the subscriber is configured to process messages in batches, and the consuming function is designed to handle these batches efficiently.

Consuming messages in batches is a valuable technique when you need to optimize the processing of high volumes of data in your Kafka-based applications. It allows for more efficient resource utilization and can enhance the overall performance of your data pipelines.
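To feed this subscriber during local experiments, you could publish several messages at once. The following is a minimal sketch reusing `broker` and `HelloWorld` from the example above; it assumes the broker's batch publish method accepts pydantic models as messages, and the `produce_batch` wrapper is illustrative only.

```python
import asyncio


async def produce_batch() -> None:
    # Connect, send several messages to the batch-consumed topic, then close
    await broker.connect()
    try:
        await broker.publish_batch(
            HelloWorld(msg="First"),
            HelloWorld(msg="Second"),
            topic="test_batch",
        )
    finally:
        await broker.close()


if __name__ == "__main__":
    asyncio.run(produce_batch())
```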