
Don't abruptly shutdown active websockets on graceful shutdown. #1140

Closed
wants to merge 4 commits

Conversation

aviramha
Contributor

@aviramha aviramha commented Jul 30, 2021

@aviramha
Contributor Author

@euri10 Let me know what you think.

@euri10
Member

euri10 commented Jul 30, 2021

so if I follow the flow of things correctly, we have `def shutdown` in the ws protocols or http protocols being called in server.py by

        # Request shutdown on all existing connections.
        for connection in list(self.server_state.connections):
            connection.shutdown()

now in the http protocols we:

  1. either close the transport, if the request/response cycle is complete,
  2. or set the cycle's keep_alive flag to False

and in the case of the ws protocols, the transport will be closed as soon as run_asgi finishes (because all failing cases here close the transport), is that correct?
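For concreteness, the two http branches described above can be sketched with a minimal stand-in (the class and attribute names here are illustrative, loosely modeled on uvicorn's protocol code rather than copied from it):

```python
class DummyTransport:
    """Stand-in for an asyncio transport; just records close()."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


class Cycle:
    """Stand-in for a request/response cycle."""
    def __init__(self, response_complete):
        self.response_complete = response_complete
        self.keep_alive = True


class HttpProtocol:
    def __init__(self, cycle):
        self.transport = DummyTransport()
        self.cycle = cycle

    def shutdown(self):
        if self.cycle is None or self.cycle.response_complete:
            # 1. cycle complete (or no cycle): close the transport right away
            self.transport.close()
        else:
            # 2. response still in flight: let it finish, then don't keep-alive
            self.cycle.keep_alive = False
```

With a completed cycle, shutdown() closes the transport immediately; with a response still in flight, it only clears keep_alive so the connection closes once the cycle completes.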

I'm just trying to see if there would be a case where we'd want to close the transport in that shutdown method anyway, but I fail to see any

also, on a side note, would that incidentally fix #596?

Comment on lines -132 to -134
self.queue.put_nowait({"type": "websocket.disconnect", "code": 1012})
output = self.conn.send(wsproto.events.CloseConnection(code=1012))
self.transport.write(output)
Member

is this needed? won't it leave the protocol in a state it should not be in? how will it know it's in a closing state?

Contributor Author

It's the same as the other implementation: run_asgi cleans up each time. The current flow just injects data into an active connection, causing worse issues :(

@aviramha
Contributor Author

It seems like it should fix #596.
As you wrote, there's no reason to close the socket while having a run_asgi that closes it nicely.
The only danger is a websocket that stays open and the application underneath can't get a notification to stop.
For example, if using Starlette and having such code:

async def ws_path(ws):
    while True:
        data = await ws.receive_text()
        await ws.send_text(data)
But in this case the same behavior will happen: it'll be shut down abruptly. So I think we should tackle this later on (maybe by having a prior signal besides the shutdown?)

@aviramha aviramha requested a review from euri10 August 2, 2021 12:55
@euri10
Member

euri10 commented Aug 4, 2021

it does not seem that simple to me, but again, take these comments with a grain of salt; I find it quite hard to follow the flow of things here

I just tested the #596 minimal example from this comment with this PR, and here's the traceback now with wsproto (the same happens with websockets, by the way). So the gut feeling I had earlier about leaving the server in a "bad" state seems relevant.

❯ uvicorn --ws wsproto dev.596:app
INFO:     Started server process [1802042]
INFO:     Waiting for application startup.
INFO:     ASGI 'lifespan' protocol appears unsupported.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
Go ahead, stop the server...
^CINFO:     Shutting down
INFO:     Waiting for connections to close. (CTRL+C to force quit)
^CINFO:     Finished server process [1802042]
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/uvicorn/uvicorn/protocols/websockets/wsproto_impl.py", line 227, in run_asgi
    result = await self.app(self.scope, self.receive, self.send)
  File "/home/lotso/PycharmProjects/uvicorn/uvicorn/middleware/proxy_headers.py", line 75, in __call__
    return await self.app(scope, receive, send)
  File "/home/lotso/PycharmProjects/uvicorn/./dev/596.py", line 10, in app
    await asyncio.sleep(20)
  File "/home/lotso/.asdf/installs/python/3.9.5/lib/python3.9/asyncio/tasks.py", line 654, in sleep
    return await future
asyncio.exceptions.CancelledError
Exception in callback <function handle_http.<locals>.retrieve_exception at 0x7f7c9f7a49d0>
handle: <Handle handle_http.<locals>.retrieve_exception>
Traceback (most recent call last):
  File "/home/lotso/PycharmProjects/uvicorn/uvicorn/server.py", line 107, in handler
    await handle_http(
  File "/home/lotso/PycharmProjects/uvicorn/uvicorn/_handlers/http.py", line 87, in handle_http
    await connection_lost
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 63, in uvloop.loop.Handle._run
  File "/home/lotso/PycharmProjects/uvicorn/uvicorn/_handlers/http.py", line 58, in retrieve_exception
    exc = task.exception()
asyncio.exceptions.CancelledError
Exception in callback UVTransport._call_connection_lost
handle: <Handle UVTransport._call_connection_lost>
Traceback (most recent call last):
  File "uvloop/cbhandles.pyx", line 69, in uvloop.loop.Handle._run
  File "uvloop/handles/basetransport.pyx", line 169, in uvloop.loop.UVBaseTransport._call_connection_lost
  File "/home/lotso/PycharmProjects/uvicorn/uvicorn/protocols/websockets/wsproto_impl.py", line 83, in connection_lost
    self.on_connection_lost()
  File "/home/lotso/PycharmProjects/uvicorn/uvicorn/_handlers/http.py", line 42, in <lambda>
    on_connection_lost=lambda: connection_lost.set_result(True),
asyncio.exceptions.InvalidStateError: invalid state
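The final `InvalidStateError` is the usual symptom of calling `set_result()` on a future that has already been cancelled; a minimal plain-asyncio reproduction (nothing uvicorn-specific, names chosen to mirror the traceback):

```python
import asyncio

async def main():
    connection_lost = asyncio.get_running_loop().create_future()
    connection_lost.cancel()  # the awaiting handler was cancelled (second Ctrl+C)
    try:
        connection_lost.set_result(True)  # what on_connection_lost effectively did
    except asyncio.InvalidStateError:
        return "invalid state"
    return "ok"

result = asyncio.run(main())
print(result)  # → invalid state
```

This is why guarding with `connection_lost.cancelled()` before setting the result avoids the crash.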

Now, to try to advance on that: from the stdlib Protocol docstring one can see that

    """Interface for stream protocol.

    The user should implement this interface.  They can inherit from
    this class but don't need to.  The implementations here do
    nothing (they don't raise exceptions).

    When the user wants to requests a transport, they pass a protocol
    factory to a utility function (e.g., EventLoop.create_connection()).

    When the connection is made successfully, connection_made() is
    called with a suitable transport object.  Then data_received()
    will be called 0 or more times with data (bytes) received from the
    transport; finally, connection_lost() will be called exactly once
    with either an exception object or None as an argument.

    State machine of calls:

      start -> CM [-> DR*] [-> ER?] -> CL -> end

    * CM: connection_made()
    * DR: data_received()
    * ER: eof_received()
    * CL: connection_lost()
    """

so maybe one solution would be to catch CancelledError in run_asgi, just thinking out loud:

    async def run_asgi(self):
        """
        Wrapper around the ASGI callable, handling exceptions and unexpected
        termination states.
        """
        try:
            result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
        except asyncio.CancelledError:
            # do something here, e.g. close the transport cleanly...
            ...
        except BaseException as exc:
            # existing error handling...
            ...
@aviramha
Contributor Author

aviramha commented Aug 4, 2021

Interesting case. I think that, apart from catching the CancelledError, we should move the lifespan shutdown event to before waiting on open connections. Otherwise, if the developer creates a persistent websocket (meaning run_asgi should run as long as the connection exists), it'll always be closed abruptly rather than getting any notification.
wdyt?

@aviramha
Contributor Author

@euri10 any further feedback?

@aviramha
Contributor Author

I fixed #596

@aviramha
Contributor Author

@Kludex, @euri10 any update?

@aviramha
Contributor Author

Sorry for being pushy, but this is quite a problematic issue in production when we shut down live websockets.

@Kludex
Member

Kludex commented Sep 27, 2021

@aviramha To speed things up, would you mind adding tests to cover your changes? I will review it in the next few days.

@aviramha
Contributor Author

Thanks @Kludex. I will try to add tests tomorrow.

@aviramha
Contributor Author

@Kludex Took me some time but I added a test :)

Comment on lines +44 to +45
if not connection_lost.cancelled()
else None
Member

will this be hit ?

@@ -201,6 +201,11 @@ async def run_asgi(self):
"""
try:
result = await self.app(self.scope, self.asgi_receive, self.asgi_send)
except (concurrent.futures.CancelledError, asyncio.CancelledError):
Member

I don't understand the concurrent.futures.CancelledError here; why isn't asyncio.CancelledError enough?
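For reference, the relationship between these two exception classes has shifted across Python versions, which may explain catching both: since 3.8, asyncio.CancelledError derives directly from BaseException, while before 3.8 it was tied to the concurrent.futures exception hierarchy. A quick probe (the identity check varies by version, so no output is claimed for it):

```python
import asyncio
import concurrent.futures

# Since Python 3.8, asyncio.CancelledError derives from BaseException
# rather than Exception, so a bare `except Exception` no longer swallows it.
print(issubclass(asyncio.CancelledError, Exception))  # False on 3.8+

# Whether the two names refer to the same class depends on the version.
print(concurrent.futures.CancelledError is asyncio.CancelledError)
```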

@euri10
Member

euri10 commented Nov 23, 2021

sorry for the long wait @aviramha. If @florimondmanca could also take a look, that would be awesome; my little brain tends to collapse easily in callbacks

@aviramha
Contributor Author

aviramha commented Mar 2, 2022

@euri10 @florimondmanca ?

@tomchristie
Member

I want to suggest changing this behavior to only stop serving new connections, without shutting down the [existing] connections.

I don't think that's desirable behaviour, or the model I'd expect with ASGI websockets. If you do this, it's not clear to me how they're expected to shut down.

A good point of reference here would be taking a look at what other ASGI servers do here. In particular eg. what behaviour you see with Django + Daphne serving websockets.

@aviramha
Contributor Author

aviramha commented Mar 8, 2022

@tomchristie
Daphne cancels all the tasks that are the running applications (so the application can actually shield itself if it wants to do things before dying?), and then twisted dies, which calls callbacks for closing all sockets (sending shutdown).
I believe my PR leads to the same behavior: shutdown can trigger cancellations, and cancellations are guarded by graceful closing. This lets the developer guard their code as well (and use the ws in that guard without uvicorn closing it on them).
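The "shield itself" part can be illustrated with a plain asyncio sketch (hypothetical application code, not part of this PR): a handler task cancelled at shutdown can catch CancelledError, perform its graceful close, and re-raise so the cancellation still propagates to the server:

```python
import asyncio

events = []

async def ws_handler():
    # Stands in for a long-lived websocket application task.
    try:
        await asyncio.sleep(3600)  # e.g. waiting for the next frame
    except asyncio.CancelledError:
        # Shutdown cancelled us: do a graceful close (e.g. send a close
        # frame), then re-raise so the cancellation still propagates.
        events.append("graceful close")
        raise

async def main():
    task = asyncio.create_task(ws_handler())
    await asyncio.sleep(0.01)  # let the handler start waiting
    task.cancel()              # what the server does at shutdown
    try:
        await task
    except asyncio.CancelledError:
        events.append("task cancelled")

asyncio.run(main())
print(events)  # → ['graceful close', 'task cancelled']
```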

@caleb15

caleb15 commented Jun 15, 2022

@tomchristie following up on aviramha's comment

Comment on lines +204 to +208
except (concurrent.futures.CancelledError, asyncio.CancelledError):
self.logger.error("ASGI callable was cancelled while running")
if not self.handshake_started_event.is_set():
self.send_500_response()
self.transport.close()
Member

This block is not needed. It will be handled by the BaseException block anyway, just without waiting for the handshake to complete...

@Kludex
Member

Kludex commented Oct 28, 2022

Given that we removed the on_connection_lost logic, a rebase here would not be helpful anyway... 😢

I'll be closing this PR because the core logic here is actually in the http/_handlers, and it was removed from the source code.

Extremely sorry for taking this long here.

Also, can someone create an issue with an MRE that this PR tries to solve?

@Kludex Kludex closed this Oct 28, 2022
@Kludex
Member

Kludex commented Nov 4, 2022

I want to confirm this was solved.

@Kludex Kludex reopened this Nov 4, 2022
@Kludex Kludex added this to the Version 0.21.0 milestone Nov 4, 2022
@Kludex
Member

Kludex commented Nov 22, 2022

I believe this is solved.

@Kludex Kludex closed this Nov 22, 2022

Successfully merging this pull request may close these issues.

Handshaking may not be completed yet at shutdown in wsproto impl
6 participants