Skip to content

Commit

Permalink
Feat: change consume to put
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Dumchenko committed Nov 17, 2024
1 parent ca0f4ef commit 0297697
Showing 1 changed file with 2 additions and 25 deletions.
27 changes: 2 additions & 25 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,29 +535,6 @@ def __init__(
include_in_schema=include_in_schema,
)

async def _consume(self) -> None:
assert self.consumer, "You should start subscriber at first." # nosec B101

connected = True

async def _put_msg(self, msg: "KafkaMessage") -> None:
self.start_consume_task()

while self.running:
try:
msg = await self.get_msg()

# pragma: no cover
except KafkaError: # noqa: PERF203
if connected:
connected = False
await anyio.sleep(5)

except ConsumerStoppedError:
return

else:
if not connected: # pragma: no cover
connected = True

if msg:
await self.consume(msg)
return await super()._put_msg(msg)

0 comments on commit 0297697

Please sign in to comment.