-
-
Notifications
You must be signed in to change notification settings - Fork 38
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Problems integrating trio_asyncio & distributed/tornado #22
Comments
I'm on win64 running:
|
That check was removed in: python-trio/trio#610 There might be other problems, but I suspect the immediate roadblock here is gone... |
So @dhirschfeld were there other problems? (too lazy to try this myself) |
Sorry, haven't circled back to this just yet. It's on my TODO list but a pretty long way down at the moment. |
...so, I got curious if this would Just Work now, in particular after #66. Unfortunately I'm running into another error. I first needed to patch the function to pass in the async def f():
async with trio_asyncio.open_loop() as loop:
client = await loop.run_asyncio(partial(Client, loop=loop, asynchronous=True))
future = client.submit(lambda x: x + 1, 10)
return await loop.run_future(future) ...but that then resulted in the below error: In [3]: trio.run(f)
Traceback (most recent call last):
File "<ipython-input-3-03c750b89f65>", line 1, in <module>
trio.run(f)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\trio\_core\_run.py", line 1804, in run
raise runner.main_task_outcome.error
File "<ipython-input-2-27c49da4404c>", line 3, in f
client = await loop.run_asyncio(partial(Client, loop=loop, asynchronous=True))
File "c:\users\dhirschf\code\github\python-trio\trio-asyncio\trio_asyncio\adapter.py", line 82, in __await__
f = f(*self.args)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\distributed\client.py", line 726, in __init__
self.start(timeout=timeout)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\distributed\client.py", line 891, in start
sync(self.loop, self._start, **kwargs)
File "C:\Users\dhirschf\envs\dev\lib\site-packages\distributed\utils.py", line 336, in sync
loop.add_callback(f)
AttributeError: 'TrioEventLoop' object has no attribute 'add_callback' Does that mean that |
There is no |
Thx @smurfix - sounds like I'll have to take it up with |
Maybe you need to somehow wrap the asyncio loop object in a tornado loop
object, and pass that to distributed?
…On Fri, Feb 14, 2020, 04:52 Dave Hirschfeld ***@***.***> wrote:
Thx @smurfix <https://github.com/smurfix> - sounds like I'll have to take
it up with distributed...
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub
<#22?email_source=notifications&email_token=AAEU42CF77W7RGWK5S7N7G3RC2HX7A5CNFSM4FH2JAYKYY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOELY5THQ#issuecomment-586275230>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AAEU42G2X3RTBCLNVH2NQK3RC2HX7ANCNFSM4FH2JAYA>
.
|
Thanks for the suggestion @njsmith - I'll give that a go! |
Just for informational purposes (in case anyone is interested), explicitly passing the loop was unnecessary but I run into a problem with different loops being used: async def f():
async with trio_asyncio.open_loop() as loop:
print(id(loop))
print(id(IOLoop.current().asyncio_loop))
client = await loop.run_asyncio(partial(Client, loop=IOLoop.current(), asynchronous=True))
future = client.submit(lambda x: x + 1, 10)
return await loop.run_future(future) In [25]: await f()
1656317099400
1656317099400
Traceback (most recent call last):
File "<ipython-input-25-9fe41d264da1>", line 4, in async-def-wrapper
File "<ipython-input-23-d02282e26f84>", line 5, in f
client = await loop.run_asyncio(partial(Client, loop=IOLoop.current(), asynchronous=True))
File "~\code\github\python-trio\trio-asyncio\trio_asyncio\base.py", line 231, in run_aio_coroutine
return await run_aio_future(coro)
File "~\code\github\python-trio\trio-asyncio\trio_asyncio\util.py", line 39, in run_aio_future
res = await trio.hazmat.wait_task_rescheduled(abort_cb)
File "~\envs\dev\lib\site-packages\trio\_core\_traps.py", line 165, in wait_task_rescheduled
return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
File "~\envs\dev\lib\site-packages\outcome\_sync.py", line 111, in unwrap
raise captured_error
File "~\envs\dev\lib\asyncio\tasks.py", line 630, in _wrap_awaitable
return (yield from awaitable.__await__())
File "~\envs\dev\lib\site-packages\distributed\client.py", line 957, in _start
**self._startup_kwargs
File "~\envs\dev\lib\site-packages\distributed\deploy\spec.py", line 365, in _
await self._start()
File "~\envs\dev\lib\site-packages\distributed\deploy\spec.py", line 290, in _start
await super()._start()
File "~\envs\dev\lib\site-packages\distributed\deploy\cluster.py", line 57, in _start
comm = await self.scheduler_comm.live_comm()
File "~\envs\dev\lib\site-packages\distributed\core.py", line 646, in live_comm
connection_args=self.connection_args,
File "~\envs\dev\lib\site-packages\distributed\comm\core.py", line 214, in connect
future, timeout=min(deadline - time(), 1)
File "~\envs\dev\lib\asyncio\tasks.py", line 442, in wait_for
return fut.result()
File "~\envs\dev\lib\site-packages\distributed\comm\tcp.py", line 349, in connect
ip, port, max_buffer_size=MAX_BUFFER_SIZE, **kwargs
File "~\envs\dev\lib\site-packages\tornado\tcpclient.py", line 270, in connect
addrinfo = await self.resolver.resolve(host, port, af)
RuntimeError:
Task <Task pending coro=<BaseTCPConnector.connect() running at ~\envs\dev\lib\site-packages\distributed\comm\tcp.py:349>
cb=[_release_waiter(<Future pendi...1A4D379D8>()]>)() at ~\envs\dev\lib\asyncio\tasks.py:392]>
got Future <Future pending> attached to a different loop |
Removing IPython from the equation (by running it as a script) results in a different error: In [3]: from dask.distributed import LocalCluster
In [5]: cluster = LocalCluster(n_workers=1, threads_per_worker=2)
In [6]: cluster.scheduler.address
Out[6]: 'tcp://127.0.0.1:60590' import asyncio
from functools import partial
from tornado.ioloop import IOLoop
import trio
import trio_asyncio
from dask.distributed import Client
address = 'tcp://127.0.0.1:60590'
async def f():
async with trio_asyncio.open_loop() as loop:
client = await loop.run_asyncio(partial(Client, address, asynchronous=True))
future = client.submit(lambda x: x + 1, 10)
return await loop.run_future(future)
res = trio_asyncio.run(f)
print(res) (dev) ~\code\sandbox> python .\test-trio-dask.py
distributed.client - ERROR - Error in callback <function run_aio_future.<locals>.done_cb at 0x0000019721A32DC8> of <Future: finished, type: builtins.int, key: lambda-845252994edca236a111f8f502902dff>:
Traceback (most recent call last):
File "~\envs\dev\lib\site-packages\trio\_core\_generated_run.py", line 96, in reschedule
return GLOBAL_RUN_CONTEXT.runner.reschedule(task, next_send)
AttributeError: '_thread._local' object has no attribute 'runner'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "~\envs\dev\lib\site-packages\distributed\client.py", line 287, in execute_callback
fn(fut)
File "~\code\github\python-trio\trio-asyncio\trio_asyncio\util.py", line 25, in done_cb
trio.hazmat.reschedule(task, outcome.capture(future.result))
File "~\envs\dev\lib\site-packages\trio\_core\_generated_run.py", line 98, in reschedule
raise RuntimeError('must be called from async context')
RuntimeError: must be called from async context |
Unless the traceback highlights some glaringly obvious bug in |
Ugh. I'm not running any sort of dask server, so when I thy I get this:
This error message does not make sense, either the connection was refused (which implies that the server rejected us) or it timed out (which implies that the server didn't react at all). I filed a bug at dask/distributed#3487 Anyway, if I needed a dask client I would tend to fork the thing and apply a global s/asyncio/trio/g – but as I would like to understand why this doesn't work, please tell me how you started the server part. |
Sorry - I should've been clearer above. Dask has a from dask.distributed import LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=2)
print(cluster.scheduler.address) was run by me in a separate python session to start a cluster and get the address. I did it in a separate process to avoid any contamination of the testing code. I've seen Anyway, having started the dask cluster you would then need to change the
Yeah, I'm at the point of wondering if I need to start a |
At least seems potentially feasible:
|
OK, I've looked at the code a bit, particularly that of Tornado. Bottom line: I do think it's possible to teach Tornado to be compatible with trio-asyncio, but it would result in something rather fragile at best. (Its multi-loop support doesn't help. At all. Neither does its Python 2.7 compatibility …) If you don't want to write a My preferred solution would be |
Thanks for taking a look @smurfix, and for the helpful suggestions!
That is sounding more and more like the way to go... |
Moved from python-trio/trio#552 (comment)
results in the error:
The text was updated successfully, but these errors were encountered: