From b5df079710f3fdfa76859c744ebf9e918d78acfe Mon Sep 17 00:00:00 2001 From: Tvrtko Sternak <117077296+Sternakt@users.noreply.github.com> Date: Fri, 29 Sep 2023 14:55:47 +0200 Subject: [PATCH] 585 complete kafka part of faststream docs (#775) * Init asyncapi customization page * Add tests and code snippets * Add image examples in the customization docs * Remove empty test_ file * Add details to batch publishing * Add details to Kafka guides * Finish kafka publishing with a key guide * Finish batch publishing guide * Finish general kafka publisher documentation * Reference publisher examples in tests * Fix referencing wrong test in publish_raw * Fix wrong import --------- Co-authored-by: Davor Runje --- .../kafka/Publisher/batch_publisher.md | 71 ++++++++-- .faststream_gen/kafka/Publisher/index.md | 125 +++++++++++++++++- .../kafka/Publisher/using_a_key.md | 36 +++-- .../kafka/Subscriber/batch_subscriber.md | 51 ++++++- .../en/kafka/Publisher/batch_publisher.md | 59 +++++++-- docs/docs/en/kafka/Publisher/index.md | 88 +++++++++++- docs/docs/en/kafka/Publisher/using_a_key.md | 36 +++-- .../en/kafka/Subscriber/batch_subscriber.md | 30 ++++- .../kafka/publisher_object/example.py | 33 +++++ .../raw_publish/{test_me.py => example.py} | 0 tests/docs/kafka/publisher_object/__init__.py | 0 .../publisher_object/test_publisher_object.py | 3 + tests/docs/kafka/raw_publish/__init__.py | 0 .../kafka/raw_publish/test_raw_publish.py | 3 + 14 files changed, 482 insertions(+), 53 deletions(-) create mode 100644 docs/docs_src/kafka/publisher_object/example.py rename docs/docs_src/kafka/raw_publish/{test_me.py => example.py} (100%) create mode 100644 tests/docs/kafka/publisher_object/__init__.py create mode 100644 tests/docs/kafka/publisher_object/test_publisher_object.py create mode 100644 tests/docs/kafka/raw_publish/__init__.py create mode 100644 tests/docs/kafka/raw_publish/test_raw_publish.py diff --git a/.faststream_gen/kafka/Publisher/batch_publisher.md b/.faststream_gen/kafka/Publisher/batch_publisher.md index 5ef661abd5..5697c91003 100644 --- a/.faststream_gen/kafka/Publisher/batch_publisher.md +++ b/.faststream_gen/kafka/Publisher/batch_publisher.md @@ -1,19 +1,20 @@ # Publishing in Batches -If you want to send your data in batches, `#!python @broker.publisher(...)` decorator makes that possible for you. -To produce in batches, you need to do two things: +## General overview -1. When creating your publisher, set the `batch` argument to `True`. -2. Return a tuple of the messages you wish to send in a batch. This action will prompt the producer to collect the messages and send them in a batch to a Kafka broker. +If you need to send your data in batches, the @broker.publisher(...) decorator offers a convenient way to achieve this. To enable batch production, you need to perform two crucial steps: -Here is an example of an app producing in batches to **output_data** topic when consuming from **input_data_1**. +Step 1: When creating your publisher, set the batch argument to True. This configuration tells the publisher that you intend to send messages in batches. -In the highligted lines, we can see the steps of creating and using a batch publisher: +Step 2: In your producer function, return a tuple containing the messages you want to send as a batch. This action triggers the producer to gather the messages and transmit them as a batch to a Kafka broker. -1. Creation of the publisher. -2. Publishing an actual batch of messages. 
+Let's delve into a detailed example illustrating how to produce messages in batches to the output_data topic while consuming from the input_data_1 topic. -```python linenums="1" hl_lines="19 26" +## Code example + +First, lets take a look at the whole app creation and then dive deep into the steps for producing in batches, here is the application code: + +```python linenums="1" from typing import Tuple from pydantic import BaseModel, Field, NonNegativeFloat @@ -35,9 +36,61 @@ app = FastStream(broker) decrease_and_increase = broker.publisher("output_data", batch=True) +@decrease_and_increase +@broker.subscriber("input_data_1") +async def on_input_data_1(msg: Data, logger: Logger) -> Tuple[Data, Data]: + logger.info(msg) + return Data(data=(msg.data * 0.5)), Data(data=(msg.data * 2.0)) + + +@broker.subscriber("input_data_2") +async def on_input_data_2(msg: Data, logger: Logger) -> None: + logger.info(msg) + await decrease_and_increase.publish( + Data(data=(msg.data * 0.5)), Data(data=(msg.data * 2.0)) + ) +``` + +Below, we have highlighted key lines of code that demonstrate the steps involved in creating and using a batch publisher: + +Step 1: Creation of the Publisher + +```python linenums="1" +decrease_and_increase = broker.publisher("output_data", batch=True) +``` + +Step 2: Publishing an Actual Batch of Messages + +You can publish a batch by directly calling the publisher with a batch of messages you want to publish, like shown here: + +```python linenums="1" + await decrease_and_increase.publish( + Data(data=(msg.data * 0.5)), Data(data=(msg.data * 2.0)) + ) +``` + +Or you can decorate your processing function and return a batch of messages like shown here: + +```python linenums="1" @decrease_and_increase @broker.subscriber("input_data_1") async def on_input_data_1(msg: Data, logger: Logger) -> Tuple[Data, Data]: logger.info(msg) return Data(data=(msg.data * 0.5)), Data(data=(msg.data * 2.0)) ``` + +The application in the example imelements both of these ways, feel free to use whatever option fits your needs better. + +## Why publish in batches? + +In this example, we've explored how to leverage the @broker.publisher decorator to efficiently publish messages in batches using FastStream and Kafka. By following the two key steps outlined in the previous sections, you can significantly enhance the performance and reliability of your Kafka-based applications. + +Publishing messages in batches offers several advantages when working with Kafka: + +1. Improved Throughput: Batch publishing allows you to send multiple messages in a single transmission, reducing the overhead associated with individual message delivery. This leads to improved throughput and lower latency in your Kafka applications. + +2. Reduced Network and Broker Load: Sending messages in batches reduces the number of network calls and broker interactions. This optimization minimizes the load on the Kafka brokers and network resources, making your Kafka cluster more efficient. + +3. Atomicity: Batches ensure that a group of related messages is processed together or not at all. This atomicity can be crucial in scenarios where message processing needs to maintain data consistency and integrity. + +4. Enhanced Scalability: With batch publishing, you can efficiently scale your Kafka applications to handle high message volumes. By sending messages in larger chunks, you can make the most of Kafka's parallelism and partitioning capabilities. 
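## Testing batch publishing

This patch also adds in-memory tests for the publishing examples (see `tests/docs/kafka/publisher_object`). The batch-publishing app above can be exercised the same way with `TestKafkaBroker`; here is a minimal sketch, assuming the example application is importable as `publish_batch.app` (a hypothetical module path):

```python
import pytest

from faststream._compat import model_to_json
from faststream.kafka import TestKafkaBroker

# Hypothetical import path; `broker`, `Data`, and `on_input_data_1` are the
# objects defined in the example application above.
from publish_batch.app import Data, broker, on_input_data_1


@pytest.mark.asyncio
async def test_batch_publish():
    msg = Data(data=2.0)

    async with TestKafkaBroker(broker):
        # Trigger the decorated handler; its returned tuple is then published
        # to "output_data" as a single batch by `decrease_and_increase`.
        await broker.publish(
            model_to_json(msg),
            "input_data_1",
            headers={"content-type": "application/json"},
        )

        on_input_data_1.mock.assert_called_once_with(dict(msg))
```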
diff --git a/.faststream_gen/kafka/Publisher/index.md b/.faststream_gen/kafka/Publisher/index.md index bad5a67139..79473bdb1f 100644 --- a/.faststream_gen/kafka/Publisher/index.md +++ b/.faststream_gen/kafka/Publisher/index.md @@ -1,5 +1,126 @@ # Publishing -**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, which you can use without any changes. +The **FastStream** KafkaBroker supports all regular [publishing use cases](../getting-started/publishing/index.md){.internal-link}, and you can use them without any changes. -In the following chapters, we will demonstrate how to use a KafkaBroker publisher in specific use cases, such as publishing batches or publishing with a key. +However, if you wish to further customize the publishing logic, you should take a closer look at specific KafkaBroker parameters. + +## Basic Kafka Publishing + +The `KafkaBroker` uses the unified `publish` method (from a `producer` object) to send messages. + +In this case, you can use Python primitives and `pydantic.BaseModel` to define the content of the message you want to publish to the Kafka broker. + +You can specify the topic to send by its name. + +1. Create your KafkaBroker instance + +```python linenums="1" +broker = KafkaBroker("localhost:9092") +``` + +2. Publish a message using the `publish` method + +```python linenums="1" + msg = Data(data=0.5) + + await broker.publish( + model_to_json(msg), + "input_data", + headers={"content-type": "application/json"}, + ) +``` + +This is the most basic way of using the KafkaBroker to publish a message. + +## Creating a publisher object + +The simplest way to use a KafkaBroker for publishing has a significant limitation: your publishers won't be documented in the AsyncAPI documentation. This might be acceptable for sending occasional one-off messages. However, if you're building a comprehensive service, it's recommended to create publisher objects. These objects can then be parsed and documented in your service's AsyncAPI documentation. Let's go ahead and create those publisher objects! + +1. Create your KafkaBroker instance + +```python linenums="1" +broker = KafkaBroker("localhost:9092") +``` + +2. Create a publisher instance + +```python linenums="1" +prepared_publisher = broker.publisher("input_data") +``` + +2. Publish a message using the `publish` method of the prepared publisher + +```python linenums="1" + msg = Data(data=0.5) + + await prepared_publisher.publish( + model_to_json(msg), + headers={"content-type": "application/json"}, + ) +``` + +Now, when you wrap your broker into a FastStream object, the publisher will be exported to the AsyncAPI documentation. + +## Decorating your publishing functions + +To publish messages effectively in the Kafka context, consider utilizing the Publisher Decorator. This approach offers an AsyncAPI representation and is ideal for rapidly developing applications. + +The Publisher Decorator creates a structured DataPipeline unit with both input and output components. The sequence in which you apply Subscriber and Publisher decorators does not affect their functionality. However, note that these decorators can only be applied to functions decorated by a Subscriber as well. + +This method relies on the return type annotation of the handler function to properly interpret the function's return value before sending it. Hence, it's important to ensure accuracy in defining the return type. 
+ +Let's start by examining the entire application that utilizes the Publisher Decorator and then proceed to walk through it step by step. + +```python linenums="1" +from pydantic import BaseModel, Field, NonNegativeFloat + +from faststream import FastStream +from faststream.kafka import KafkaBroker + + +class Data(BaseModel): + data: NonNegativeFloat = Field( + ..., examples=[0.5], description="Float data example" + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +to_output_data = broker.publisher("output_data") + + +@to_output_data +@broker.subscriber("input_data") +async def on_input_data(msg: Data) -> Data: + return Data(data=msg.data + 1.0) +``` + +1. **Initialize the KafkaBroker instance:** Start by initializing a KafkaBroker instance with the necessary configuration, including Kafka broker address. + +```python linenums="1" +broker = KafkaBroker("localhost:9092") +``` + +2. **Prepare your publisher object to use later as a decorator:** + +```python linenums="1" +to_output_data = broker.publisher("output_data") +``` + +3. **Create your processing logic:** Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic + +```python linenums="1" +async def on_input_data(msg: Data) -> Data: + return Data(data=msg.data + 1.0) +``` + +4. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator. + +```python linenums="1" +@to_output_data +@broker.subscriber("input_data") +async def on_input_data(msg: Data) -> Data: + return Data(data=msg.data + 1.0) +``` diff --git a/.faststream_gen/kafka/Publisher/using_a_key.md b/.faststream_gen/kafka/Publisher/using_a_key.md index 65ed285c19..73faef6207 100644 --- a/.faststream_gen/kafka/Publisher/using_a_key.md +++ b/.faststream_gen/kafka/Publisher/using_a_key.md @@ -1,24 +1,34 @@ -# Defining a Partition Key +# Using a Partition Key -Partition keys are used in Apache Kafka to determine which partition a message should be written to. This ensures that related messages are kept together in the same partition, which can be useful for ensuring order or for grouping related messages together for efficient processing. Additionally, partitioning data across multiple partitions allows Kafka to distribute load across multiple brokers and scale horizontally, while replicating data across multiple brokers provides fault tolerance. +Partition keys are a crucial concept in Apache Kafka, enabling you to determine the appropriate partition for a message. This ensures that related messages are kept together in the same partition, which can be invaluable for maintaining order or grouping related messages for efficient processing. Additionally, Kafka utilizes partitioning to distribute load across multiple brokers and scale horizontally, while replicating data across brokers provides fault tolerance. -You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)` decorator. This guide will demonstrate this feature to you. +You can specify your partition keys when utilizing the `@KafkaBroker.publisher(...)` decorator in FastStream. 
This guide will walk you through the process of using partition keys effectively. -## Calling `publish` with a Key +## Publishing with a Partition Key -To publish a message to a Kafka topic using a key, simply pass the `key` parameter to the `publish` function call, like this: +To publish a message to a Kafka topic using a partition key, follow these steps: -```python - await to_output_data.publish(Data(data=msg.data + 1.0), key=b"key") +### Step 1: Define the Publisher + +In your FastStream application, define the publisher using the `@KafkaBroker.publisher(...)` decorator. This decorator allows you to configure various aspects of message publishing, including the partition key. + +```python linenums="1" +to_output_data = broker.publisher("output_data") ``` -## App Example +### Step 2: Pass the Key + +When you're ready to publish a message with a specific key, simply include the `key` parameter in the `publish` function call. This key parameter is used to determine the appropriate partition for the message. -Let's take a look at the whole app example that will consume from the **input_data** topic and publish with a key to the **output_data** topic. +```python linenums="1" + await to_output_data.publish(Data(data=msg.data + 1.0), key=b"key") +``` + +## Example Application -You can see that the only difference from normal publishing is that now we pass the key to the publisher call. +Let's examine a complete application example that consumes messages from the **input_data** topic and publishes them with a specified key to the **output_data** topic. This example will illustrate how to incorporate partition keys into your Kafka-based applications: -```python linenums="1" hl_lines="25" +```python linenums="1" from pydantic import BaseModel, Field, NonNegativeFloat from faststream import Context, FastStream, Logger @@ -45,3 +55,7 @@ async def on_input_data( logger.info(f"on_input_data({msg=})") await to_output_data.publish(Data(data=msg.data + 1.0), key=b"key") ``` + +As you can see, the primary difference from standard publishing is the inclusion of the `key` parameter in the `publish` call. This key parameter is essential for controlling how Kafka partitions and processes your messages. + +In summary, using partition keys in Apache Kafka is a fundamental practice for optimizing message distribution, maintaining order, and achieving efficient processing. It is a key technique for ensuring that your Kafka-based applications scale gracefully and handle large volumes of data effectively. diff --git a/.faststream_gen/kafka/Subscriber/batch_subscriber.md b/.faststream_gen/kafka/Subscriber/batch_subscriber.md index e03a1a7947..3a207fd89b 100644 --- a/.faststream_gen/kafka/Subscriber/batch_subscriber.md +++ b/.faststream_gen/kafka/Subscriber/batch_subscriber.md @@ -1,15 +1,58 @@ # Batch Subscriber -If you want to consume data in batches, the `#!python @broker.subscriber(...)` decorator makes that possible for you. By typing a consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now. +If you want to consume data in batches, the `@broker.subscriber(...)` decorator makes it possible. By defining your consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let's walk through how to achieve this. 
-## Subscriber Function with Batching +## Using the Subscriber with Batching -To consume messages in batches, you need to wrap your message type into a list and and set the `batch` parameter to `True`. The `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now. +To consume messages in batches, follow these steps: -Here is an example of consuming in batches from the **test_batch** topic: +### Step 1: Define Your Subscriber + +In your FastStream application, define the subscriber using the `@broker.subscriber(...)` decorator. Ensure that you configure the `msg` object as a list and set the `batch` parameter to `True`. This configuration tells the subscriber to handle message consumption in batches. + +```python linenums="1" +@broker.subscriber("test_batch", batch=True) +``` + +### Step 2: Implement Your Consuming Function + +Create a consuming function that accepts the list of messages. The `@broker.subscriber(...)` decorator will take care of collecting and grouping messages into batches based on the partition. + +```python linenums="1" +@broker.subscriber("test_batch", batch=True) +async def handle_batch(msg: List[HelloWorld], logger: Logger): + logger.info(msg) +``` + +## Example of Consuming in Batches + +Let's illustrate how to consume messages in batches from the **test_batch** topic with a practical example: ```python linenums="1" +from typing import List + +from pydantic import BaseModel, Field + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +class HelloWorld(BaseModel): + msg: str = Field( + ..., + examples=["Hello"], + description="Demo hello world message", + ) + + @broker.subscriber("test_batch", batch=True) async def handle_batch(msg: List[HelloWorld], logger: Logger): logger.info(msg) ``` + +In this example, the subscriber is configured to process messages in batches, and the consuming function is designed to handle these batches efficiently. + +Consuming messages in batches is a valuable technique when you need to optimize the processing of high volumes of data in your Kafka-based applications. It allows for more efficient resource utilization and can enhance the overall performance of your data pipelines. diff --git a/docs/docs/en/kafka/Publisher/batch_publisher.md b/docs/docs/en/kafka/Publisher/batch_publisher.md index 329c4ab4b4..074fbdd109 100644 --- a/docs/docs/en/kafka/Publisher/batch_publisher.md +++ b/docs/docs/en/kafka/Publisher/batch_publisher.md @@ -1,18 +1,57 @@ # Publishing in Batches -If you want to send your data in batches, `#!python @broker.publisher(...)` decorator makes that possible for you. -To produce in batches, you need to do two things: +## General overview -1. When creating your publisher, set the `batch` argument to `True`. -2. Return a tuple of the messages you wish to send in a batch. This action will prompt the producer to collect the messages and send them in a batch to a Kafka broker. +If you need to send your data in batches, the @broker.publisher(...) decorator offers a convenient way to achieve this. To enable batch production, you need to perform two crucial steps: -Here is an example of an app producing in batches to **output_data** topic when consuming from **input_data_1**. +Step 1: When creating your publisher, set the batch argument to True. This configuration tells the publisher that you intend to send messages in batches. 
-In the highligted lines, we can see the steps of creating and using a batch publisher: +Step 2: In your producer function, return a tuple containing the messages you want to send as a batch. This action triggers the producer to gather the messages and transmit them as a batch to a Kafka broker. -1. Creation of the publisher. -2. Publishing an actual batch of messages. +Let's delve into a detailed example illustrating how to produce messages in batches to the output_data topic while consuming from the input_data_1 topic. -```python linenums="1" hl_lines="19 26" -{!> docs_src/kafka/publish_batch/app.py [ln:1-26] !} +## Code example + +First, lets take a look at the whole app creation and then dive deep into the steps for producing in batches, here is the application code: + +```python linenums="1" +{!> docs_src/kafka/publish_batch/app.py!} +``` + +Below, we have highlighted key lines of code that demonstrate the steps involved in creating and using a batch publisher: + +Step 1: Creation of the Publisher + +```python linenums="1" +{!> docs_src/kafka/publish_batch/app.py [ln:19] !} ``` + +Step 2: Publishing an Actual Batch of Messages + +You can publish a batch by directly calling the publisher with a batch of messages you want to publish, like shown here: + +```python linenums="1" +{!> docs_src/kafka/publish_batch/app.py [ln:32-34] !} +``` + +Or you can decorate your processing function and return a batch of messages like shown here: + +```python linenums="1" +{!> docs_src/kafka/publish_batch/app.py [ln:22-26] !} +``` + +The application in the example imelements both of these ways, feel free to use whatever option fits your needs better. + +## Why publish in batches? + +In this example, we've explored how to leverage the @broker.publisher decorator to efficiently publish messages in batches using FastStream and Kafka. By following the two key steps outlined in the previous sections, you can significantly enhance the performance and reliability of your Kafka-based applications. + +Publishing messages in batches offers several advantages when working with Kafka: + +1. Improved Throughput: Batch publishing allows you to send multiple messages in a single transmission, reducing the overhead associated with individual message delivery. This leads to improved throughput and lower latency in your Kafka applications. + +2. Reduced Network and Broker Load: Sending messages in batches reduces the number of network calls and broker interactions. This optimization minimizes the load on the Kafka brokers and network resources, making your Kafka cluster more efficient. + +3. Atomicity: Batches ensure that a group of related messages is processed together or not at all. This atomicity can be crucial in scenarios where message processing needs to maintain data consistency and integrity. + +4. Enhanced Scalability: With batch publishing, you can efficiently scale your Kafka applications to handle high message volumes. By sending messages in larger chunks, you can make the most of Kafka's parallelism and partitioning capabilities. diff --git a/docs/docs/en/kafka/Publisher/index.md b/docs/docs/en/kafka/Publisher/index.md index bad5a67139..b053b9bbca 100644 --- a/docs/docs/en/kafka/Publisher/index.md +++ b/docs/docs/en/kafka/Publisher/index.md @@ -1,5 +1,89 @@ # Publishing -**FastStream** KafkaBroker supports all regular [publishing usecases](../../getting-started/publishing/index.md){.internal-link}, which you can use without any changes. 
+The **FastStream** KafkaBroker supports all regular [publishing use cases](../../getting-started/publishing/index.md){.internal-link}, and you can use them without any changes.
-In the following chapters, we will demonstrate how to use a KafkaBroker publisher in specific use cases, such as publishing batches or publishing with a key.
+However, if you wish to further customize the publishing logic, you should take a closer look at specific KafkaBroker parameters.
+
+## Basic Kafka Publishing
+
+The `KafkaBroker` uses the unified `publish` method (from a `producer` object) to send messages.
+
+In this case, you can use Python primitives and `pydantic.BaseModel` to define the content of the message you want to publish to the Kafka broker.
+
+You can specify the topic to send to by its name.
+
+1. Create your KafkaBroker instance
+
+```python linenums="1"
+{!> docs_src/kafka/raw_publish/example.py [ln:8] !}
+```
+
+2. Publish a message using the `publish` method
+
+```python linenums="1"
+{!> docs_src/kafka/raw_publish/example.py [ln:26-32] !}
+```
+
+This is the most basic way of using the KafkaBroker to publish a message.
+
+## Creating a publisher object
+
+The simplest way to use a KafkaBroker for publishing has a significant limitation: your publishers won't be documented in the AsyncAPI documentation. This might be acceptable for sending occasional one-off messages. However, if you're building a comprehensive service, it's recommended to create publisher objects. These objects can then be parsed and documented in your service's AsyncAPI documentation. Let's go ahead and create those publisher objects!
+
+1. Create your KafkaBroker instance
+
+```python linenums="1"
+{!> docs_src/kafka/publisher_object/example.py [ln:8] !}
+```
+
+2. Create a publisher instance
+
+```python linenums="1"
+{!> docs_src/kafka/publisher_object/example.py [ln:17] !}
+```
+
+3. Publish a message using the `publish` method of the prepared publisher
+
+```python linenums="1"
+{!> docs_src/kafka/publisher_object/example.py [ln:26-31] !}
+```
+
+Now, when you wrap your broker into a FastStream object, the publisher will be exported to the AsyncAPI documentation.
+
+## Decorating your publishing functions
+
+To publish messages effectively in the Kafka context, consider utilizing the Publisher Decorator. This approach offers an AsyncAPI representation and is ideal for rapidly developing applications.
+
+The Publisher Decorator creates a structured DataPipeline unit with both input and output components. The sequence in which you apply the Subscriber and Publisher decorators does not affect their functionality. However, note that the Publisher decorator can only be applied to functions that are also decorated with a Subscriber.
+
+This method relies on the return type annotation of the handler function to properly interpret the function's return value before sending it. Hence, it's important to ensure accuracy in defining the return type.
+
+Let's start by examining the entire application that utilizes the Publisher Decorator and then proceed to walk through it step by step.
+
+```python linenums="1"
+{!> docs_src/kafka/publish_example/app.py [ln:1-26] !}
+```
+
+1. **Initialize the KafkaBroker instance:** Start by initializing a KafkaBroker instance with the necessary configuration, including the Kafka broker address.
+
+```python linenums="1"
+{!> docs_src/kafka/publish_example/app.py [ln:13] !}
+```
+
+2. 
**Prepare your publisher object to use later as a decorator:** + +```python linenums="1" +{!> docs_src/kafka/publish_example/app.py [ln:17] !} +``` + +3. **Create your processing logic:** Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic + +```python linenums="1" +{!> docs_src/kafka/publish_example/app.py [ln:22-23] !} +``` + +4. **Decorate your processing function:** To connect your processing function to the desired Kafka topics you need to decorate it with `#!python @broker.subscriber` and `#!python @broker.publisher` decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator. + +```python linenums="1" +{!> docs_src/kafka/publish_example/app.py [ln:20-23] !} +``` diff --git a/docs/docs/en/kafka/Publisher/using_a_key.md b/docs/docs/en/kafka/Publisher/using_a_key.md index 7665be371c..b2d5cf16c3 100644 --- a/docs/docs/en/kafka/Publisher/using_a_key.md +++ b/docs/docs/en/kafka/Publisher/using_a_key.md @@ -1,23 +1,37 @@ -# Defining a Partition Key +# Using a Partition Key -Partition keys are used in Apache Kafka to determine which partition a message should be written to. This ensures that related messages are kept together in the same partition, which can be useful for ensuring order or for grouping related messages together for efficient processing. Additionally, partitioning data across multiple partitions allows Kafka to distribute load across multiple brokers and scale horizontally, while replicating data across multiple brokers provides fault tolerance. +Partition keys are a crucial concept in Apache Kafka, enabling you to determine the appropriate partition for a message. This ensures that related messages are kept together in the same partition, which can be invaluable for maintaining order or grouping related messages for efficient processing. Additionally, Kafka utilizes partitioning to distribute load across multiple brokers and scale horizontally, while replicating data across brokers provides fault tolerance. -You can define your partition keys when using the `#!python @KafkaBroker.publisher(...)` decorator. This guide will demonstrate this feature to you. +You can specify your partition keys when utilizing the `@KafkaBroker.publisher(...)` decorator in FastStream. This guide will walk you through the process of using partition keys effectively. -## Calling `publish` with a Key +## Publishing with a Partition Key -To publish a message to a Kafka topic using a key, simply pass the `key` parameter to the `publish` function call, like this: +To publish a message to a Kafka topic using a partition key, follow these steps: -```python -{!> docs_src/kafka/publish_with_partition_key/app.py [ln:25] !} +### Step 1: Define the Publisher + +In your FastStream application, define the publisher using the `@KafkaBroker.publisher(...)` decorator. This decorator allows you to configure various aspects of message publishing, including the partition key. + +```python linenums="1" +{!> docs_src/kafka/publish_with_partition_key/app.py [ln:17] !} ``` -## App Example +### Step 2: Pass the Key -Let's take a look at the whole app example that will consume from the **input_data** topic and publish with a key to the **output_data** topic. 
+When you're ready to publish a message with a specific key, simply include the `key` parameter in the `publish` function call. This key parameter is used to determine the appropriate partition for the message. -You can see that the only difference from normal publishing is that now we pass the key to the publisher call. +```python linenums="1" +{!> docs_src/kafka/publish_with_partition_key/app.py [ln:25] !} +``` -```python linenums="1" hl_lines="25" +## Example Application + +Let's examine a complete application example that consumes messages from the **input_data** topic and publishes them with a specified key to the **output_data** topic. This example will illustrate how to incorporate partition keys into your Kafka-based applications: + +```python linenums="1" {!> docs_src/kafka/publish_with_partition_key/app.py [ln:1-25] !} ``` + +As you can see, the primary difference from standard publishing is the inclusion of the `key` parameter in the `publish` call. This key parameter is essential for controlling how Kafka partitions and processes your messages. + +In summary, using partition keys in Apache Kafka is a fundamental practice for optimizing message distribution, maintaining order, and achieving efficient processing. It is a key technique for ensuring that your Kafka-based applications scale gracefully and handle large volumes of data effectively. diff --git a/docs/docs/en/kafka/Subscriber/batch_subscriber.md b/docs/docs/en/kafka/Subscriber/batch_subscriber.md index 67cb11779f..76fe10c5b2 100644 --- a/docs/docs/en/kafka/Subscriber/batch_subscriber.md +++ b/docs/docs/en/kafka/Subscriber/batch_subscriber.md @@ -1,13 +1,35 @@ # Batch Subscriber -If you want to consume data in batches, the `#!python @broker.subscriber(...)` decorator makes that possible for you. By typing a consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let’s demonstrate that now. +If you want to consume data in batches, the `@broker.subscriber(...)` decorator makes it possible. By defining your consumed `msg` object as a list of messages and setting the `batch` parameter to `True`, the subscriber will call your consuming function with a batch of messages consumed from a single partition. Let's walk through how to achieve this. -## Subscriber Function with Batching +## Using the Subscriber with Batching -To consume messages in batches, you need to wrap your message type into a list and and set the `batch` parameter to `True`. The `#!python @broker.subscriber(...)` decorator will take care of the rest for you. Your subscribed function will be called with batches grouped by partition now. +To consume messages in batches, follow these steps: -Here is an example of consuming in batches from the **test_batch** topic: +### Step 1: Define Your Subscriber + +In your FastStream application, define the subscriber using the `@broker.subscriber(...)` decorator. Ensure that you configure the `msg` object as a list and set the `batch` parameter to `True`. This configuration tells the subscriber to handle message consumption in batches. + +```python linenums="1" +{!> docs_src/kafka/batch_consuming_pydantic/app.py [ln:20] !} +``` + +### Step 2: Implement Your Consuming Function + +Create a consuming function that accepts the list of messages. The `@broker.subscriber(...)` decorator will take care of collecting and grouping messages into batches based on the partition. 
```python linenums="1" {!> docs_src/kafka/batch_consuming_pydantic/app.py [ln:20-22] !} ``` + +## Example of Consuming in Batches + +Let's illustrate how to consume messages in batches from the **test_batch** topic with a practical example: + +```python linenums="1" +{!> docs_src/kafka/batch_consuming_pydantic/app.py!} +``` + +In this example, the subscriber is configured to process messages in batches, and the consuming function is designed to handle these batches efficiently. + +Consuming messages in batches is a valuable technique when you need to optimize the processing of high volumes of data in your Kafka-based applications. It allows for more efficient resource utilization and can enhance the overall performance of your data pipelines. diff --git a/docs/docs_src/kafka/publisher_object/example.py b/docs/docs_src/kafka/publisher_object/example.py new file mode 100644 index 0000000000..4bb6c5a957 --- /dev/null +++ b/docs/docs_src/kafka/publisher_object/example.py @@ -0,0 +1,33 @@ +import pytest +from pydantic import BaseModel, Field, NonNegativeFloat + +from faststream import FastStream, Logger +from faststream._compat import model_to_json +from faststream.kafka import KafkaBroker, TestKafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +class Data(BaseModel): + data: NonNegativeFloat = Field( + ..., examples=[0.5], description="Float data example" + ) + +prepared_publisher = broker.publisher("input_data") + +@broker.subscriber("input_data") +async def handle_data(msg: Data, logger: Logger) -> None: + logger.info(f"handle_data({msg=})") + +@pytest.mark.asyncio +async def test_prepared_publish(): + async with TestKafkaBroker(broker): + msg = Data(data=0.5) + + await prepared_publisher.publish( + model_to_json(msg), + headers={"content-type": "application/json"}, + ) + + handle_data.mock.assert_called_once_with(dict(msg)) diff --git a/docs/docs_src/kafka/raw_publish/test_me.py b/docs/docs_src/kafka/raw_publish/example.py similarity index 100% rename from docs/docs_src/kafka/raw_publish/test_me.py rename to docs/docs_src/kafka/raw_publish/example.py diff --git a/tests/docs/kafka/publisher_object/__init__.py b/tests/docs/kafka/publisher_object/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/docs/kafka/publisher_object/test_publisher_object.py b/tests/docs/kafka/publisher_object/test_publisher_object.py new file mode 100644 index 0000000000..8f41ee0f22 --- /dev/null +++ b/tests/docs/kafka/publisher_object/test_publisher_object.py @@ -0,0 +1,3 @@ +from docs.docs_src.kafka.publisher_object.example import test_prepared_publish + +__all__ = ["test_prepared_publish"] diff --git a/tests/docs/kafka/raw_publish/__init__.py b/tests/docs/kafka/raw_publish/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/docs/kafka/raw_publish/test_raw_publish.py b/tests/docs/kafka/raw_publish/test_raw_publish.py new file mode 100644 index 0000000000..5dfe14cba4 --- /dev/null +++ b/tests/docs/kafka/raw_publish/test_raw_publish.py @@ -0,0 +1,3 @@ +from docs.docs_src.kafka.raw_publish.example import test_raw_publish + +__all__ = ["test_raw_publish"]
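Each of the new test modules is a thin wrapper: it re-exports the `pytest` function defined alongside the documentation example so that the main test suite collects it. Any future documentation example that ships an embedded test can follow the same pattern; for instance, a wrapper for a hypothetical `publish_batch` example test might look like this:

```python
# tests/docs/kafka/publish_batch/test_publish_batch.py (hypothetical path)
from docs.docs_src.kafka.publish_batch.example import test_batch_publish

__all__ = ["test_batch_publish"]
```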