Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unexpected background task cancellation #1699

Closed
wants to merge 7 commits into from

Conversation

kigawas
Copy link

@kigawas kigawas commented Jun 20, 2022

The starting point for contributions should usually be a discussion

Simple documentation typos may be raised as stand-alone pull requests, but otherwise please ensure you've discussed your proposal prior to issuing a pull request.

This will help us direct work appropriately, and ensure that any suggested changes have been okayed by the maintainers.

@Kludex
Copy link
Member

Kludex commented Jun 20, 2022

Unfortunately, this is not a solution. 😞

More info on why this is not a solution: #1441

@kigawas
Copy link
Author

kigawas commented Jun 20, 2022

Okay, this can be resolved by adding these lines:

            t = anyio.get_current_task()
            if t.name == "anyio.from_thread.BlockingPortal._call_func":
                # cancel stuck task due to discarded response
                # see: https://github.com/encode/starlette/issues/1022
                task_group.cancel_scope.cancel()

@Kludex Kludex added the hold Don't merge it label Jun 20, 2022
@kigawas
Copy link
Author

kigawas commented Jun 20, 2022

@Kludex Can you review this approach?

@Kludex
Copy link
Member

Kludex commented Jun 20, 2022

Okay, this can be resolved by adding these lines:

            t = anyio.get_current_task()
            if t.name == "anyio.from_thread.BlockingPortal._call_func":
                # cancel stuck task due to discarded response
                # see: https://github.com/encode/starlette/issues/1022
                task_group.cancel_scope.cancel()

This issue has 4 years. A bit of explanation on what you're doing here would be cool...

@kigawas
Copy link
Author

kigawas commented Jun 20, 2022

After tracing the tasks under CancelScope, I finally found that task. Probably due to silenced anyio's WouldBlock exception.
I'll provide more details tomorrow

@kigawas
Copy link
Author

kigawas commented Jun 21, 2022

Conclusion

To sum up, to solve the problem of #1022, the anyio integration in #1157 introduced task_group.cancel_scope.cancel() to cancel stuck tasks. However, if there are still other background tasks, they will also be cancelled.

Then the approach is simple: just cancel the stuck one, which is anyio.from_thread.BlockingPortal._call_func

Postmortem

If we scrutinize the whole stack and trace which task got stuck, the coro is pretty suspicious.

        async def call_next(request: Request) -> Response:
            app_exc: typing.Optional[Exception] = None
            send_stream, recv_stream = anyio.create_memory_object_stream()

            async def coro() -> None:
                nonlocal app_exc

                async with send_stream:
                    try:
                        await self.app(scope, request.receive, send_stream.send)
                    except Exception as exc:
                        app_exc = exc

            task_group.start_soon(coro)

Let's dig into the send_stream.send. We can find self.send_nowait raises WouldBlock and it'll be caught underneath except, then it never returns from await send_event.wait().

# from anyio/streams/memory.py
    async def send(self, item: T_Item) -> None:
        await checkpoint()
        try:
            self.send_nowait(item)
        except WouldBlock:
            # Wait until there's someone on the receiving end
            send_event = Event()
            self._state.waiting_senders[send_event] = item
            try:
                await send_event.wait()
            except BaseException:
                self._state.waiting_senders.pop(send_event, None)  # type: ignore[arg-type]
                raise

            if self._state.waiting_senders.pop(send_event, None):  # type: ignore[arg-type]
                raise BrokenResourceError

We got stuck with asyncio, so let's hit the asyncio backends of anyio:

class Event(BaseEvent):
    def __new__(cls) -> "Event":
        return object.__new__(cls)

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def set(self) -> DeprecatedAwaitable:
        self._event.set()
        return DeprecatedAwaitable(self.set)

    def is_set(self) -> bool:
        return self._event.is_set()

    async def wait(self) -> None:
        if await self._event.wait():
            await checkpoint()

From the documentation of asyncio.Event.wait:

Block until the internal flag is true.

If the internal flag is true on entry, return True immediately. Otherwise, block until another coroutine calls set() to set the flag to true, then return True.

Solution

Let's recap the send_stream.send above, if there're no waiters, send_stream.send will block. Why there're no waiters? Because call_next (inherently BaseHTTPMiddleware.__call__) was called before and memory stream was created by anyio.create_memory_object_stream() but StreamingResponse is not consumed!

In order to resolve this problem, you either consume it manually:

    class CustomMiddleware(BaseHTTPMiddleware):
        async def dispatch(self, request, call_next):
            resp = await call_next(request)

            async def _send(m):
                pass

            await resp.stream_response(_send)

            return PlainTextResponse("Custom")

Or just ignore the call_next:

    class CustomMiddleware(BaseHTTPMiddleware):
        async def dispatch(self, request, call_next):
            return PlainTextResponse("Custom")

Or just do the same in my PR to cancel the tangled "producer awaiting sending but no consumer receiving" task.

===UPDATE===
The cancellation still cancels some lingering tasks. Need to ponder.

@kigawas
Copy link
Author

kigawas commented Jun 21, 2022

Okay, just to shield the background tasks:

diff --git a/starlette/background.py b/starlette/background.py
index 4aaf7ae..db9b38a 100644
--- a/starlette/background.py
+++ b/starlette/background.py
@@ -1,6 +1,8 @@
 import sys
 import typing
 
+import anyio
+
 if sys.version_info >= (3, 10):  # pragma: no cover
     from typing import ParamSpec
 else:  # pragma: no cover
@@ -22,10 +24,11 @@ class BackgroundTask:
         self.is_async = is_async_callable(func)
 
     async def __call__(self) -> None:
-        if self.is_async:
-            await self.func(*self.args, **self.kwargs)
-        else:
-            await run_in_threadpool(self.func, *self.args, **self.kwargs)
+        with anyio.CancelScope(shield=True):
+            if self.is_async:
+                await self.func(*self.args, **self.kwargs)
+            else:
+                await run_in_threadpool(self.func, *self.args, **self.kwargs)

@Kludex
Copy link
Member

Kludex commented Jun 21, 2022

But then only shielding the bkg tasks will be enough... Like #1654

@kigawas
Copy link
Author

kigawas commented Jun 21, 2022

There's no need to call cancel every time. If it's starlette.middleware.base.BaseHTTPMiddleware.__call__.<locals>.call_next.<locals>.coro, it's okay to ignore

@jhominal
Copy link
Member

As the reporter on #1438, I would say two things about this PR:

  1. I would be quite unhappy with a solution that relied on checking the current task name - having a magic string seems to me very hacky and fragile;
  2. I think that the location of the cancellation shield is wrong - e.g. if you use the BackgroundTasks object to make multiple tasks, when the cancellation is triggered, the for loop in BackgroundTasks.__call__ will be interrupted - I believe that the cancellation shield should go in the Response class;

@kigawas
Copy link
Author

kigawas commented Jun 22, 2022

The background tasks are shielded in Response, StreamingResponse and FileResponse now

@kigawas kigawas changed the title Remove unexpected task cancellation Avoid unexpected background task cancellation Jun 22, 2022
@gnat
Copy link

gnat commented Jul 4, 2022

Hope to see a more permanent solution to this eventually.

For anyone struggling with this and using asyncio...

import asyncio

async def doit():
	await asyncio.sleep(5)

async def main(request):
	await asyncio.shield(doit())
	return PlainTextResponse('Hello')

The only downside is of the user closes the window before the process completes, you will get a RuntimeError: No response returned.

If you need this for a sync function, just use run_in_threadpool

@gnat
Copy link

gnat commented Jul 8, 2022

Looks like Trio-esque Task Groups and Exception Groups are built into asyncio for python 3.11: https://realpython.com/python311-exception-groups/

@Kludex Kludex closed this in #1715 Sep 24, 2022
@kigawas kigawas deleted the patch-1 branch October 13, 2022 06:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
hold Don't merge it
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants