FIX-#7346: Handle execution on Dask workers to avoid creating conflicting Clients #7347
base: main
Conversation
FIX-#7346: Handle execution on Dask workers to avoid creating conflicting Clients. Signed-off-by: Michael Akerman <[email protected]>
Thanks @data-makerman, great patch! Would you be able to add a new test with the breaking code from your issue #7346 to the end of modin/tests/core/storage_formats/pandas/test_internals.py? I tested this and it works in Ray, so we shouldn't need to exclude any engine from the test.
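As a purely illustrative sketch of what such a test might look like (the test name, DataFrame contents, and the pd.Series-inside-apply pattern are assumptions drawn from this thread, not the test that was actually added):

import modin.pandas as pd


def test_apply_does_not_create_conflicting_dask_client():
    # Building a Modin object inside the applied function is the pattern that
    # previously triggered Modin initialization on a Dask worker and the
    # attempt to create a second, conflicting Client.
    df = pd.DataFrame({"a": list(range(8))})
    result = df["a"].apply(lambda x: pd.Series([x, x + 1]).sum())
    # Materialize the result so the remote work actually executes.
    assert len(result) == 8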
@@ -30,6 +30,17 @@
 def initialize_dask():
     """Initialize Dask environment."""
     from distributed.client import default_client
+    from distributed.worker import get_worker
+
+    try:
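The quoted hunk is cut off above. As a rough sketch of the overall idea (simplified and assumed, not the verbatim patch), the guard could look something like this:

from distributed.client import default_client
from distributed.worker import get_worker


def initialize_dask():
    """Initialize Dask environment (sketch only)."""
    try:
        # get_worker() raises ValueError when we are not inside a worker.
        get_worker()
        # Running on a Dask worker: reuse the client that owns this worker
        # instead of constructing a new (conflicting) one.
        from distributed import get_client

        return get_client()
    except ValueError:
        pass
    try:
        # Main process: reuse an existing client if one is already running.
        return default_client()
    except ValueError:
        # No client yet: this is where Modin would normally start one.
        from distributed import Client

        return Client()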
I wonder why initialize_dask is called in a worker. Could you verify whether this is the case?
I definitely don't fully understand it. I can verify that this PR stops the issue and prevents multiple instances of a Dask cluster from starting under any of the operations where I was observing the behavior. Since it occurs in an apply context on a remote worker, it was beyond my available time (or existing technical skill) to debug exactly what was happening on the Dask worker. It seems possible that there's some other root cause leading to a call to initialize_dask.

I can verify by inference that initialize_dask is being called inside a worker, because it appears to be the only place in Modin 0.31 where the distributed.Client class is ever instantiated, and I can observe in stdout that multiple Clients are being created as daemonic processes on Dask during the apply operation demonstrated in #7346, but only when working with Modin (not with the equivalent operation in Pandas).

I can hazard a partial guess as to what might be happening, which would require further study, based on some very confusing behavior I observed: sometimes, while attempting to use client.submit(lambda x: x.apply(foo), pandas_df) directly on a Pandas dataframe (not Modin), I saw the same error, but only if Modin had been imported using "import modin.pandas as pd". It made me wonder whether Dask was calling a pd function while pd had been masked in the worker's namespace by Modin.

I think I can probably create a working example of that if I have enough time later, which might help find the root cause.
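For reference, a small script along the lines of what is described above might look like this (foo, the DataFrame contents, and the exact submit call are illustrative assumptions, not the original reproduction):

import pandas
from distributed import Client

import modin.pandas as pd  # noqa: F401  imported only for its side effects here


def foo(x):
    return x * 2


if __name__ == "__main__":
    client = Client()  # local Dask cluster
    pandas_df = pandas.DataFrame({"a": range(10)})
    # The comment above reports that a call like this sometimes raised the
    # same conflicting-Client error, but only when Modin had been imported.
    result = client.submit(lambda df: df.apply(foo), pandas_df).result()
    print(result)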
I think I have a decent understanding of what is going on, but there's still something weird happening that I can't explain.
Modin is never fully initialized on the workers: Modin's initialization code is never run there, so unless a worker runs a task that requires subscribing to an engine, it will never have this problem. Link:
modin/modin/core/execution/dispatching/factories/dispatcher.py
Lines 99 to 118 in f5f9ae9
class FactoryDispatcher(object):
    """
    Class that routes IO-work to the factories.

    This class is responsible for keeping selected factory up-to-date and dispatching
    calls of IO-functions to its actual execution-specific implementations.
    """

    __factory: factories.BaseFactory = None

    @classmethod
    def get_factory(cls) -> factories.BaseFactory:
        """Get current factory."""
        if cls.__factory is None:
            from modin.pandas import _update_engine

            Engine.subscribe(_update_engine)
            Engine.subscribe(cls._update_factory)
            StorageFormat.subscribe(cls._update_factory)
        return cls.__factory
The Series constructor ends up in this path via from_pandas. So when you call pd.Series(...) from within an apply function, cloudpickle will serialize all of the dependencies of that call and then unpack it within a worker. This could potentially explain why @data-makerman is seeing this happening with pandas after using Modin (but that still seems like a bug to me and is worth investigating more).
Now, what I don't understand is that in ipython and jupyter, instead of hitting this problem we have something else entirely, which is that Modin starts a Ray cluster inside a Dask cluster.

I don't really even understand why it's different, because if I use %run issue-7346.py within ipython I get the same issue as before with Dask.

So I think this patch is absolutely correct in detecting that we are in a worker and avoiding initializing a second Dask cluster, but I will follow on with a patch for this weird ipython issue.
Also, the reason this doesn't happen with Ray is because Ray uses a global variable that all workers share to ensure that one and only one Ray cluster is initialized in the same client/worker. We bypass initialization if Ray is already initialized here:

if not ray.is_initialized() or override_is_cluster:

Ray will always be initialized from within a worker.
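For comparison, the Ray-side behavior described here amounts to something like the following (a simplified, assumed shape, not Modin's actual initialization code):

import ray


def initialize_ray_like(override_is_cluster=False):
    # On a Ray worker the runtime is already initialized, so this check
    # makes initialization a no-op there and no second cluster is started.
    if not ray.is_initialized() or override_is_cluster:
        ray.init()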
> Now, what I don't understand is that in ipython and jupyter, instead of hitting this problem we have something else entirely, which is that Modin starts a Ray cluster inside a Dask cluster.
This probably has to do with the fact that workers know nothing about Modin configs set in the main process.
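A concrete way to see what is meant (assumed usage of modin.config; whether and how these values reach the workers is exactly the open question):

import modin.config as cfg

# Setting the engine here only affects the process that executes this line.
# A Dask worker that lazily initializes Modin does not see this value and can
# fall back to the default engine (Ray, if it is installed) instead of Dask.
cfg.Engine.put("Dask")
print(cfg.Engine.get())  # "Dask" in this process, but not necessarily on a worker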
@YarShev I think that is correct, but do you know why it would be different in ipython vs the python script?
I would guess there is a different strategy for importing modules in ipython.
Signed-off-by: Michael Akerman <[email protected]>
a360f93 to 3d17773
Added, although I'm not 100% sure I matched the desired test format and standard, so please let me know if any changes are needed! The new test is passing on my branch. That said, I don't understand how to test that the test properly fails. If I revert my fix commit and run it as-is with the …
@data-makerman you can use …
Co-authored-by: Iaroslav Igoshev <[email protected]>
Signed-off-by: Michael Akerman <[email protected]>
Signed-off-by: Michael Akerman <[email protected]>
Signed-off-by: Michael Akerman <[email protected]>
…Dask-workers-to-avoid-creating-conflicting-Clients
At this point the pipeline is failing on tests which don't seem to have anything to do with the changes in this PR, and I'm frankly a bit at a loss as to what they might mean. I have merged the current state of the main branch into this branch. The failures are variations on:
What do these changes do?
Check whether execution is happening on a Dask worker node and, if so, avoid creating conflicting clients; worker nodes are not allowed to create additional Clients.
- flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
- black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
- git commit -s
- pytest modin/tests/core/storage_formats/pandas/test_internals.py is passing
- docs/development/architecture.rst is up-to-date