Skip to content

How to programmatically stop the faststream app? #1172

Answered by Lancetnik
fyber asked this question in Q&A
Discussion options

You must be logged in to vote

@fyber, sorry, for now we have no public API to stop application, but you can patch FastStream object to implement it by yourself

import anyio

from faststream import FastStream
from faststream.cli.supervisors.utils import set_exit
from faststream.nats import NatsBroker


class StoppableApp(FastStream):
    async def _stop(self, *args) -> None:
        event = self.event = anyio.Event()
        set_exit(lambda *_: event.set())
        await event.wait()

        await self._shutdown()

broker = NatsBroker()
app = StoppableApp(broker)

And call app.event.set() anywhere

It is a dirty trick, but working and you can use it right now

Replies: 1 comment

Comment options

You must be logged in to vote
0 replies
Answer selected by fyber
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Category
Q&A
Labels
None yet
2 participants