Add broker.subscriber.get_one() #1726

Merged (71 commits, Sep 8, 2024)
Changes from 17 commits
Commits (71)
1d35cbd
subscriber.get_one()
KrySeyt Aug 23, 2024
02ee4bb
Merge branch 'airtai:main' into add_subscriber_get_one
KrySeyt Aug 23, 2024
4ba22b6
remove _prepare
KrySeyt Aug 23, 2024
8e71f73
ruff satisfied
KrySeyt Aug 23, 2024
d119b66
fixes
KrySeyt Aug 23, 2024
907482c
fixes
KrySeyt Aug 23, 2024
1be821e
fixes
KrySeyt Aug 23, 2024
2ce5a5d
fixes
KrySeyt Aug 23, 2024
c439b4a
Kafka subscriber.get_one()
KrySeyt Aug 24, 2024
2ed83c5
Confluent subscriber.get_one()
KrySeyt Aug 24, 2024
9a2efde
refactor: polish RMQ get_one method
Lancetnik Aug 24, 2024
bc85f53
Small refactoring of get_one
KrySeyt Aug 24, 2024
e2347be
Rabbit get_one error fix
KrySeyt Aug 24, 2024
768f637
Kafka get_one update
KrySeyt Aug 24, 2024
81b3edf
Confluent get_one update
KrySeyt Aug 24, 2024
3d89d0f
Redis channel get_one
KrySeyt Aug 25, 2024
9ec0e56
Redis list get_one draft
KrySeyt Aug 25, 2024
053d2f2
Redis batch list get_one draft
KrySeyt Aug 25, 2024
fe4d000
Redis channel get_one update and list get_one message decoding
KrySeyt Aug 25, 2024
2cd00dd
Redis list batch get_one message decoding
KrySeyt Aug 25, 2024
cb9e0d7
Redis stream get_one
KrySeyt Aug 25, 2024
80c55b8
Redis batch stream get_one
KrySeyt Aug 25, 2024
afe7d08
Merge branch 'airtai:main' into add_subscriber_get_one
KrySeyt Aug 25, 2024
0dc8318
Redis channel get_one fix
KrySeyt Aug 25, 2024
b269ba6
Update brokers start methods
KrySeyt Aug 25, 2024
e4aea95
remove unnecessary code
KrySeyt Aug 25, 2024
8ef531e
Nats CoreSubscriber.get_one
KrySeyt Aug 27, 2024
e4d7079
Nats CoreSubscriber.get_one timeout support
KrySeyt Aug 27, 2024
f6d136b
Nats PullStreamSubscriber get_one
KrySeyt Aug 27, 2024
0fa93a6
Nats KeyValueWatchSubscriber get_one prototype
KrySeyt Aug 27, 2024
6748a79
Nats ObjStoreWatchSubscriber get_one prototype
KrySeyt Aug 27, 2024
d9e187b
Add Nats additional get_one methods
KrySeyt Aug 27, 2024
ebbcef7
refactor: polish RMQ get_one method
Lancetnik Aug 24, 2024
fed12cc
Rabbit subscriber get_one tests
KrySeyt Aug 27, 2024
7e55e57
Kafka subscriber get_one tests
KrySeyt Aug 28, 2024
6d6b139
Confluent subscriber get_one tests
KrySeyt Aug 28, 2024
f17ce3d
Redis subscriber get_one tests
KrySeyt Aug 28, 2024
cdc5b49
Nats core and JS get_one tests
KrySeyt Aug 30, 2024
7ae5e51
Nats PoolSubscriber get_one tests
KrySeyt Aug 30, 2024
3c30b4a
Merge branch 'main' into add_subscriber_get_one
Lancetnik Aug 31, 2024
837c3ef
Nats batch pull get_one tests + fixes
KrySeyt Sep 1, 2024
e678e69
Nats get_one with filter test
KrySeyt Sep 1, 2024
ad12056
Merge branch 'main' into add_subscriber_get_one
Lancetnik Sep 6, 2024
302e5d3
Nats CoreSubscriber.get_one small refactoring
KrySeyt Sep 6, 2024
8a64818
refactor: polish get_one
Lancetnik Sep 8, 2024
3c44939
chore: merge latest
Lancetnik Sep 8, 2024
d52e1fb
lint: fix redis mypy
Lancetnik Sep 8, 2024
fbafc62
lint: fix rabbit mypy
Lancetnik Sep 8, 2024
42f898b
lint: fix kafka mypy
Lancetnik Sep 8, 2024
2715e79
lint: fix confluent mypy
Lancetnik Sep 8, 2024
c805280
lint: fix kafka mypy
Lancetnik Sep 8, 2024
35b7195
lint: fix nats mypy
Lancetnik Sep 8, 2024
9664558
lint: fix precommit
Lancetnik Sep 8, 2024
65b4065
Merge branch 'main' into add_subscriber_get_one
Lancetnik Sep 8, 2024
164f6cb
refactor: fix nats unsub
Lancetnik Sep 8, 2024
fee651f
Merge branch 'add_subscriber_get_one' of github.com:KrySeyt/faststrea…
Lancetnik Sep 8, 2024
669d7c1
fix: correct redis timeout
Lancetnik Sep 8, 2024
e94d220
Merge branch 'main' into add_subscriber_get_one
Lancetnik Sep 8, 2024
0a75e12
fix: correct redis channel sub
Lancetnik Sep 8, 2024
1056fd9
lint: fix precommit
Lancetnik Sep 8, 2024
f99dc08
tests: mv get_one tests to basic testcase
Lancetnik Sep 8, 2024
6602569
tests: mv get_one tests to real testcase
Lancetnik Sep 8, 2024
283944b
docs: generate API References
Lancetnik Sep 8, 2024
8453092
refactor: use process_msg everywhere
Lancetnik Sep 8, 2024
be854e2
Merge branch 'add_subscriber_get_one' of github.com:KrySeyt/faststrea…
Lancetnik Sep 8, 2024
fd952c8
refactor: mv process_msg broker.utils
Lancetnik Sep 8, 2024
5d7d502
lint: fix mypy
Lancetnik Sep 8, 2024
584a2f9
Nats KV and Obj subscribers get_one tests
KrySeyt Sep 8, 2024
2197c61
Nats KV and Obj subscribers get_one timeout tests
KrySeyt Sep 8, 2024
04b7bd7
format fix
KrySeyt Sep 8, 2024
149b220
tests: refactor timeout tests
Lancetnik Sep 8, 2024
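Taken together, these commits add a pull-style API to every subscriber: a started subscriber with no registered handlers can be polled for a single message, getting back a parsed `StreamMessage` or `None` on timeout. A minimal usage sketch, assuming a reachable NATS server; the URL, subject name, and choice of backend are illustrative only:

```python
import asyncio

from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
subscriber = broker.subscriber("test-subject")  # note: no handler attached


async def main() -> None:
    await broker.start()
    try:
        # Returns a parsed StreamMessage, or None if nothing arrives in time.
        msg = await subscriber.get_one(timeout=3.0)
        if msg is not None:
            print(await msg.decode())
    finally:
        await broker.close()


asyncio.run(main())
```

The same call shape applies to the Kafka, Confluent, RabbitMQ, and Redis subscribers touched by this PR.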
5 changes: 5 additions & 0 deletions faststream/broker/subscriber/proto.py
@@ -85,6 +85,11 @@ async def consume(self, msg: MsgType) -> Any: ...
    @abstractmethod
    async def process_message(self, msg: MsgType) -> Any: ...

    @abstractmethod
    async def get_one(
        self, *, timeout: float = 5.0
    ) -> "Optional[StreamMessage[MsgType]]": ...

    @abstractmethod
    def add_call(
        self,
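The abstract method above fixes the contract for every concrete subscriber: block for at most `timeout` seconds and return either a parsed `StreamMessage` or `None`. A hedged sketch of what a conforming implementation has to provide; the class and its in-memory queue are illustrative, not part of the PR:

```python
import asyncio
from typing import Generic, Optional, TypeVar

from faststream.broker.message import StreamMessage

MsgType = TypeVar("MsgType")


class InMemorySubscriber(Generic[MsgType]):
    """Illustrative stub only; shows the shape of the new abstract method."""

    def __init__(self) -> None:
        self._queue: "asyncio.Queue[StreamMessage[MsgType]]" = asyncio.Queue()

    async def get_one(
        self, *, timeout: float = 5.0
    ) -> Optional["StreamMessage[MsgType]"]:
        try:
            # Wait for at most `timeout` seconds, then give up with None.
            return await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            return None
```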
19 changes: 9 additions & 10 deletions faststream/broker/subscriber/usecase.py
@@ -107,8 +107,8 @@ def __init__(
        """Initialize a new instance of the class."""
        self.calls = []

-        self._default_parser = default_parser
-        self._default_decoder = default_decoder
+        self._parser = default_parser
+        self._decoder = default_decoder
        self._no_reply = no_reply
        # Watcher args
        self._no_ack = no_ack
@@ -163,18 +163,17 @@ def setup(  # type: ignore[override]

        for call in self.calls:
            if parser := call.item_parser or broker_parser:
-                async_parser = resolve_custom_func(
-                    to_async(parser), self._default_parser
-                )
+                async_parser = resolve_custom_func(to_async(parser), self._parser)
            else:
-                async_parser = self._default_parser
+                async_parser = self._parser

            if decoder := call.item_decoder or broker_decoder:
-                async_decoder = resolve_custom_func(
-                    to_async(decoder), self._default_decoder
-                )
+                async_decoder = resolve_custom_func(to_async(decoder), self._decoder)
            else:
-                async_decoder = self._default_decoder
+                async_decoder = self._decoder

+            self._parser = async_parser
+            self._decoder = async_decoder

            call.setup(
                parser=async_parser,
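For orientation: renaming `_default_parser`/`_default_decoder` to `_parser`/`_decoder`, and assigning the resolved async callables back onto `self` in `setup()`, is what lets `get_one` in the concrete subscribers below parse and decode a raw frame without any registered handler. Roughly, the flow each backend repeats (a simplification, not the literal PR code; the helper name is hypothetical):

```python
# After setup(), self._parser / self._decoder hold resolved async callables,
# so a handler-less subscriber can still turn a raw frame into a StreamMessage.
async def _raw_to_message(self, raw_message):  # hypothetical helper name
    parsed = await self._parser(raw_message)
    parsed._decoded_body = await self._decoder(parsed)
    return parsed
```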
31 changes: 30 additions & 1 deletion faststream/confluent/subscriber/usecase.py
@@ -1,5 +1,7 @@
import asyncio
from abc import ABC, abstractmethod
from contextlib import AsyncExitStack
from functools import partial
from typing import (
    TYPE_CHECKING,
    Any,
@@ -9,7 +11,7 @@
    List,
    Optional,
    Sequence,
-    Tuple,
+    Tuple, Awaitable,
)

import anyio
@@ -19,8 +21,10 @@
from faststream.broker.publisher.fake import FakePublisher
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.broker.types import MsgType
from faststream.confluent.message import KafkaMessage
from faststream.confluent.parser import AsyncConfluentParser
from faststream.confluent.schemas import TopicPartition
from faststream.utils.functions import return_input

if TYPE_CHECKING:
    from fast_depends.dependencies import Depends
@@ -152,6 +156,9 @@ async def start(self) -> None:

        await super().start()

        if not self.calls:
            return

        self.task = asyncio.create_task(self._consume())

    async def close(self) -> None:
@@ -166,6 +173,28 @@ async def close(self) -> None:

        self.task = None

    async def get_one(self, timeout: float = 5.0) -> "Optional[KafkaMessage]":
        assert self.consumer, "You should start subscriber at first."
        assert (  # nosec B101
            not self.calls
        ), "You can't use `get_one` method if subscriber has registered handlers."

        raw_message = await self.consumer.getone(timeout=timeout)

        async with AsyncExitStack() as stack:
            return_msg: Callable[[KafkaMessage], Awaitable[KafkaMessage]] = (
                return_input
            )

            for m in self._broker_middlewares:
                mid = m(raw_message)
                await stack.enter_async_context(mid)
                return_msg = partial(mid.consume_scope, return_msg)

            parsed_msg = await self._parser(raw_message)
            parsed_msg._decoded_body = await self._decoder(parsed_msg)
            return await return_msg(parsed_msg)

    def _make_response_publisher(
        self,
        message: "StreamMessage[Any]",
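The middleware/parse/decode block above is repeated almost verbatim in the Kafka and RabbitMQ subscribers below; later commits in this PR (8453092 and fd952c8) extract it into a shared `process_msg` helper under `broker.utils`. That final helper is not part of this diff snapshot, so the sketch below is only an approximation of the idea, with `Any` standing in for the real generic types:

```python
from contextlib import AsyncExitStack
from functools import partial
from typing import Any, Awaitable, Callable, Iterable, Optional

from faststream.utils.functions import return_input


async def process_msg(
    msg: Optional[Any],
    middlewares: Iterable[Callable[[Any], Any]],
    parser: Callable[[Any], Awaitable[Any]],
    decoder: Callable[[Any], Awaitable[Any]],
) -> Optional[Any]:
    """Approximate sketch of the shared helper; not the exact upstream code."""
    if msg is None:
        return None

    async with AsyncExitStack() as stack:
        return_msg: Callable[[Any], Awaitable[Any]] = return_input

        # Each broker middleware wraps the final "return the message" step.
        for m in middlewares:
            mid = m(msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)

        parsed = await parser(msg)
        parsed._decoded_body = await decoder(parsed)
        return await return_msg(parsed)
```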
35 changes: 34 additions & 1 deletion faststream/kafka/subscriber/usecase.py
@@ -1,5 +1,7 @@
import asyncio
from abc import ABC, abstractmethod
from contextlib import AsyncExitStack
from functools import partial
from itertools import chain
from typing import (
    TYPE_CHECKING,
@@ -10,7 +12,7 @@
    List,
    Optional,
    Sequence,
-    Tuple,
+    Tuple, Awaitable,
)

import anyio
@@ -28,6 +30,7 @@
)
from faststream.kafka.message import KafkaAckableMessage, KafkaMessage
from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser
from faststream.utils.functions import return_input
from faststream.utils.path import compile_path

if TYPE_CHECKING:
@@ -164,6 +167,9 @@ async def start(self) -> None:
        await consumer.start()
        await super().start()

        if not self.calls:
            return

        self.task = asyncio.create_task(self._consume())

    async def close(self) -> None:
@@ -178,6 +184,33 @@ async def close(self) -> None:

        self.task = None

    async def get_one(self, timeout: float = 5) -> "Optional[KafkaMessage]":
        assert self.consumer, "You should start subscriber at first."
        assert (  # nosec B101
            not self.calls
        ), "You can't use `get_one` method if subscriber has registered handlers."

        raw_messages = await self.consumer.getmany(
            timeout_ms=timeout * 1000, max_records=1
        )

        if not raw_messages:
            return None

        ((raw_message,),) = raw_messages.values()

        async with AsyncExitStack() as stack:
            return_msg: Callable[[KafkaMessage], Awaitable[KafkaMessage]] = (
                return_input
            )

            for m in self._broker_middlewares:
                mid = m(raw_message)
                await stack.enter_async_context(mid)
                return_msg = partial(mid.consume_scope, return_msg)

            parsed_msg = await self._parser(raw_message)
            parsed_msg._decoded_body = await self._decoder(parsed_msg)
            return await return_msg(parsed_msg)

    def _make_response_publisher(
        self,
        message: "StreamMessage[Any]",
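The commit history above also adds dedicated timeout tests for every broker. A hedged, test-style sketch of the expected behaviour against a real Kafka instance; the broker address, topic name, and the use of pytest-asyncio are assumptions of this example, not code from the PR:

```python
import pytest

from faststream.kafka import KafkaBroker


@pytest.mark.asyncio
async def test_get_one_returns_none_on_timeout() -> None:
    broker = KafkaBroker("localhost:9092")  # assumes a reachable test broker
    subscriber = broker.subscriber("get-one-timeout-topic")

    async with broker:
        await broker.start()
        # Nothing was published, so the call should give up after ~0.5 s.
        assert await subscriber.get_one(timeout=0.5) is None
```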
59 changes: 54 additions & 5 deletions faststream/rabbit/subscriber/usecase.py
@@ -1,6 +1,9 @@
from contextlib import AsyncExitStack
from functools import partial
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
@@ -9,13 +12,16 @@
    Union,
)

import anyio
from typing_extensions import override

from faststream.broker.publisher.fake import FakePublisher
from faststream.broker.subscriber.usecase import SubscriberUsecase
from faststream.exceptions import SetupError
from faststream.rabbit.helpers.declarer import RabbitDeclarer
from faststream.rabbit.parser import AioPikaParser
from faststream.rabbit.schemas import BaseRMQInformation
from faststream.utils.functions import return_input

if TYPE_CHECKING:
    from aio_pika import IncomingMessage, RobustQueue
@@ -24,6 +30,7 @@
    from faststream.broker.message import StreamMessage
    from faststream.broker.types import BrokerMiddleware, CustomCallable
    from faststream.rabbit.helpers.declarer import RabbitDeclarer
    from faststream.rabbit.message import RabbitMessage
    from faststream.rabbit.publisher.producer import AioPikaFastProducer
    from faststream.rabbit.schemas import (
        RabbitExchange,
@@ -156,11 +163,12 @@ async def start(self) -> None:
            robust=self.queue.robust,
        )

-        self._consumer_tag = await queue.consume(
-            # NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
-            self.consume,  # type: ignore[arg-type]
-            arguments=self.consume_args,
-        )
+        if self.calls:
+            self._consumer_tag = await self._queue_obj.consume(
+                # NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
+                self.consume,  # type: ignore[arg-type]
+                arguments=self.consume_args,
+            )

        await super().start()

@@ -175,6 +183,47 @@ async def close(self) -> None:

        self._queue_obj = None

    async def get_one(
        self,
        *,
        timeout: float = 5.0,
        no_ack: bool = True,
    ) -> "Optional[RabbitMessage]":
        assert self._queue_obj, "You should start subscriber at first."  # nosec B101
        assert (  # nosec B101
            not self.calls
        ), "You can't use `get_one` method if subscriber has registered handlers."

        sleep_interval = timeout / 10

        raw_message: Optional[IncomingMessage] = None
        with anyio.move_on_after(timeout):
            while (  # noqa: ASYNC110
                raw_message := await self._queue_obj.get(
                    fail=False,
                    no_ack=no_ack,
                    timeout=timeout,
                )
            ) is None:
                await anyio.sleep(sleep_interval)

        if raw_message is None:
            return None

        async with AsyncExitStack() as stack:
            return_msg: Callable[[RabbitMessage], Awaitable[RabbitMessage]] = (
                return_input
            )

            for m in self._broker_middlewares:
                mid = m(raw_message)
                await stack.enter_async_context(mid)
                return_msg = partial(mid.consume_scope, return_msg)

            parsed_msg = await self._parser(raw_message)
            parsed_msg._decoded_body = await self._decoder(parsed_msg)
            return await return_msg(parsed_msg)

    def _make_response_publisher(
        self,
        message: "StreamMessage[Any]",
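Unlike the Kafka consumers, aio-pika has no blocking single-message fetch with a long timeout, so the RabbitMQ implementation above polls `queue.get(fail=False)` in a short sleep loop bounded by `anyio.move_on_after`. The same pattern in isolation, as a generic sketch; the `fetch()` callable standing in for `queue.get` is an assumption of this example:

```python
from typing import Awaitable, Callable, Optional, TypeVar

import anyio

T = TypeVar("T")


async def poll_until_timeout(
    fetch: Callable[[], Awaitable[Optional[T]]],
    timeout: float = 5.0,
) -> Optional[T]:
    """Poll a non-blocking fetch until it returns something or the deadline hits."""
    sleep_interval = timeout / 10
    result: Optional[T] = None

    with anyio.move_on_after(timeout):
        while (result := await fetch()) is None:
            await anyio.sleep(sleep_interval)

    return result
```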