Client.get_dataset to always create Futures attached to itself #3729
crusaderky commented on Apr 20, 2020
- Fix error "Inputs contain futures that were created by another client"
- Closes Published Future retrieved from async client is synchronous #3227
- Closes Unable to use published datasets in a different client #2336
This reverts commit 5d2566c.
Thanks @crusaderky, I've left a few comments.
distributed/client.py
Outdated
@@ -90,6 +91,7 @@
_global_clients = weakref.WeakValueDictionary()
_global_client_index = [0]

_deserialize_client = ContextVar("_deserialize_client", default=None)
Do we need to use contextvars here? We currently use a thread local for storing this kind of thing, but that doesn't always play nicely with async. Is there a way we could refactor part of distributed to avoid passing around this implicit state?
In general I think contextvars can be quite useful where required, but I want to ensure it's needed here before bringing it in. We also already have a fair bit of implicit state, and I'm a bit hesitant to add more if we can avoid it.
To avoid using contextvars you need to explicitly propagate the client until you stop calling async functions and just call sync functions. In this case, this means that client.Client._get_dataset needs to pass self to core.send_recv, which in turn needs to pass it to comm.tcp.TCP.read (plus its variants comm.inproc.InProc.read and the 100% untested comm.ucx.UCX.read), which in turn needs to pass it to comm.utils.from_frames, which can finally put a context manager (setting a thread-local variable) around
distributed/distributed/comm/utils.py, lines 60 to 62 in 8376f22:
return protocol.loads(frames, deserialize=deserialize, deserializers=deserializers)
which will alter the behaviour of client.Future.__getstate__. It should be really self-evident why I don't recommend doing it.
> We currently use a thread local for storing this kind of thing
For this one you were using a global, client._global_clients.
contextvars exactly fits the use case of thread locals when you use coroutines.
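The difference between the two mechanisms under asyncio can be illustrated with a minimal sketch. The names `current_owner` and `thread_state` are hypothetical and are not the variables used in distributed; the sketch only assumes Python 3.7+, where each asyncio task runs in its own copy of the context.

```python
import asyncio
import threading
from contextvars import ContextVar

# Hypothetical names for illustration only; not taken from distributed.
current_owner = ContextVar("current_owner", default=None)
thread_state = threading.local()


async def worker(name, results):
    current_owner.set(name)    # visible only inside this task's context
    thread_state.owner = name  # shared by every coroutine on this thread
    await asyncio.sleep(0)     # yield so the other task can run and overwrite
    results[name] = (current_owner.get(), thread_state.owner)


async def main():
    results = {}
    # Each coroutine is wrapped in its own task, with its own context copy.
    await asyncio.gather(worker("a", results), worker("b", results))
    return results


results = asyncio.run(main())
```

Here each task reads back its own context variable, while the thread-local value seen by task "a" has been overwritten by task "b", because both coroutines run on the same thread.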
Makes sense to me. Should this logic replace how we access implicit clients everywhere? If so, perhaps we should rename the context var key?
Are there other cases where a method of Client loses self along the way and re-acquires it later from a global variable?
Also note the very big fat caveat of the race conditions on Python 3.6. Not sure how much of the dask.distributed user base still uses Python 3.6 AND has more than one client running.
Sorry, let me clarify.
It feels a bit weird to handle setting the contextvar in the _get_dataset method. It might be cleaner to have a parent context over all coroutines started by a client that includes a reference back to the client. As it stands this is baking in functionality to Future that is only used by one method, when we likely want this everywhere.
> It might be cleaner to have a parent context over all coroutines started by a client that includes a reference back to the client.
Are you talking about some sort of blanket decorator applied around all methods of Client? If so, it feels a bit overkill to me, as I do not know of another method that needs it.
> As it stands this is baking in functionality to Future that is only used by one method, when we likely want this everywhere.
You definitely want to reuse this same variable every time you have a self reference to a Client instance, invoke a method, lose the reference (because it would be too cumbersome to propagate by hand), and then need to reacquire it deeper down the line before the end of the method. If you can point me to any other function that does this, I'll be happy to look into it.
@jcrist ping
I'm redesigning the change to make it more generic, hold on
@gen_cluster(client=True)
async def test_deserialize_client(c, s, a, b):
Nice test case.
@jcrist are you able to shepherd this PR through?
@jcrist I redesigned the PR to address your concerns - see if you like it now
@jcrist ping
@jcrist any feedback please?
Overall I like the direction this PR is going. Just a few cleanups and I think I'm happy to merge.
distributed/client.py
Outdated
# We can still detect a race condition.
if sys.version_info < (3, 7) and _current_client.get() not in (self, None):
    raise RuntimeError(
        "Detected race condition where get_dataset() is invoked in "
This error message still mentions get_dataset; it should probably be made a bit more generic.
Fixed
Overall this looks good to me. Thanks for pushing this work through, this definitely has cleaned up some of how clients are implicitly passed around.
Just to make sure I didn't miss anything glaring, I think it would be good to get one other maintainer's eyes on this before we merge. cc @jrbourbeau perhaps?
await c.close()
await f.close()

def test_as_current_is_thread_local(s):
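The body of that test is not shown here. As a guess at the property its name suggests, the following sketch checks that a context variable set in one thread is not seen by another. It uses only the standard library; `var`, `seen`, and `run` are hypothetical names and nothing in it comes from the distributed test suite.

```python
import threading
from contextvars import ContextVar

var = ContextVar("var", default=None)  # hypothetical variable for illustration
seen = {}
barrier = threading.Barrier(2)


def run(name):
    var.set(name)
    barrier.wait()          # make sure both threads have set their value
    seen[name] = var.get()  # each thread should still read its own value


threads = [threading.Thread(target=run, args=(n,)) for n in ("x", "y")]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

The barrier forces both writes to happen before either read, so the check would fail if the two threads shared one value.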
Clever test
Getting a completely unrelated failure on Windows since the latest merge from master.
Yeah, the test failures are unrelated, see #3774.
Test failure is unrelated. Thanks @crusaderky, merging.
Thanks for this fix. Cannot wait to try it out.