Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error while closing a websocket #922

Closed
youtux opened this issue Jun 12, 2016 · 15 comments
Closed

Error while closing a websocket #922

youtux opened this issue Jun 12, 2016 · 15 comments
Labels
invalid This doesn't seem right outdated

Comments

@youtux
Copy link

youtux commented Jun 12, 2016

Long story short

I am trying to implement an endpoint that forwards websocket messages between two client. It is necessary that when a client closes the connection, the aiohttp server also close the websocket with the second client.
The problem is my program never get through the await other_ws.close() step.

Expected behaviour

await other_ws.close()  # should close the other connection
stmts                             # should be able to execute code after awaiting

Actual behaviour

await other_ws.close()  # never returns
stmts                             # never gets executed

Furthermore, I get the following error:

Task exception was never retrieved
future: <Task finished coro=<FlowControlDataQueue.read() done, defined at /Users/youtux/.virtualenvs/pebbleremote/lib/python3.5/site-packages/aiohttp/streams.py:589> exception=AssertionError()>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/Users/youtux/.virtualenvs/pebbleremote/lib/python3.5/site-packages/aiohttp/streams.py", line 591, in read
    result = yield from super().read()
  File "/Users/youtux/.virtualenvs/pebbleremote/lib/python3.5/site-packages/aiohttp/streams.py", line 443, in read
    assert not self._waiter
AssertionError

Steps to reproduce

Run the following file:

# aiohttp_issue.py
import asyncio
import aiohttp.web

async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()

    # if this is the first ws, I should be killed when other clients disconnect
    if 'other_ws' not in request.app:
        request.app['other_ws'] = ws

    await ws.prepare(request)

    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:
            ws.send_str(msg.data)
        else:
            raise NotImplementedError()

    # if there is another ws, kill it.
    print('client disconnected')

    if 'other_ws' in request.app:
        print('killing other_ws')
        await request.app['other_ws'].close()
        print('this line is not going to be printed twice')
        del request.app['other_ws']

    return ws

app = aiohttp.web.Application()
app.router.add_route('GET', '/echo', echo_handler)

aiohttp.web.run_app(app, host='localhost', port=8080)

Now launch two websocket client on that endpoint, and close the connection on the second one (CTRL+D). You can use a CLI client like https://github.com/yhat/ws.

$ ws 'ws://127.0.0.1:8080/echo'
$ ws 'ws://127.0.0.1:8080/echo'                  # CTRL+D after you started it

The python server should show the following error:

$ python3 bug_aiohttp.py
======== Running on http://localhost:8080/ ========
(Press CTRL+C to quit)
client disconnected
killing other_ws
Task exception was never retrieved
future: <Task finished coro=<FlowControlDataQueue.read() done, defined at /Users/youtux/.virtualenvs/pebbleremote/lib/python3.5/site-packages/aiohttp/streams.py:589> exception=AssertionError()>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/Users/youtux/.virtualenvs/pebbleremote/lib/python3.5/site-packages/aiohttp/streams.py", line 591, in read
    result = yield from super().read()
  File "/Users/youtux/.virtualenvs/pebbleremote/lib/python3.5/site-packages/aiohttp/streams.py", line 443, in read
    assert not self._waiter
AssertionError
client disconnected
killing other_ws
this line is not going to be printed twice

Your environment

Mac OS 10.11, python 3.5.1, aiohttp 0.21.6

@mpaolini
Copy link
Contributor

It is forbidden in aiohttp (and in general too) to tamper with another task's websocket while it is being read. What happens (in your example is):

  • task 1 is reading from websocket A
  • task 2 is reading from websocket B
  • websocket B is closed by client
  • task2 invokes .close() on websocket A (this is forbidden)
    • .close() method sends a close message on the websocket (which is delivered maybe)
    • .close() method starts reading from the websocket A to wait for peer acknowledge. At this point the stream is being read by two tasks and the assertion is triggered.

The best way to kill a task is to cancel() it. This is the updated version of your script

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()

    this_task = asyncio.Task.current_task(loop=request.app.loop)
    this_task_id = id(this_task)

    # if this is the first ws, I should be killed when other clients disconnect
    if 'other_ws' not in request.app:
        request.app['other_ws'] = this_task

    await ws.prepare(request)

    try:
        async for msg in ws:
            if msg.tp == aiohttp.MsgType.text:
                ws.send_str(msg.data)
            else:
                raise NotImplementedError()
    except asyncio.CancelledError:
        print('{} cancelled'.format(this_task_id))

    # if there is another ws, kill it.
    print('{} client disconnected'.format(this_task_id))

    if 'other_ws' in request.app and request.app['other_ws'] != this_task:
        print('{} killing other_ws'.format(this_task_id))
        request.app['other_ws'].cancel()
        del request.app['other_ws']

    print('{} before close'.format(this_task_id))
    try:
        await ws.close()
    except asyncio.CancelledError:
        print('{} cancelled error on close'.format(this_task_id))
    else:
        print('{} after close'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    aiohttp.web.run_app(app, host='localhost', port=8080)

if __name__ == '__main__':
    main()

@mpaolini
Copy link
Contributor

So basically this should be closed as invalid IMO

@asvetlov
Copy link
Member

@mpaolini thank you so much for investigation.

@asvetlov asvetlov added the invalid This doesn't seem right label Jul 24, 2016
@mpaolini
Copy link
Contributor

@asvetlov reviewing this issue once again: the only way we now have to programmatically close a websocket from the server side is to call .cancel() on the task that reads the websocket. Should we document it? Should we add some tests?

@asvetlov
Copy link
Member

@mpaolini are you asking about termination from other task?
We can introduce a method for this.
Unfortunately .close() cannot be used outside.

@asvetlov asvetlov reopened this Jul 26, 2016
@mpaolini
Copy link
Contributor

@asvetlov yes exactly, termination from other task is the issue. I don't know if we really need a new method or if cancel() is just fine.

@asvetlov
Copy link
Member

You should know a task instance for cancelling.
I think shortcut method for WebSocketResponse could help.
It can grab a current task on .prepare() call and call it's .cancel().
The name for new method may be .cancel() or .terminate()

@mpaolini
Copy link
Contributor

Thinking more about it, all this added complexity is just because we have the "only reading from handler task is allowed" thing.

If that wasn't the case, I could write something like

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo(ws):
    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:
            ws.send_str(msg.data)
        else:
            raise NotImplementedError()


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()

    this_task = asyncio.Task.current_task(loop=request.app.loop)
    this_task_id = id(this_task)

    await ws.prepare(request)

    echo_coro = echo(ws)
    wait_kill_coro = request.app['kill'].wait()

    done, pending = await asyncio.wait(
        [
            echo_coro,
            wait_kill_coro
        ],
        return_when=asyncio.FIRST_COMPLETED
    )

    if echo_coro in done:
        print('{} client disconnected'.format(this_task_id))
        print('{} killing other_ws'.format(this_task_id))
        request.app['kill'].set()
    else:
        assert wait_kill_coro in done
        print('{} received kill signal'.format(this_task_id))
        echo_coro.cancel()
    try:
        await ws.close()
    except asyncio.CancelledError:
        print('{} cancelled error on close'.format(this_task_id))
    else:
        print('{} after close'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    app['kill'] = asyncio.Event()
    aiohttp.web.run_app(app, host='localhost', port=8080)

if __name__ == '__main__':
    main()

this was my first take at solving this issue but I quickly realized that wait runs echo in a new task, and for some reason this does not work in asyncio and the wait never returns for some reason

Is it really that hard to handle reading in a task that is separate from the handler?

@asvetlov
Copy link
Member

Parallel reads prevention solves philosophical problem. What messages will be consumed by parallel tasks? Random ones!!!
I think it's basically useless behavior.

@mpaolini
Copy link
Contributor

got your point, makes perfect sense. let's do the terminate() on the websocketrespose happen then!

I would do it this way:

    @asyncio.coroutine
    def terminate(self):
        self._terminating = True
        self._cancel_reader_task()

and then in the receive() method

                try:
                    msg = yield from self._reader.read()
                except (asyncio.CancelledError, asyncio.TimeoutError):
                    if self._terminating:
                        return Message(MsgType.close, None, None)
                    raise

@mpaolini
Copy link
Contributor

with the #1004 pr, you can now do

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()
    request.app['websockets'].add(ws)

    this_task_id = id(asyncio.Task.current_task(loop=request.app.loop))

    await ws.prepare(request)

    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:
            ws.send_str(msg.data)
        else:
            raise NotImplementedError()

    if ws.terminating:
        print('{} ws externally terminated'.format(this_task_id))
    else:
        print('{} client disconnected'.format(this_task_id))
        while request.app['websockets']:
            other_ws = request.app['websockets'].pop()
            if other_ws is not ws:
                print('{} killing other ws'.format(this_task_id))
                other_ws.terminate()

    print('{} before close'.format(this_task_id))
    await ws.close()
    print('{} after close'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    app['websockets'] = set()
    aiohttp.web.run_app(app, host='localhost', port=8080)

if __name__ == '__main__':
    main()

@mpaolini
Copy link
Contributor

@asvetlov if the approack in #1004 is correct, I can move forward adding tests and documentation

@mpaolini
Copy link
Contributor

@asvetlov this one we can close (again) I think

@mpaolini
Copy link
Contributor

This is the updated version for this

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()

    this_task = asyncio.Task.current_task(loop=request.app.loop)
    this_task_id = id(this_task)

    # if this is the first ws, I should be killed when other clients disconnect
    if 'other_ws' not in request.app:
        request.app['other_ws'] = this_task

    await ws.prepare(request)

    try:
        async for msg in ws:
            if msg.tp == aiohttp.MsgType.text:
                ws.send_str(msg.data)
            else:
                raise NotImplementedError()
    except asyncio.CancelledError:
        print('{} cancelled'.format(this_task_id))
    else:
        print('{} client disconnected'.format(this_task_id))
        if 'other_ws' in request.app and request.app['other_ws'] != this_task:
            print('{} killing other_ws'.format(this_task_id))
            request.app['other_ws'].cancel()
            del request.app['other_ws']

    if not ws.closed:
        print('{} before close'.format(this_task_id))
        try:
            await ws.close()
        except asyncio.CancelledError:
            print('{} cancelled error on close'.format(this_task_id))
        else:
            print('{} after close'.format(this_task_id))
    else:
        print('{} already closed'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    aiohttp.web.run_app(app, host='localhost', port=8080)

if __name__ == '__main__':
    main()

@lock
Copy link

lock bot commented Oct 29, 2019

This thread has been automatically locked since there has not been
any recent activity after it was closed. Please open a new issue for
related bugs.

If you feel like there's important points made in this discussion,
please include those exceprts into that new issue.

@lock lock bot added the outdated label Oct 29, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Oct 29, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
invalid This doesn't seem right outdated
Projects
None yet
Development

No branches or pull requests

3 participants