From 4b010886b6fa704d6b0b4ff57724561aafba5c6f Mon Sep 17 00:00:00 2001 From: Lancetnik Date: Sun, 17 Sep 2023 13:29:51 +0300 Subject: [PATCH 1/4] docs: fix GH README --- README.md | 182 +++++++++++++---------------- docs/docs/en/features.md | 26 ----- docs/docs_src/index/test_kafka.py | 4 +- docs/docs_src/index/test_rabbit.py | 4 +- 4 files changed, 83 insertions(+), 133 deletions(-) delete mode 100644 docs/docs/en/features.md diff --git a/README.md b/README.md index 46f339442f..e96bb73e8a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ [Note]: # (This is an auto-generated file. Please edit docs/docs/en/index.md instead) - # Features of FastStream Effortless event stream integration for your services @@ -56,9 +55,9 @@ Making streaming microservices has never been easier. Designed with junior devel - **Multiple Brokers**: **FastStream** provides a unified API to work across multiple message brokers (**Kafka**, **RabbitMQ** support) -- [**Pydantic Validation**](#writing-app-code): Leverage [**Pydantic's**](https://docs.pydantic.dev/){.external-link target="_blank"} validation capabilities to serialize and validates incoming messages +- [**Pydantic Validation**](#writing-app-code): Leverage [**Pydantic's**](https://docs.pydantic.dev/) validation capabilities to serialize and validates incoming messages -- [**Automatic Docs**](#project-documentation): Stay ahead with automatic [AsyncAPI](https://www.asyncapi.com/){.external-link target="_blank"} documentation. +- [**Automatic Docs**](#project-documentation): Stay ahead with automatic [AsyncAPI](https://www.asyncapi.com/) documentation. - **Intuitive**: full typed editor support makes your development experience smooth, catching errors before they reach runtime @@ -78,7 +77,7 @@ That's **FastStream** in a nutshell—easy, efficient, and powerful. Whether you ## History -**FastStream** is a new package based on the ideas and experiences gained from [FastKafka](https://github.com/airtai/fastkafka){.external-link target="_blank"} and [Propan](https://github.com/lancetnik/propan){.external-link target="_blank"}. By joining our forces, we picked up the best from both packages and created the unified way to write services capable of processing streamed data regradless of the underliying protocol. We'll continue to maintain both packages, but new development will be in this project. If you are starting a new service, this package is the recommended way to do it. +**FastStream** is a new package based on the ideas and experiences gained from [FastKafka](https://github.com/airtai/fastkafka) and [Propan](https://github.com/lancetnik/propan). By joining our forces, we picked up the best from both packages and created the unified way to write services capable of processing streamed data regradless of the underliying protocol. We'll continue to maintain both packages, but new development will be in this project. If you are starting a new service, this package is the recommended way to do it. --- @@ -87,25 +86,20 @@ That's **FastStream** in a nutshell—easy, efficient, and powerful. Whether you **FastStream** works on **Linux**, **macOS**, **Windows** and most **Unix**-style operating systems. You can install it with `pip` as usual: -=== "Kafka" - ```sh - pip install faststream[kafka] - ``` - -=== "RabbitMQ" - ```sh - pip install faststream[rabbit] - ``` +```sh +pip install faststream[kafka] +# or +pip install faststream[rabbit] +``` -!!! tip "" - By default **FastStream** uses **PydanticV2** written in **Rust**, but you can downgrade it manually, if your platform has no **Rust** support - **FastStream** will works with the **PydanticV1** correctly as well. +By default **FastStream** uses **PydanticV2** written in **Rust**, but you can downgrade it manually, if your platform has no **Rust** support - **FastStream** will works with the **PydanticV1** correctly as well. --- ## Writing app code -**FastStream** brokers provide convenient function decorators `#!python @broker.subscriber` -and `#!python @broker.publisher` to allow you to delegate the actual process of +**FastStream** brokers provide convenient function decorators `@broker.subscriber` +and `@broker.publisher` to allow you to delegate the actual process of - consuming and producing data to Event queues, and @@ -113,44 +107,30 @@ and `#!python @broker.publisher` to allow you to delegate the actual process of These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration. -Also, **FastStream** uses [**Pydantic**](https://docs.pydantic.dev/){.external-link target="_blank"} to parse input +Also, **FastStream** uses [**Pydantic**](https://docs.pydantic.dev/) to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your applications, so you can serialize you input messages just using type annotations. Here is an example python app using **FastStream** that consumes data from an incoming data stream and outputs the data to another one. -=== "Kafka" - ```python linenums="1" hl_lines="9" +```python from faststream import FastStream from faststream.kafka import KafkaBroker +# from faststream.rabbit import RabbitBroker broker = KafkaBroker("localhost:9092") +# broker = RabbitBroker("amqp://guest:guest@localhost:5672/") app = FastStream(broker) -@broker.subscriber("in-topic") -@broker.publisher("out-topic") -async def handle_msg(user: str, user_id: int) -> str: - return f"User: {user_id} - {user} registered" - ``` - -=== "RabbitMQ" - ```python linenums="1" hl_lines="9" -from faststream import FastStream -from faststream.rabbit import RabbitBroker - -broker = RabbitBroker("amqp://guest:guest@localhost:5672/") -app = FastStream(broker) - -@broker.subscriber("in-queue") -@broker.publisher("out-queue") +@broker.subscriber("in") +@broker.publisher("out") async def handle_msg(user: str, user_id: int) -> str: return f"User: {user_id} - {user} registered" - ``` +``` -Also, **Pydantic**’s [`BaseModel`](https://docs.pydantic.dev/usage/models/){.external-link target="_blank"} class allows you +Also, **Pydantic**’s [`BaseModel`](https://docs.pydantic.dev/usage/models/) class allows you to define messages using a declarative syntax, making it easy to specify the fields and types of your messages. -=== "Kafka" - ```python linenums="1" hl_lines="1 8 14" +```python from pydantic import BaseModel, Field, PositiveInt from faststream import FastStream from faststream.kafka import KafkaBroker @@ -162,30 +142,11 @@ class User(BaseModel): user: str = Field(..., examples=["John"]) user_id: PositiveInt = Field(..., examples=["1"]) -@broker.subscriber("in-topic") -@broker.publisher("out-topic") -async def handle_msg(data: User) -> str: - return f"User: {data.user} - {data.user_id} registered" - ``` - -=== "RabbitMQ" - ```python linenums="1" hl_lines="1 8 14" -from pydantic import BaseModel, Field, PositiveInt -from faststream import FastStream -from faststream.rabbit import RabbitBroker - -broker = RabbitBroker("amqp://guest:guest@localhost:5672/") -app = FastStream(broker) - -class User(BaseModel): - user: str = Field(..., examples=["John"]) - user_id: PositiveInt = Field(..., examples=["1"]) - -@broker.subscriber("in-queue") -@broker.publisher("out-queue") +@broker.subscriber("in") +@broker.publisher("out") async def handle_msg(data: User) -> str: return f"User: {data.user} - {data.user_id} registered" - ``` +``` --- @@ -197,9 +158,8 @@ The Tester will redirect your `subscriber` and `publisher` decorated functions t Using pytest, the test for our service would look like this: -=== "Kafka" - ```python linenums="1" hl_lines="3 10 18-19" - # Code above omitted 👆 +```python +# Code above omitted 👆 import pytest import pydantic @@ -212,38 +172,14 @@ async def test_correct(): await br.publish({ "user": "John", "user_id": 1, - }) + }, "in") @pytest.mark.asyncio async def test_invalid(): async with TestKafkaBroker(broker) as br: with pytest.raises(pydantic.ValidationError): - await br.publish("wrong message") - ``` - -=== "RabbitMQ" - ```python linenums="1" hl_lines="3 10 18-19" - # Code above omitted 👆 - -import pytest -import pydantic -from faststream.rabbit import TestRabbitBroker - - -@pytest.mark.asyncio -async def test_correct(): - async with TestRabbitBroker(broker) as br: - await br.publish({ - "user": "John", - "user_id": 1, - }) - -@pytest.mark.asyncio -async def test_invalid(): - async with TestRabbitBroker(broker) as br: - with pytest.raises(pydantic.ValidationError): - await br.publish("wrong message") - ``` + await br.publish("wrong message", "in") +``` ## Running the application @@ -275,17 +211,17 @@ And multiprocessing horizontal scaling feature as well faststream run basic:app --workers 3 ``` -You can know more about **CLI** features [here](./getting-started/cli/index.md){.internal-link} +You can know more about **CLI** features [here](https://faststream.airt.ai/0.1.0rc0/getting-started/cli/) --- ## Project Documentation -**FastStream** automatically generates documentation for your project according to the [**AsyncAPI**](https://www.asyncapi.com/){.external-link target="_blank"} specification. You can work with both generated artifacts and place a Web view of your documentation on resources available to related teams. +**FastStream** automatically generates documentation for your project according to the [**AsyncAPI**](https://www.asyncapi.com/) specification. You can work with both generated artifacts and place a Web view of your documentation on resources available to related teams. The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message format the application works with. And most importantly, it won't cost anything - **FastStream** has already created the docs for you! -![HTML-page](../assets/img/AsyncAPI-basic-html-short.png) +![HTML-page](https://faststream.airt.ai/0.1.0rc0/assets/img/AsyncAPI-basic-html-short.png) --- @@ -315,22 +251,62 @@ async def base_handler(user: str, You can use **FastStream** `MQBrokers` without `FastStream` application. Just *start* and *stop* them according to your application lifespan. -{! includes/index/integrations.md !} +```python +from aiohttp import web + +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") + +@broker.subscriber("test") +async def base_handler(body): + print(body) + +async def start_broker(app): + await broker.start() + +async def stop_broker(app): + await broker.close() + +async def hello(request): + return web.Response(text="Hello, world") + +app = web.Application() +app.add_routes([web.get("/", hello)]) +app.on_startup.append(start_broker) +app.on_cleanup.append(stop_broker) + +if __name__ == "__main__": + web.run_app(app) +``` ### **FastAPI** Plugin Also, **FastStream** can be used as part of **FastAPI**. -Just import a **StreamRouter** you need and declare message handler with the same `#!python @router.subscriber(...)` and `#!python @router.publisher(...)` decorators. +Just import a **StreamRouter** you need and declare message handler with the same `@router.subscriber(...)` and `@router.publisher(...)` decorators. + +```python +from fastapi import FastAPI +from pydantic import BaseModel + +from faststream.kafka.fastapi import KafkaRouter -!!! tip - When used this way, **FastStream** does not utilize its own dependency and serialization system, but integrates into **FastAPI**. - That is, you can use `Depends`, `BackgroundTasks` and other **FastAPI** tools as if it were a regular HTTP endpoint. +router = KafkaRouter("localhost:9092") -{! includes/getting_started/integrations/fastapi/1.md !} +class Incoming(BaseModel): + m: dict + +@router.subscriber("test") +@router.publisher("response") +async def hello(m: Incoming): + return {"response": "Hello, world!"} + +app = FastAPI(lifespan=router.lifespan_context) +app.include_router(router) +``` -!!! note - More integration features can be found [here](./getting-started/integrations/fastapi/index.md){.internal-link} +More integration features can be found [here](https://faststream.airt.ai/0.1.0rc0/getting-started/integrations/fastapi/) --- diff --git a/docs/docs/en/features.md b/docs/docs/en/features.md deleted file mode 100644 index b38fe76377..0000000000 --- a/docs/docs/en/features.md +++ /dev/null @@ -1,26 +0,0 @@ ---- -hide: - - navigation ---- - -# Features of FastStream - -Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use-cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices. - -- **Multiple Brokers Support**: Switching between Apache Kafka and RabbitMQ? No problem. FastStream provides a unified API to work across multiple message brokers. - -- **Pydantic Validation**: Leverage Pydantic's powerful validation capabilities to ensure your data models are always correct. Both annotation-based and model-based validations are supported. - -- **Automatic Docs**: Stay ahead with automatic [AsyncAPI](https://www.asyncapi.com/){.external-link target="_blank"} documentation. No need to manually keep your API docs up to date. - -- **Editor Support**: Enjoy full typed editor support that makes your development experience smooth, catching errors before they reach runtime. - -- **Powerful Dependency Injection System**: Manage your service dependencies efficiently with FastStream's built-in DI system. It's so easy to use, even for junior developers. - -- **Testable**: Writing unit tests has never been this simple. FastStream's architecture supports in-memory tests, making your CI/CD pipeline faster and more reliable. - -- **Extendable**: Need custom serialization or middlewares? FastStream supports extensions for lifespans, custom serialization methods, and even middlewares. - -- **Built for Automatic Code Generation**: Cut down development time and improve reliability. With its high-level abstractions and well-designed architecture, FastStream is optimized for automatic code generation using advanced models like GPT and Llama. - -That's FastStream in a nutshell—easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered. diff --git a/docs/docs_src/index/test_kafka.py b/docs/docs_src/index/test_kafka.py index 7f5eb3ad27..409113c5ed 100644 --- a/docs/docs_src/index/test_kafka.py +++ b/docs/docs_src/index/test_kafka.py @@ -11,10 +11,10 @@ async def test_correct(): await br.publish({ "user": "John", "user_id": 1, - }) + }, "in-topic") @pytest.mark.asyncio async def test_invalid(): async with TestKafkaBroker(broker) as br: with pytest.raises(pydantic.ValidationError): - await br.publish("wrong message") + await br.publish("wrong message", "in-topic") diff --git a/docs/docs_src/index/test_rabbit.py b/docs/docs_src/index/test_rabbit.py index c3df0817a4..f730086349 100644 --- a/docs/docs_src/index/test_rabbit.py +++ b/docs/docs_src/index/test_rabbit.py @@ -11,10 +11,10 @@ async def test_correct(): await br.publish({ "user": "John", "user_id": 1, - }) + }, "in-queue") @pytest.mark.asyncio async def test_invalid(): async with TestRabbitBroker(broker) as br: with pytest.raises(pydantic.ValidationError): - await br.publish("wrong message") + await br.publish("wrong message", "in-queue") From cb1f73860fdbba9bc7169ed868bdfefe1decd125 Mon Sep 17 00:00:00 2001 From: Lancetnik Date: Tue, 26 Sep 2023 10:19:24 +0300 Subject: [PATCH 2/4] fix #745: FastAPI support batch consuming --- faststream/asyncapi/schema/info.py | 2 +- faststream/broker/fastapi/route.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/faststream/asyncapi/schema/info.py b/faststream/asyncapi/schema/info.py index 2a5bd01d78..41d736667d 100644 --- a/faststream/asyncapi/schema/info.py +++ b/faststream/asyncapi/schema/info.py @@ -9,8 +9,8 @@ JsonSchemaValue, Required, TypedDict, - with_info_plain_validator_function, is_installed, + with_info_plain_validator_function, ) from faststream.log import logger diff --git a/faststream/broker/fastapi/route.py b/faststream/broker/fastapi/route.py index e84f61be8c..69db3cb652 100644 --- a/faststream/broker/fastapi/route.py +++ b/faststream/broker/fastapi/route.py @@ -232,8 +232,10 @@ async def app(message: NativeMessage[Any]) -> SendableMessage: """ body = message.decoded_body if first_arg is not None: - if not isinstance(body, dict): # pragma: no branch - fastapi_body: AnyDict = {first_arg: body} + if not isinstance(body, dict) and not isinstance( + body, list + ): + fastapi_body: Any = {first_arg: body} else: fastapi_body = body From 0171bda1474d483eea3ac968639a8c3690ddb701 Mon Sep 17 00:00:00 2001 From: Lancetnik Date: Tue, 26 Sep 2023 10:52:24 +0300 Subject: [PATCH 3/4] fix: use connection group as a part of handler key --- faststream/broker/fastapi/route.py | 4 +--- faststream/kafka/broker.py | 2 +- faststream/kafka/handler.py | 4 ++++ 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/faststream/broker/fastapi/route.py b/faststream/broker/fastapi/route.py index 69db3cb652..aa86db6a19 100644 --- a/faststream/broker/fastapi/route.py +++ b/faststream/broker/fastapi/route.py @@ -232,9 +232,7 @@ async def app(message: NativeMessage[Any]) -> SendableMessage: """ body = message.decoded_body if first_arg is not None: - if not isinstance(body, dict) and not isinstance( - body, list - ): + if not isinstance(body, dict) and not isinstance(body, list): fastapi_body: Any = {first_arg: body} else: fastapi_body = body diff --git a/faststream/kafka/broker.py b/faststream/kafka/broker.py index f8141f3a26..91eff34aac 100644 --- a/faststream/kafka/broker.py +++ b/faststream/kafka/broker.py @@ -374,7 +374,7 @@ def subscriber( # type: ignore[override] self._setup_log_context(topics) - key = "".join(topics) + key = Handler.get_routing_hash(topics, group_id) builder = partial( aiokafka.AIOKafkaConsumer, key_deserializer=key_deserializer, diff --git a/faststream/kafka/handler.py b/faststream/kafka/handler.py index 8ee55188a1..1b67b46c44 100644 --- a/faststream/kafka/handler.py +++ b/faststream/kafka/handler.py @@ -226,3 +226,7 @@ async def _consume(self) -> Never: if connected is False: connected = True await self.consume(msg) + + @staticmethod + def get_routing_hash(topics: Sequence[str], group_id: Optional[str] = None) -> str: + return "".join((*topics, group_id or "")) From 242249a8d97f4e9cf26c7a0db3e3f1a097d360bc Mon Sep 17 00:00:00 2001 From: Lancetnik Date: Tue, 26 Sep 2023 10:54:33 +0300 Subject: [PATCH 4/4] chore: bump version --- faststream/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/faststream/__about__.py b/faststream/__about__.py index ee1d14e5df..b88226f283 100644 --- a/faststream/__about__.py +++ b/faststream/__about__.py @@ -1,5 +1,5 @@ """Simple and fast framework to create message brokers based microservices""" -__version__ = "0.1.3" +__version__ = "0.1.4" INSTALL_YAML = """