Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add explicit message source enum #1866

Merged
merged 4 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ search:
- [StreamRouter](api/faststream/broker/fastapi/router/StreamRouter.md)
- message
- [AckStatus](api/faststream/broker/message/AckStatus.md)
- [SourceType](api/faststream/broker/message/SourceType.md)
- [StreamMessage](api/faststream/broker/message/StreamMessage.md)
- [decode_message](api/faststream/broker/message/decode_message.md)
- [encode_message](api/faststream/broker/message/encode_message.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/broker/message/SourceType.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.message.SourceType
2 changes: 2 additions & 0 deletions faststream/broker/core/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from faststream._compat import is_test_env
from faststream.broker.core.logging import LoggingBroker
from faststream.broker.message import SourceType
from faststream.broker.middlewares.logging import CriticalLogMiddleware
from faststream.broker.proto import SetupAble
from faststream.broker.subscriber.proto import SubscriberProto
Expand Down Expand Up @@ -376,6 +377,7 @@ async def request(

parsed_msg: StreamMessage[Any] = await producer._parser(published_msg)
parsed_msg._decoded_body = await producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

@abstractmethod
Expand Down
9 changes: 9 additions & 0 deletions faststream/broker/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class AckStatus(str, Enum):
rejected = "rejected"


class SourceType(str, Enum):
Consume = "Consume"
"""Message consumed by basic subscriber flow."""

Response = "Response"
"""RPC response consumed."""


def gen_cor_id() -> str:
"""Generate random string to use as ID."""
return str(uuid4())
Expand All @@ -60,6 +68,7 @@ class StreamMessage(Generic[MsgType]):

processed: bool = field(default=False, init=False)
committed: Optional[AckStatus] = field(default=None, init=False)
_source_type: SourceType = field(default=SourceType.Consume)
_decoded_body: Optional["DecodedMessage"] = field(default=None, init=False)

async def ack(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion faststream/confluent/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from confluent_kafka import Message
from typing_extensions import override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.broker.types import MsgType
from faststream.exceptions import NOT_CONNECTED_YET
Expand Down Expand Up @@ -124,6 +124,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
3 changes: 2 additions & 1 deletion faststream/kafka/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from aiokafka import ConsumerRecord
from typing_extensions import Annotated, Doc, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.broker.types import MsgType
from faststream.exceptions import NOT_CONNECTED_YET
Expand Down Expand Up @@ -177,6 +177,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
3 changes: 2 additions & 1 deletion faststream/nats/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from nats.aio.msg import Msg
from typing_extensions import Annotated, Doc, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.utils.functions import return_input
Expand Down Expand Up @@ -212,6 +212,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
3 changes: 2 additions & 1 deletion faststream/rabbit/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from aio_pika import IncomingMessage
from typing_extensions import Annotated, Doc, TypedDict, Unpack, deprecated, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.rabbit.schemas import BaseRMQInformation, RabbitQueue
Expand Down Expand Up @@ -373,6 +373,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down
5 changes: 4 additions & 1 deletion faststream/redis/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from typing_extensions import Annotated, Doc, deprecated, override

from faststream.broker.message import gen_cor_id
from faststream.broker.message import SourceType, gen_cor_id
from faststream.broker.publisher.usecase import PublisherUsecase
from faststream.exceptions import NOT_CONNECTED_YET
from faststream.redis.message import UnifyRedisDict
Expand Down Expand Up @@ -268,6 +268,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down Expand Up @@ -481,6 +482,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Expand Down Expand Up @@ -762,6 +764,7 @@ async def request(

parsed_msg = await self._producer._parser(published_msg)
parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
parsed_msg._source_type = SourceType.Response
return await return_msg(parsed_msg)

raise AssertionError("unreachable")
Loading