Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace async Event with bool #1846

Merged
merged 3 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions faststream/_internal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
P_HookParams = ParamSpec("P_HookParams")
T_HookReturn = TypeVar("T_HookReturn")


if TYPE_CHECKING:
from faststream.asyncapi.schema import (
Contact,
Expand Down Expand Up @@ -71,7 +70,7 @@ def __init__(
) -> None:
context.set_global("app", self)

self._should_exit = anyio.Event()
self._should_exit = False
self.broker = broker
self.logger = logger
self.context = context
Expand Down Expand Up @@ -159,7 +158,12 @@ def after_shutdown(

def exit(self) -> None:
"""Stop application manually."""
self._should_exit.set()
self._should_exit = True

async def _main_loop(self, sleep_time: float) -> None:
"""Run loop till exit signal."""
while not self._should_exit: # noqa: ASYNC110 (requested by creator)
await anyio.sleep(sleep_time)

async def start(
self,
Expand Down
12 changes: 3 additions & 9 deletions faststream/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
)

import anyio
from typing_extensions import Annotated, ParamSpec, deprecated
from typing_extensions import ParamSpec

from faststream._compat import ExceptionGroup
from faststream._internal.application import Application
Expand All @@ -35,13 +35,7 @@ async def run(
self,
log_level: int = logging.INFO,
run_extra_options: Optional[Dict[str, "SettingField"]] = None,
sleep_time: Annotated[
float,
deprecated(
"Deprecated in **FastStream 0.5.24**. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = 0.1,
sleep_time: float = 0.1,
) -> None:
"""Run FastStream Application."""
assert self.broker, "You should setup a broker" # nosec B101
Expand All @@ -54,7 +48,7 @@ async def run(
try:
async with anyio.create_task_group() as tg:
tg.start_soon(self._startup, log_level, run_extra_options)
await self._should_exit.wait()
await self._main_loop(sleep_time)
await self._shutdown(log_level)
tg.cancel_scope.cancel()
except ExceptionGroup as e:
Expand Down
Loading