subscription AsyncGenerator raises error during client disconnected finalization process #889

Closed
wuyuanyi135 opened this issue Apr 29, 2021 · 7 comments · Fixed by #1002

@wuyuanyi135
Contributor

wuyuanyi135 commented Apr 29, 2021

Hello, I found a very weird problem (Python 3.8):
When I run the following code, open the GraphiQL interface (http://127.0.0.1/graphql), start the subscription query subscription { test }, and then refresh the page, the console reports "cannot reuse already awaited coroutine" together with a pending-task error (detailed traceback attached below).

This happens when the generator has been blocked on the Queue for a while with nothing new put into it. If you refresh before 2.0 has been yielded, you may not get the error.

Server code

import asyncio
from strawberry.asgi import GraphQL
from starlette.applications import Starlette
import strawberry

@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "World"

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def test(self) -> float:
        q = asyncio.Queue()
        q.put_nowait(1.0)
        q.put_nowait(2.0)
        async def agen():
            try:
                while True:
                    val = await q.get()
                    yield val
                    await asyncio.sleep(1.0)
            except GeneratorExit:
                print("bye")
            except BaseException as e:
                print(e)
        agen_obj = agen()
        return agen_obj


schema = strawberry.Schema(Query, subscription=Subscription)

graphql = GraphQL(schema)

app = Starlette(debug=True)
app.mount("/graphql", graphql)

GraphQL query

subscription {
  test
}

Similarly, if we do not return an AsyncGenerator but write the resolver itself as an async generator, the same problem is still there:

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def test(self) -> float:
        q = asyncio.Queue()
        q.put_nowait(1.0)

        try:
            while True:
                val = await q.get()
                yield val
                await asyncio.sleep(1.0)
        except GeneratorExit:
            print("bye")
        except BaseException as e:
            print(e)

Traceback

After enabling PYTHONASYNCIODEBUG=1, I can see the detailed traceback:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Program Files\JetBrains\PyCharm 2019.3.1\plugins\python\helpers\pydev\pydevd.py", line 2173, in <module>
    main()
  File "C:\Program Files\JetBrains\PyCharm 2019.3.1\plugins\python\helpers\pydev\pydevd.py", line 2164, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Program Files\JetBrains\PyCharm 2019.3.1\plugins\python\helpers\pydev\pydevd.py", line 1476, in run
    return self._exec(is_module, entry_point_fn, module_name, file, globals, locals)
  File "C:\Program Files\JetBrains\PyCharm 2019.3.1\plugins\python\helpers\pydev\pydevd.py", line 1500, in _exec
    runpy._run_module_as_main(module_name, alter_argv=False)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\__main__.py", line 4, in <module>
    uvicorn.main()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 782, in main
    rv = self.invoke(ctx)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\main.py", line 362, in main
    run(**kwargs)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\main.py", line 386, in run
    server.run()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\server.py", line 49, in run
    loop.run_until_complete(self.serve(sockets=sockets))
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\base_events.py", line 570, in run_forever
    self._run_once()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\base_events.py", line 1851, in _run_once
    handle._run()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\strawberry\asgi\__init__.py", line 178, in handle_async_results
    async for result in results:
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\graphql\subscription\map_async_iterator.py", line 44, in __anext__
    anext = ensure_future(self.iterator.__anext__())
@wuyuanyi135
Contributor Author

wuyuanyi135 commented Apr 29, 2021

After switching to Python 3.7, the stack trace changed, so I believe 3.7 has a different issue. Python 3.8 and 3.9 show exactly the same problem as above.

bye  <--- Note this "bye" is printed correctly 
Task exception was never retrieved
future: <Task finished coro=<GraphQL.handle_async_results() done, defined at C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\strawberry\asgi\__init__.py:174> exception=RuntimeError("Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.")>
Traceback (most recent call last):
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\strawberry\asgi\__init__.py", line 178, in handle_async_results
    async for result in results:
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\graphql\subscription\map_async_iterator.py", line 47, in __anext__
    await wait([aclose, anext], return_when=FIRST_COMPLETED)
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\asyncio\tasks.py", line 389, in wait
    return await _wait(fs, timeout, return_when, loop)
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\asyncio\tasks.py", line 482, in _wait
    await waiter
concurrent.futures._base.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\strawberry\asgi\__init__.py", line 195, in handle_async_results
    operation_id,
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\strawberry\asgi\__init__.py", line 219, in _send_message
    return await websocket.send_json(data)
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\starlette\websockets.py", line 137, in send_json
    await self.send({"type": "websocket.send", "text": text})
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\starlette\websockets.py", line 68, in send
    await self._send(message)
  File "C:\Users\wuyua\anaconda3\envs\patstack-server\lib\site-packages\uvicorn\protocols\websockets\websockets_impl.py", line 239, in asgi_send
    raise RuntimeError(msg % message_type)
RuntimeError: Unexpected ASGI message 'websocket.send', after sending 'websocket.close'.
INFO:     ('127.0.0.1', 60791) - "WebSocket /graphql/" [accepted]
Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_asend without __name__>()> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001DCE5E17CD8>()]>>
INFO:     127.0.0.1:60790 - "POST /graphql/ HTTP/1.1" 200 OK

@wuyuanyi135
Contributor Author

wuyuanyi135 commented Apr 29, 2021

A possible cause is the changed asyncio behavior in Python 3.8, described in https://bugs.python.org/issue38559:

if the flag is set when we are about to run "anext()" or "athrow()" it means that another coroutine is reusing the same generator object in parallel and so we raise a RuntimeError.

Prior to 3.8, calling "aclose()" worked (maybe not in the most clean way). A GeneratorExit was thrown into an asynchronous generator regardless of whether it was running or not, aborting the execution.
In 3.8, calling "aclose()" can crash with a RuntimeError. It's no longer possible to reliably cancel a running asynchronous generator.
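
The behavior quoted above can be reproduced without Strawberry. The following is a minimal sketch, assuming Python 3.8 or later: one task is blocked inside the generator's __anext__() while aclose() is called from elsewhere. The names numbers, pull, and main are made up for this illustration and do not come from Strawberry or graphql-core.

```python
import asyncio


async def numbers(queue):
    # Blocks inside __anext__() whenever the queue is empty.
    while True:
        yield await queue.get()


async def main():
    gen = numbers(asyncio.Queue())

    async def pull():
        # Hypothetical helper: keeps one __anext__() call in flight.
        return await gen.__anext__()

    pending_next = asyncio.ensure_future(pull())
    await asyncio.sleep(0.01)  # let pull() start and block on queue.get()

    try:
        # The generator is currently running inside pull(), so on 3.8+
        # closing it from here is rejected with a RuntimeError.
        await gen.aclose()
        outcome = "closed cleanly"
    except RuntimeError as error:
        outcome = "RuntimeError: {}".format(error)
    finally:
        # Cancel the in-flight __anext__() so nothing is left pending.
        pending_next.cancel()
        await asyncio.gather(pending_next, return_exceptions=True)
    return outcome


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Cancelling the task that is blocked in __anext__() is what actually unwinds the generator here; aclose() alone does not.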

wuyuanyi135 changed the title from "Failed finalization of subscription AsyncGenerator when asyncio.Queue.get is blocking and waiting" to "subscription AsyncGenerator raises error during client disconnected finalization process" on Apr 29, 2021
@ossareh
Contributor

ossareh commented Apr 29, 2021

@wuyuanyi135 I think you're hitting a timeout or something. Can you update your logging config so that we can see timestamps please?

Here's my working theory using your second code snippet:

  • first loop through the code you yield 1.0
  • second loop through the code you block because there is nothing on the q

from the docs about asyncio.Queue.get():

coroutine get()
Remove and return an item from the queue. If queue is empty, wait until an item is available.

Try using asyncio.wait_for:

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def test(self) -> float:
        q = asyncio.Queue()
        q.put_nowait(1.0)

        try:
            while True:
                val = await asyncio.wait_for(q.get(), timeout=1.0)
                yield val
                await asyncio.sleep(1.0)
        except asyncio.TimeoutError:
            print("bye")
        except BaseException as e:
            print(e)

@wuyuanyi135
Contributor Author

wuyuanyi135 commented Apr 29, 2021

@ossareh Thank you!

Sorry, my toy code snippet caused some confusion. I put just one item in the queue and let it wait forever in order to reproduce the error. In the actual use case where I hit this issue, an rxpy pipeline generates real-time data and feeds it to the asynchronous generator via the queue. In that case, in my opinion, queue.get() should wait indefinitely for new data rather than time out.

I have updated my code to log timestamps:

Code
import asyncio
import typing

from strawberry.asgi import GraphQL
from starlette.applications import Starlette
import strawberry
import logging

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

@strawberry.type
class Query:
    @strawberry.field
    def hello(self) -> str:
        return "World"

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def test(self) -> float:
        q = asyncio.Queue()
        # Pre-fill the queue with two items; the third get() blocks forever.
        for x in range(2):
            q.put_nowait(x)

        try:
            while True:
                logging.info("Waiting for the queue")
                val = await q.get()
                logging.info("Got value from queue")
                yield val
                await asyncio.sleep(1.0)
        except GeneratorExit:
            logging.info("bye")
        except BaseException as e:
            logging.info(e)


schema = strawberry.Schema(Query, subscription=Subscription)

graphql = GraphQL(schema)

app = Starlette(debug=True)
app.mount("/graphql", graphql)

The trace with time stamps:

INFO:     127.0.0.1:59711 - "POST /graphql/ HTTP/1.1" 200 OK
2021-04-29 15:26:20 INFO     Waiting for the queue
2021-04-29 15:26:20 INFO     Got value from queue
2021-04-29 15:26:21 INFO     Waiting for the queue
2021-04-29 15:26:21 INFO     Got value from queue
2021-04-29 15:26:22 INFO     Waiting for the queue
INFO:     127.0.0.1:59711 - "GET /graphql/ HTTP/1.1" 200 OK
INFO:     ('127.0.0.1', 59716) - "WebSocket /graphql/" [accepted]
2021-04-29 15:26:22 INFO     ('127.0.0.1', 59716) - "WebSocket /graphql/" [accepted]
2021-04-29 15:26:22 ERROR    Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\__main__.py", line 4, in <module>
    uvicorn.main()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 782, in main
    rv = self.invoke(ctx)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\click\core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\main.py", line 362, in main
    run(**kwargs)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\main.py", line 386, in run
    server.run()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\uvicorn\server.py", line 49, in run
    loop.run_until_complete(self.serve(sockets=sockets))
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\base_events.py", line 603, in run_until_complete
    self.run_forever()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\base_events.py", line 570, in run_forever
    self._run_once()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\base_events.py", line 1851, in _run_once
    handle._run()
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\strawberry\asgi\__init__.py", line 178, in handle_async_results
    async for result in results:
  File "C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\graphql\subscription\map_async_iterator.py", line 44, in __anext__
    anext = ensure_future(self.iterator.__anext__())
task: <Task pending name='Task-58' coro=<<async_generator_asend without __name__>()> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001DCF3470C10>()] created at C:\Users\wuyua\anaconda3\envs\patstack\lib\asyncio\base_events.py:422> created at C:\Users\wuyua\anaconda3\envs\patstack\lib\site-packages\graphql\subscription\map_async_iterator.py:44>
2021-04-29 15:26:22 INFO     cannot reuse already awaited coroutine
INFO:     127.0.0.1:59711 - "POST /graphql/ HTTP/1.1" 200 OK

In this trace, I closed the websocket connection right after the generator entered the infinite wait, so it does not look like a timeout. If I let it hang in await queue.get() for a while first, I get the same traceback.

I also tried many different ways to kill the asynchronous generator object and the async task, but I could not reproduce the error seen here: the generator waiting on the queue could always be closed without an issue.

@wuyuanyi135
Contributor Author

There is a catch in the ASGI server:

except Exception as error:

This line catches only exceptions derived from Exception, but since Python 3.8 asyncio.CancelledError derives from BaseException (https://docs.python.org/3.8/library/asyncio-exceptions.html#asyncio.CancelledError), hence the different behavior in 3.7 and 3.8.
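
The difference can be checked in isolation. This is a small sketch, not Strawberry code: it cancels a task whose body is wrapped in except Exception and reports whether the cancellation was swallowed or propagated. The function names are invented for this example.

```python
import asyncio


async def guarded_wait():
    try:
        await asyncio.sleep(3600)
    except Exception:
        # Reached on Python 3.7, where CancelledError is an Exception.
        return "caught by except Exception"


async def main():
    task = asyncio.ensure_future(guarded_wait())
    await asyncio.sleep(0.01)  # let the task start sleeping
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        # Reached on Python 3.8+, where CancelledError is a BaseException.
        return "propagated as CancelledError"


if __name__ == "__main__":
    print(issubclass(asyncio.CancelledError, Exception))
    print(asyncio.run(main()))
```

A handler that must behave the same on both versions can list except asyncio.CancelledError before except Exception.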

@wuyuanyi135
Contributor Author

After playing with this library and the upstream graphql-core framework, I was able to trace the cause of this problem.

Client disconnect procedures

  1. The websocket receiving task is interrupted by an exception:

    message = await websocket.receive_json()

  2. The exception handler performs the finalization. It first calls aclose() on the MapAsyncIterator object, then cancels the handle_async_results task:

    await subscriptions[operation_id].aclose()
    tasks[operation_id].cancel()

  3. The wait is interrupted by an asyncio.CancelledError after aclose is invoked. The current code base does not catch it, so the anext task is never cancelled. This is the root cause of the subscription's asynchronous generator not being finalized consistently when the client disconnects.
    https://github.com/graphql-python/graphql-core/blob/8d63f1efb91b22ac4ce727f45ae927df0139093c/src/graphql/subscription/map_async_iterator.py#L47

  4. The uncaught exception propagates to the loop that waits for the asynchronous results,

    async for result in results:

    and interrupts it with asyncio.CancelledError. In Python 3.7 this error is caught by

    except Exception as error:

    which is undesirable, because we do not want to send a message to the client when the error means the client is gone. In Python 3.8 the error is not caught, because it no longer derives from Exception. The result is an unhandled exception that eventually floods the console with error messages.
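
Step 3 can be illustrated on its own. The sketch below is not the graphql-core implementation. It only mirrors the pattern of waiting on an "anext" task and an "aclose" task with asyncio.wait, then cancels the waiting task and checks whether the inner tasks are still pending. The names wait_for_next and inner are hypothetical.

```python
import asyncio


async def main():
    queue = asyncio.Queue()   # stays empty, so get() never completes
    closed = asyncio.Event()  # never set in this demo
    inner = {}

    async def wait_for_next():
        inner["anext"] = asyncio.ensure_future(queue.get())
        inner["aclose"] = asyncio.ensure_future(closed.wait())
        await asyncio.wait(
            list(inner.values()), return_when=asyncio.FIRST_COMPLETED
        )

    outer = asyncio.ensure_future(wait_for_next())
    await asyncio.sleep(0.01)  # let both inner tasks start
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        pass

    # asyncio.wait() does not cancel the tasks it was waiting on.
    still_pending = {name: not task.done() for name, task in inner.items()}

    # Clean up so the demo itself does not leak tasks.
    for task in inner.values():
        task.cancel()
    await asyncio.gather(*inner.values(), return_exceptions=True)
    return still_pending


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both inner tasks survive the cancellation of the outer one, which matches the "Task was destroyed but it is pending!" message in the traces above.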

Proposed fix

Two PRs (#897 and graphql-python/graphql-core#131) should fix the issue.
The upstream MapAsyncIterator should catch the CancelledError raised while waiting for the result or the close event. When it is cancelled, all the tasks it is awaiting should be cancelled as well.
The ASGI backend should handle the CancelledError and avoid sending an error report to the client. This fix should work for Python versions both below 3.8 and from 3.8 onward. Note that even though

websocket.client_state != WebSocketState.DISCONNECTED
checks the websocket availability, after cancellation the check still seems to report the connection as alive, so I had to wrap the send in try-except. Maybe we can remove the if to reduce the redundancy?
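
The two ideas in the proposed fix can be sketched together. This is an illustration under stated assumptions, not the code from either PR: wait_first and handle_results are hypothetical names, a plain list stands in for the websocket, and a queue stands in for the subscription source.

```python
import asyncio


async def wait_first(anext_task, close_task):
    """Wait for either task; if cancelled, cancel both before re-raising."""
    try:
        await asyncio.wait(
            [anext_task, close_task], return_when=asyncio.FIRST_COMPLETED
        )
    except asyncio.CancelledError:
        anext_task.cancel()
        close_task.cancel()
        raise


async def handle_results(queue, closed, sent, tasks):
    try:
        while True:
            anext_task = asyncio.ensure_future(queue.get())
            close_task = asyncio.ensure_future(closed.wait())
            tasks[:] = [anext_task, close_task]
            await wait_first(anext_task, close_task)
            if close_task.done():
                anext_task.cancel()
                return
            close_task.cancel()
            sent.append({"data": anext_task.result()})
    except asyncio.CancelledError:
        # The client is gone: do not try to send an error report.
        raise
    except Exception as error:
        sent.append({"error": str(error)})


async def main():
    queue = asyncio.Queue()
    queue.put_nowait(1.0)
    closed = asyncio.Event()
    sent, tasks = [], []
    handler = asyncio.ensure_future(
        handle_results(queue, closed, sent, tasks)
    )
    await asyncio.sleep(0.01)  # 1.0 is delivered, then the handler blocks
    handler.cancel()           # simulate the client disconnecting
    await asyncio.gather(handler, return_exceptions=True)
    await asyncio.gather(*tasks, return_exceptions=True)
    return sent, [task.cancelled() for task in tasks]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Listing except asyncio.CancelledError before except Exception keeps the behavior identical on Python 3.7, where CancelledError is still an Exception subclass.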

patrick91 pushed a commit to wuyuanyi135/strawberry that referenced this issue May 11, 2021
DoctorJohn pushed a commit to DoctorJohn/strawberry that referenced this issue May 15, 2021
DoctorJohn pushed a commit to DoctorJohn/strawberry that referenced this issue Jun 7, 2021