Skip to content

Commit

Permalink
Fix: topics, typo
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Dumchenko committed Nov 20, 2024
1 parent 0297697 commit 20185a3
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 26 deletions.
1 change: 0 additions & 1 deletion faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1666,7 +1666,6 @@ def subscriber(
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
max_workers=max_workers,
)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber).add_call(
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class AsyncAPIBatchSubscriber(


class AsyncAPIConcurrentDefaultSubscriber(
ConcurrentDefaultSubscriber,
AsyncAPISubscriber["ConsumerRecord"],
ConcurrentDefaultSubscriber,
):
pass
56 changes: 32 additions & 24 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,13 @@ def get_log_context(
)


class ConcurrentDefaultSubscriber(ConcurrentMixin, DefaultSubscriber["ConsumerRecord"]):
class ConcurrentDefaultSubscriber(
ConcurrentMixin,
DefaultSubscriber
):
def __init__(
self,
*topics: str,
max_workers: int,
# Kafka information
group_id: Optional[str],
listener: Optional["ConsumerRebalanceListener"],
Expand All @@ -487,6 +489,7 @@ def __init__(
partitions: Iterable["TopicPartition"],
is_manual: bool,
# Subscriber args
max_workers: int,
no_ack: bool,
no_reply: bool,
retry: bool,
Expand All @@ -497,32 +500,13 @@ def __init__(
description_: Optional[str],
include_in_schema: bool,
) -> None:
if pattern:
reg, pattern = compile_path(
pattern,
replace_symbol=".*",
patch_regex=lambda x: x.replace(r"\*", ".*"),
)

else:
reg = None

parser = AioKafkaParser(
msg_class=KafkaAckableMessage if is_manual else KafkaMessage,
regex=reg,
)

super().__init__(
*topics,
max_workers=max_workers,
group_id=group_id,
listener=listener,
pattern=pattern,
connection_args=connection_args,
partitions=partitions,
# subscriber args
default_parser=parser.parse_message,
default_decoder=parser.decode_message,
is_manual=is_manual,
# Propagated args
no_ack=no_ack,
no_reply=no_reply,
Expand All @@ -533,8 +517,32 @@ def __init__(
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
max_workers=max_workers,
)
self.topics = topics

async def _consume(self) -> None:
assert self.consumer, "You should start subscriber at first." # nosec B101

connected = True

async def _put_msg(self, msg: "KafkaMessage") -> None:
self.start_consume_task()
return await super()._put_msg(msg)
while self.running:
try:
msg = await self.get_msg()

# pragma: no cover
except KafkaError: # noqa: PERF203
if connected:
connected = False
await anyio.sleep(5)

except ConsumerStoppedError:
return

else:
if not connected: # pragma: no cover
connected = True

if msg:
await self._put_msg(msg)

0 comments on commit 20185a3

Please sign in to comment.