
Cannot call AsyncToSync twice in one sync context for channels_redis #859

Closed
ahaltindis opened this issue Feb 5, 2018 · 42 comments

@ahaltindis

ahaltindis commented Feb 5, 2018

I am using a SyncConsumer which adds itself to a group on websocket.connect using the channel layer's group_add method.

class TaskConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.accept()
        AsyncToSync(self.channel_layer.group_add)('task-i-1', self.channel_name)

    def task_message(self, event):
        self.send_json(event["text"])

If I try to send a message to this group using the group_send method wrapped with AsyncToSync, the first message succeeds but further messages throw this exception:

>>> c = get_channel_layer()
>>> AsyncToSync(c.group_send)('task-i-1', {'type': 'task.message', 'text':{}})
>>> AsyncToSync(c.group_send)('task-i-1', {'type': 'task.message', 'text':{}})
Connection <RedisConnection [db:0]> has pending commands, closing it.
Traceback (most recent call last):
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/channel
s_redis/core.py", line 316, in group_send
await connection.zremrangebyscore(key, min=0, max=int(time.time()) - self.group_expiry)
RuntimeError: Task <Task pending coro=<AsyncToSync.main_wrap() running at /home/ahmet/webso
cket_channel/env/lib64/python3.6/site-packages/asgiref/sync.py:57> cb=[_run_until_complete_cb()
at /usr/lib64/python3.6/asyncio/base_events.py:176]> got Future attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "", line 1, in
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/asgiref
/sync.py", line 49, in call
call_result.result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 425, in result
return self.__get_result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/asgiref
/sync.py", line 57, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/channel
s_redis/core.py", line 320, in group_send
await connection.zrange(key, 0, -1)
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi
s/commands/init.py", line 152, in exit
self._release_callback(conn)
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi
s/pool.py", line 361, in release
conn.close()
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi
s/connection.py", line 352, in close
self._do_close(ConnectionForcedCloseError())
File "/home/ahmet/websocket_channel/env/lib64/python3.6/site-packages/aioredi
s/connection.py", line 359, in _do_close
self._writer.transport.close()
File "/usr/lib64/python3.6/asyncio/selector_events.py", line 621, in close
self._loop.call_soon(self._call_connection_lost, None)
File "/usr/lib64/python3.6/asyncio/base_events.py", line 574, in call_soon
self._check_closed()
File "/usr/lib64/python3.6/asyncio/base_events.py", line 357, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

  • openSUSE Leap (42.2)
  • python[3.6.4], django[2.0.2], channels[2.0.0], daphne[2.0.2], channels-redis[2.0.2], asgiref[2.1.3]
  • Django running with runserver
@andrewgodwin
Member

Oh, yes, right, the connection caching code in channels_redis isn't going to like the way AsyncToSync works. This is a bug, but you can work around it for now by using AsyncToSync in a class context:

    ...
    async def async_group_add(self, *args, **kwargs):
        # group_add is a coroutine, so it has to be awaited here
        await self.channel_layer.group_add(*args, **kwargs)
    group_add = AsyncToSync(async_group_add)

    def connect(self):
        self.group_add(...)

@andrewgodwin andrewgodwin changed the title RuntimeError: Event loop is closed Cannot call AsyncToSync twice in one sync context Feb 6, 2018
@ahaltindis
Author

Thank you for the workaround, but now it says You cannot instantiate AsyncToSync inside a thread that wasn't made using SyncToAsync.

I am not so familiar with asyncio, sorry if I am missing something obvious :)

@andrewgodwin
Member

Async programming is Very Hard to get right. I won't be able to help with further workarounds if that one didn't work; let me focus on getting this working this week so it's rock-solid, and maybe write up a document on how the hell async stuff works, since you need a basic understanding of it to debug things.

@AlexejStukov

@ahaltindis This blog post might help you.

@ahaltindis
Author

@andrewgodwin I didn't expect any workaround in the first place, but I very much appreciated that you gave me one anyhow. The second time I just wanted you, and others who may hit the same problem, to know that this also didn't work. Looking forward to the rock-solid version and the document. Thanks again for your great effort for the community.

@AlexejStukov thank you, I will check.

@Mykyta-Chernenko

2.0.2 fixed my relevant issue. Probably this is gone as well

@bastbnl
Contributor

bastbnl commented Feb 8, 2018

Edit: updated stacktrace to include more detail and added celery startup cli

2.0.2 fixed my relevant issue. Probably this is gone as well

I'm still seeing the same thing, unfortunately:

[2018-02-09 21:21:04,819: WARNING/ForkPoolWorker-2] Connection <RedisConnection [db:0]> has pending commands, closing it.
[2018-02-09 21:21:04,874: ERROR/ForkPoolWorker-2] Task session.notify[bc18fa20-48c5-4c53-a421-fd69f30f859a] raised unexpected: RuntimeError('Event loop is closed',)
Traceback (most recent call last):
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/channels_redis/core.py", line 316, in group_send
    await connection.zremrangebyscore(key, min=0, max=int(time.time()) - self.group_expiry)
  File "/usr/lib/python3.5/asyncio/futures.py", line 380, in __iter__
    yield self  # This tells Task to wait for completion.
RuntimeError: Task <Task pending coro=<AsyncToSync.main_wrap() running at /var/www/brownpapersession/dev/env/lib/python3.5/site-packages/asgiref/sync.py:63> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:176]> got Future <Future pending> attached to a different loop

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/www/brownpapersession/dev/brownpapersession-py35/session/tasks.py", line 586, in notify_clients
    async_to_sync(channel_layer.group_send)(a_channel_connection, {"type": "websocket.send", "text": dumps(message_payload), })
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/asgiref/sync.py", line 49, in __call__
    return call_result.result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "/usr/lib/python3.5/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/asgiref/sync.py", line 63, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/channels_redis/core.py", line 320, in group_send
    await connection.zrange(key, 0, -1)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/commands/__init__.py", line 152, in __exit__
    self._release_callback(conn)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/pool.py", line 361, in release
    conn.close()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py", line 352, in close
    self._do_close(ConnectionForcedCloseError())
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py", line 359, in _do_close
    self._writer.transport.close()
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 622, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.5/asyncio/base_events.py", line 572, in call_soon
    self._check_closed()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

Using python 3.5.3 and updated stack containing channels==2.0.2, channels-redis==2.0.2, aioredis==1.0.0 and asgiref==2.1.4

celery is started in a screen session using celery -A brownpapersession worker -B -l info

@andrewgodwin
Member

Right, 2.0.2 will have fixed the issue for some as it removes some threading, but the basic issue still remains, and I'll look into it soon.

@andrewgodwin andrewgodwin changed the title Cannot call AsyncToSync twice in one sync context Cannot call AsyncToSync twice in one sync context for channels_redis Feb 8, 2018
@andrewgodwin
Member

Well it looks like the issue might have been in asgiref after all, and this commit fixes it for me locally: django/asgiref@229027b

Could affected people pull down and check the master branch of asgiref and see if it fixes it for them too? If so, I can close this ticket and do a release.
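(For anyone unsure how to do that: one way to pull down master — assuming pip inside your virtualenv; the exact URL form here is illustrative — is pip install --upgrade git+https://github.com/django/asgiref@master#egg=asgiref.)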

@ghost

ghost commented Feb 12, 2018

Still facing the same issue. Group messages get sent properly the first time (sometimes 2 or 3 times) and then raise a runtime error.

If it helps, here's the traceback.

@andrewgodwin
Member

Hmm, that appears to be a different issue then. Let me try and tackle that too.

@ahaltindis
Author

No, still the same exception. I used the same consumer code block and triggered it in the same way. It always fails the second time. Traceback

I am using this workaround for now instead of AsyncToSync. I am not sure if it is suitable for every case or an elegant solution.

import asyncio

# Run the coroutine directly on the current thread's event loop
loop = asyncio.get_event_loop()
coroutine = channel_layer.group_send(
    group_name,
    {
        'type': 'task.message',
        'text': context
    })
loop.run_until_complete(coroutine)

@quantumlink

I don't know if it's relevant, but sometimes I get RuntimeError: There is no current event loop in thread 'Thread-2'. (the actual thread number changes) when I use the above, and it happens on loop = asyncio.get_event_loop()

I'm using git+git://github.com/django/asgiref@master#egg=asgiref and git+git://github.com/django/daphne@master#egg=daphne as package requirements to ensure I'm on latest.

@andrewgodwin
Member

Alright. I have a day to look at this today so hopefully I can get it reliably replicated and then solved.

@andrewgodwin
Member

Could you post a full traceback of your RuntimeError, @quantumlink? The more data I have the better.

@andrewgodwin
Member

Ah, if you are referring to the workaround then yes, that is an expected error, and why we have AsyncToSync to try and solve those issues :)
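(For reference, a minimal sketch of the call pattern AsyncToSync is meant to enable, as opposed to the manual loop handling above — the group name and payload mirror the ones earlier in this thread and assume a configured channel layer:)

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

# async_to_sync manages the event loop itself, which is why it is
# preferred over calling loop.run_until_complete() by hand; the group
# name and payload below are illustrative.
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    "task-i-1", {"type": "task.message", "text": {}}
)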

@quantumlink

quantumlink commented Feb 14, 2018

RuntimeError: There is no current event loop in thread 'Thread-2'.
  File "example/models.py", line 434, in group_send
    loop = asyncio.get_event_loop()
  File "asyncio/events.py", line 671, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "asyncio/events.py", line 583, in get_event_loop
    % threading.current_thread().name)

I am still looking into it, but it seems that when I call this from the shell I never get the error. It only seems to happen occasionally inside a request. It's just one test from one machine though, so if I get other results I'll update here.

@bastbnl
Contributor

bastbnl commented Feb 14, 2018

The workaround produced by @ahaltindis appears to resolve the issue. I'm not getting that error anymore.

On a side note: channels 2 appears to be a lot slower than version 1. I'm running on a single-core VPS and I can see a delay in the frontend while work is being processed by celery. Here's the pattern I'm seeing with asyncio debug enabled:

[2018-02-14 19:53:26,394: DEBUG/ForkPoolWorker-2] poll took 0.011 ms: 1 events
[2018-02-14 19:53:26,400: DEBUG/ForkPoolWorker-2] poll took 0.011 ms: 1 events
[2018-02-14 19:53:26,402: DEBUG/ForkPoolWorker-2] poll took 0.009 ms: 1 events
[2018-02-14 19:53:26,408: DEBUG/ForkPoolWorker-2] Get address info localhost:6379, type=<SocketKind.SOCK_STREAM: 1>
[2018-02-14 19:53:26,408: DEBUG/ForkPoolWorker-2] Getting address info localhost:6379, type=<SocketKind.SOCK_STREAM: 1> took 0.358 ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))]
[2018-02-14 19:53:26,412: DEBUG/ForkPoolWorker-2] connect <socket.socket fd=25, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('0.0.0.0', 0)> to ('127.0.0.1', 6379)
[2018-02-14 19:53:26,414: DEBUG/ForkPoolWorker-2] poll took 0.008 ms: 1 events
[2018-02-14 19:53:26,417: DEBUG/ForkPoolWorker-2] <socket.socket fd=25, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('127.0.0.1', 32826), raddr=('127.0.0.1', 6379)> connected to localhost:6379: (<_SelectorSocketTransport fd=25 read=polling write=<idle, bufsize=0>>, <asyncio.streams.StreamReaderProtocol object at 0x7f91392a7b00>)
[2018-02-14 19:53:26,421: DEBUG/ForkPoolWorker-2] poll took 0.009 ms: 1 events
[2018-02-14 19:53:26,429: DEBUG/ForkPoolWorker-2] poll took 0.012 ms: 1 events
[2018-02-14 19:53:26,432: DEBUG/ForkPoolWorker-2] poll took 0.013 ms: 1 events
[2018-02-14 19:53:26,438: DEBUG/ForkPoolWorker-2] Get address info localhost:6379, type=<SocketKind.SOCK_STREAM: 1>
[2018-02-14 19:53:26,438: DEBUG/ForkPoolWorker-2] Getting address info localhost:6379, type=<SocketKind.SOCK_STREAM: 1> took 0.129 ms: [(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 6379))]
[2018-02-14 19:53:26,442: DEBUG/ForkPoolWorker-2] connect <socket.socket fd=25, family=AddressFamily.AF_INET, type=2049, proto=6, laddr=('0.0.0.0', 0)> to ('127.0.0.1', 6379)

@andrewgodwin
Member

Alright, I am reasonably sure I have fixed the other half of this in django/channels_redis@f5e4799 - would appreciate people confirming things are fine for them (I can no longer make it crash).

@andrewgodwin
Member

@bastbnl Can we move discussion of speed to another issue? I'd rather not clutter this one up any more.

@bastbnl
Contributor

bastbnl commented Feb 14, 2018

Preliminary tests looking really good @andrewgodwin

@bastbnl
Contributor

bastbnl commented Feb 14, 2018

Hmmm

    super(AsynPool, self).__init__(processes, *args, **kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 1007, in __init__
    self._create_worker_process(i)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/concurrency/asynpool.py", line 439, in _create_worker_process
    return super(AsynPool, self)._create_worker_process(i)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 1116, in _create_worker_process
    w.start()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/process.py", line 124, in start
    self._popen = self._Popen(self)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/context.py", line 333, in _Popen
    return Popen(process_obj)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/popen_fork.py", line 24, in __init__
    self._launch(process_obj)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/popen_fork.py", line 79, in _launch
    code = process_obj._bootstrap()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/process.py", line 327, in _bootstrap
    self.run()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 289, in __call__
    sys.exit(self.workloop(pid=pid))
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 537, in _fast_trace_task
    uuid, args, kwargs, request,
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/www/brownpapersession/dev/brownpapersession/session/tasks.py", line 588, in notify_clients
    group_send(a_channel_connection, {"type": "websocket.send", "text": dumps(message_payload), })
  File "/var/www/brownpapersession/dev/brownpapersession/various/legacy.py", line 56, in group_send
    async_to_sync(layer.group_send)(group_label, payload)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/asgiref/sync.py", line 37, in __call__
    loop.run_until_complete(self.main_wrap(args, kwargs, call_result))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 1416, in _run_once
    handle._run()
  File "/usr/lib/python3.5/asyncio/events.py", line 126, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 315, in _wakeup
    self._step()
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/asgiref/sync.py", line 63, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/channels_redis/core.py", line 315, in group_send
    pool = await self.connection(self.consistent_hash(group))
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/channels_redis/core.py", line 386, in connection
    self.pools[index] = await aioredis.create_redis_pool(**self.hosts[index])
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/commands/__init__.py", line 197, in create_redis_pool
    loop=loop)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/pool.py", line 56, in create_pool
    await pool._fill_free(override_min=False)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/pool.py", line 388, in _fill_free
    conn = await self._create_new_connection(address)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py", line 124, in create_connection
    loop=loop)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py", line 158, in __init__
    loop=self._loop)
task: <Task pending coro=<RedisConnection._read_data() done, defined at /var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py:175> wait_for=<Future pending cb=[Task._wakeup()] created at /usr/lib/python3.5/asyncio/base_events.py:275> cb=[Future.set_result()] created at /var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py:158>

@quantumlink

Things seem very much resolved in the few different ways I've tested: locally in Docker, then hosted with two Daphne instances and a proxy in front. So far things look resolved. Thanks a bunch @andrewgodwin

@quantumlink

quantumlink commented Feb 14, 2018

Seeing this.

RuntimeError: Event loop is closed
  File "example/models.py", line 120, in group_send
    "type": "chat.message"
  File "asgiref/sync.py", line 62, in __call__
    return call_result.result()
  File "concurrent/futures/_base.py", line 398, in result
    return self.__get_result()
  File "concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "asgiref/sync.py", line 76, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "channels_redis/core.py", line 314, in group_send
    with (await pool) as connection:
  File "aioredis/commands/__init__.py", line 129, in __await__
    conn = yield from self._pool_or_conn.acquire().__await__()
  File "aioredis/pool.py", line 329, in acquire
    await self._fill_free(override_min=True)
  File "aioredis/pool.py", line 388, in _fill_free
    conn = await self._create_new_connection(address)
  File "aioredis/connection.py", line 107, in create_connection
    timeout, loop=loop)
  File "asyncio/tasks.py", line 381, in wait_for
    return (yield from fut)
  File "aioredis/stream.py", line 19, in open_connection
    lambda: protocol, host, port, **kwds)
  File "asyncio/base_events.py", line 719, in create_connection
    flags=flags, loop=self)
  File "asyncio/base_events.py", line 173, in _ensure_resolved
    proto=proto, flags=flags)
  File "asyncio/base_events.py", line 673, in getaddrinfo
    host, port, family, type, proto, flags)
  File "asyncio/base_events.py", line 628, in run_in_executor
    self._check_closed()
  File "asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')

and also

RuntimeError: There is no current event loop in thread 'Thread-1'.
  File "asgiref/sync.py", line 19, in __init__
    self.main_event_loop = asyncio.get_event_loop()
  File "asyncio/events.py", line 671, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "asyncio/events.py", line 583, in get_event_loop
    % threading.current_thread().name)

RuntimeError: You cannot instantiate AsyncToSync inside a thread that wasn't made using SyncToAsync
  File "example/models.py", line 117, in group_send
    async_to_sync(channel_layer.group_send)(
  File "asgiref/sync.py", line 26, in __init__
    "You cannot instantiate AsyncToSync inside a thread that wasn't made using SyncToAsync"

@andrewgodwin
Member

@bastbnl I'm not sure what you posted - it looks like an incomplete traceback? There's no error on it.

@quantumlink Can I get an example of the code that is causing this error? I need to be able to replicate it to fix it, and the easiest thing for that is to see where and how you're calling it.

@quantumlink

In a model I have a method with this wrapped in a try/except RuntimeError:

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    channel_name, {
        "text": json.dumps(payload),
        "type": "chat.message"
    }
)

and then in a WebsocketConsumer I have

    def chat_message(self, event):
        self.send(event['text'])

I seem to be getting the great majority of the websocket messages, but some result in the errors above.

@bastbnl
Contributor

bastbnl commented Feb 14, 2018

Yes, it's incomplete as it's produced by celery running in a screen session, which means I can't scroll back. I started it in a different SSH session and ran into this error message - everything appears to be working fine though

[2018-02-14 20:58:47,813: ERROR/ForkPoolWorker-2] Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() done, defined at /var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py:175> wait_for=<Future pending cb=[Task._wakeup()]> cb=[Future.set_result()]>

I'll try to reproduce the other error as well, or find it in the logfiles (edit: got it)

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "/var/www/brownpapersession/dev/env/bin/celery", line 11, in <module>
    load_entry_point('celery==4.1.0', 'console_scripts', 'celery')()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/__main__.py", line 14, in main
    _main()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/celery.py", line 326, in main
    cmd.execute_from_commandline(argv)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/celery.py", line 488, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/base.py", line 281, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/celery.py", line 480, in handle_argv
    return self.execute(command, argv)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/celery.py", line 412, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/worker.py", line 221, in run_from_argv
    return self(*args, **options)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/base.py", line 244, in __call__
    ret = self.run(*args, **kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bin/worker.py", line 256, in run
    worker.start()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/bootsteps.py", line 370, in start
    return self.obj.start()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/concurrency/base.py", line 131, in start
    self.on_start()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/concurrency/prefork.py", line 112, in on_start
    **self.options)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/concurrency/asynpool.py", line 422, in __init__
    super(AsynPool, self).__init__(processes, *args, **kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 1007, in __init__
    self._create_worker_process(i)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/concurrency/asynpool.py", line 439, in _create_worker_process
    return super(AsynPool, self)._create_worker_process(i)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 1116, in _create_worker_process
    w.start()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/process.py", line 124, in start
    self._popen = self._Popen(self)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/context.py", line 333, in _Popen
    return Popen(process_obj)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/popen_fork.py", line 24, in __init__
    self._launch(process_obj)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/popen_fork.py", line 79, in _launch
    code = process_obj._bootstrap()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/process.py", line 327, in _bootstrap
    self.run()
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 289, in __call__
    sys.exit(self.workloop(pid=pid))
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/billiard/pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 537, in _fast_trace_task
    uuid, args, kwargs, request,
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/www/brownpapersession/dev/brownpapersession/session/tasks.py", line 588, in notify_clients
    group_send(a_channel_connection, {"type": "websocket.send", "text": dumps(message_payload), })
  File "/var/www/brownpapersession/dev/brownpapersession/various/legacy.py", line 56, in group_send
    async_to_sync(layer.group_send)(group_label, payload)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/asgiref/sync.py", line 37, in __call__
    loop.run_until_complete(self.main_wrap(args, kwargs, call_result))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 1416, in _run_once
    handle._run()
  File "/usr/lib/python3.5/asyncio/events.py", line 126, in _run
    self._callback(*self._args)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 315, in _wakeup
    self._step()
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/asgiref/sync.py", line 63, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/channels_redis/core.py", line 315, in group_send
    pool = await self.connection(self.consistent_hash(group))
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/channels_redis/core.py", line 386, in connection
    self.pools[index] = await aioredis.create_redis_pool(**self.hosts[index])
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/commands/__init__.py", line 197, in create_redis_pool
    loop=loop)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/pool.py", line 56, in create_pool
    await pool._fill_free(override_min=False)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/pool.py", line 388, in _fill_free
    conn = await self._create_new_connection(address)
  File "/usr/lib/python3.5/asyncio/coroutines.py", line 109, in __next__
    return self.gen.send(None)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py", line 124, in create_connection
    loop=loop)
  File "/var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py", line 158, in __init__
    loop=self._loop)
task: <Task pending coro=<RedisConnection._read_data() done, defined at /var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py:175> wait_for=<Future pending cb=[Task._wakeup()] created at /usr/lib/python3.5/asyncio/base_events.py:275> cb=[Future.set_result()] created at /var/www/brownpapersession/dev/env/lib/python3.5/site-packages/aioredis/connection.py:158>

@andrewgodwin
Member

@bastbnl That appears to be a problem inside the aioredis library and how it handles connections; it's not harmful, but I am trying to trace it down separately. I won't be able to fix it as part of this ticket.

@quantumlink So you're saying that this code works most of the time, but then fails occasionally? Can you give me some idea of the amount of hits it's getting and the failure rate in %?

@quantumlink

quantumlink commented Feb 14, 2018

I think one of my boxes wasn't updating to the latest master release for some reason; after forcing it, everything seems to work right.

Edit: Apparently I'm still getting RuntimeError: You cannot instantiate AsyncToSync inside a thread that wasn't made using SyncToAsync sometimes, but not consistently. I feel like it has to be asgiref not updating correctly; I'll see if I can confirm that.

@andrewgodwin
Member

Ah, alright. The error you mention is deliberate if you're somehow calling AsyncToSync from something threaded (so a function that's calling group_add might be in a threaded context), as making it work correctly inside threads is more of a nightmare and one I'm not willing to tackle at this moment.

I'm going to close this out as fixed, then, and ship new releases of all the packages involved so everyone can get it.
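(To illustrate the kind of threaded context that deliberate error guards against, here is a hypothetical minimal reproduction, not code from this thread:)

import asyncio
import threading

from asgiref.sync import async_to_sync

async def ping():
    await asyncio.sleep(0)
    return "pong"

def worker():
    # With asgiref 2.x this raised "You cannot instantiate AsyncToSync
    # inside a thread that wasn't made using SyncToAsync", because a
    # plain threading.Thread has no event loop to hand the coroutine to.
    print(async_to_sync(ping)())

threading.Thread(target=worker).start()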

@ghost

ghost commented Feb 15, 2018

Bug seems to be fixed for me.

channels-redis 2.0.3 is not yet available on PyPI though.

@andrewgodwin
Member

Should be releasing itself now, I had forgotten to tag it.

@ghost

ghost commented Feb 15, 2018

Not sure if it's related to this, but there's a new error when I send group messages from celery:

[2018-02-15 01:06:06,970: ERROR/ForkPoolWorker-5] Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() done, defined at /home/frosty/Desktop/hackerschat/venv/hackerschat/lib/python3.5/site-packages/aioredis/connection.py:175> wait_for=<Future pending cb=[Task._wakeup()]> cb=[Future.set_result()]>

Messages are being sent properly, but I see multiple such exceptions in the terminal.

Edit : The aioredis issue seems to have been mentioned above. If there's a related issue where I can keep up with the developments, let me know.

Edit 2 : This appears to be the relevant issue django/channels_redis#71

@last-partizan

I get this warning now too.

Here is my code:

class OrderMonitor(JsonWebsocketConsumer):

    @classmethod
    def notify(cls):
        args = "test_grp", {
            "type": "order.updated",
            "updated": "1234",
        }
        async_to_sync(get_channel_layer().group_send)(*args)

    def order_updated(self, event):
        self.send_json({
            "updated": event["updated"],
        })

and I call this from IPython:

for x in range(10): OrderMonitor.notify()

and get 10 of these messages in the IPython console:

Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() running at /home/serg/work/multisend/venv/lib/python3.6/site-packages/aioredis/connection.py:181> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f93430c6d38>()]> cb=[Future.set_result()]>

@last-partizan

Just found an even simpler way to trigger this:

In [4]: async_to_sync(get_channel_layer().group_send)("test", {})

In [5]:                                                                                                                        
Do you really want to exit ([y]/n)? 
Task was destroyed but it is pending!
task: <Task pending coro=<RedisConnection._read_data() done, defined at /home/serg/work/multisend/venv/lib/python3.6/site-packages/aioredis/connection.py:175> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fe51d946d98>()]> cb=[Future.set_result()]>

@andrewgodwin
Member

Yes, that warning happens basically every time. I'm looking into it separately. It doesn't affect anything negatively, however.

@bruecksen

@andrewgodwin I'm still seeing the "You cannot instantiate AsyncToSync inside a thread that wasn't made using SyncToAsync" error. I'm using the latest master versions of channels and channels-redis, and it works the first time, but if I try it a second time I get this error. I'm using async_to_sync() in a post_save signal to push a new notifications count. Any ideas?
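(A hedged sketch of the signal pattern described above — the Notification model, group name, and message type are illustrative assumptions, not code from this thread:)

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.db.models.signals import post_save
from django.dispatch import receiver

from myapp.models import Notification  # hypothetical model

@receiver(post_save, sender=Notification)
def push_notification_count(sender, instance, created, **kwargs):
    # Push the new count to a group the websocket consumer has joined;
    # a notification_count handler on the consumer relays it onward.
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "notifications",
        {"type": "notification.count", "count": sender.objects.count()},
    )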

@andrewgodwin
Member

@bruecksen That error string is not even in the latest version of asgiref, so I presume you haven't updated that.

@bruecksen

@andrewgodwin Yes, you are right, I forgot to update asgiref as well. Thanks!

@williams9438

I don't know if it's relevant, but sometimes I get RuntimeError: There is no current event loop in thread 'Thread-2'. (the actual thread number changes) when I use the above, and it happens on loop = asyncio.get_event_loop()

I'm using git+git://github.com/django/asgiref@master#egg=asgiref and git+git://github.com/django/daphne@master#egg=daphne as package requirements to ensure I'm on latest.

Yes, I get this too when triggering the broadcast to a group channel from a Celery periodic task, but it doesn't throw this error when I fire the task from the Django shell. What is the workaround for this?

@williams9438

williams9438 commented Sep 2, 2020

I was able to solve this by explicitly creating the event loop. As the error says, there is no current event loop in a new thread.

referenced from tornadoweb/tornado#2308 (comment)

Full working code should look like:

import asyncio

from channels.layers import get_channel_layer

try:
    # A new thread has no event loop, so create and install one
    # explicitly before running the coroutine on it. (Note: this
    # creates a fresh loop per call and never closes it.)
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    channel_layer = get_channel_layer()
    coroutine = channel_layer.group_send(
        'channel_group',
        {
            'type': 'task_message',
            'message': 'text'
        })
    loop.run_until_complete(coroutine)
except Exception as e:
    print(e)

@Elabbasy00

Elabbasy00 commented Oct 29, 2022

asgiref==3.5.2 channels==4.0.0 channels-redis==4.0.0

Task exception was never retrieved
future: <Task finished name='Task-378' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-381' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-382' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop

sometimes I got

Task exception was never retrieved
future: <Task finished name='Task-348' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 338, in close
    return self._transport.close()
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 698, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
    self._check_closed()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Task exception was never retrieved
future: <Task finished name='Task-349' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 338, in close
    return self._transport.close()
  File "/usr/lib/python3.10/asyncio/selector_events.py", line 698, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 750, in call_soon
    self._check_closed()
  File "/usr/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
