
[Core][run_function_on_all_workers] deflake run_function_on_all_workers and reenable test #31838

Merged
merged 14 commits on Jan 22, 2023

Conversation

scv119
Contributor

@scv119 scv119 commented Jan 21, 2023

Why are these changes needed?

Importing functions registered via run_function_on_all_workers requires a job_id to run properly. After #30883, the worker might not have a job_id at startup, which led to run_function_on_all_workers functions failing to be executed on startup. To fix this, we defer the import thread's startup until job_config is initialized.
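
Roughly, the change amounts to the pattern below. This is a minimal illustrative sketch, not the actual worker.py/_raylet.pyx code; the class and attribute names other than maybe_initialize_job_config (which the diff references) are made up for the example:

```python
import threading


class WorkerSketch:
    """Simplified stand-in for the Ray worker (illustration only)."""

    def __init__(self):
        self.import_thread = None
        self._import_thread_started = False
        self.job_config = None

    def setup_import_thread(self):
        # Create the import thread up front, but do NOT start it yet:
        # importing functions exported via run_function_on_all_workers
        # needs a job_id, which a prestarted worker may not have.
        self.import_thread = threading.Thread(
            target=self._do_importing_loop, daemon=True
        )

    def maybe_initialize_job_config(self, job_config):
        # Called once job_config (and therefore the job_id) is known;
        # only now is it safe to start importing.
        self.job_config = job_config
        if self.import_thread is not None and not self._import_thread_started:
            self._import_thread_started = True
            self.import_thread.start()

    def _do_importing_loop(self):
        # Fetch and exec exported functions for this job_id (elided).
        pass
```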

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@scv119 scv119 changed the title [Core][run_function_on_all_workers] fix run_function_on_all_workers and reenable test [Core][run_function_on_all_workers] deflake run_function_on_all_workers and reenable test Jan 21, 2023
@scv119 scv119 marked this pull request as ready for review January 21, 2023 22:24
python/ray/_private/worker.py
python/ray/tests/test_failure.py
python/ray/_private/worker.py
# Start the import thread
# Setup import thread, but defer the start up of
# import thread until job_config is initialized.
# (python/ray/_raylet.pyx maybe_initialize_job_config)
if mode not in (RESTORE_WORKER_MODE, SPILL_WORKER_MODE):
    worker.import_thread = import_thread.ImportThread(
Member
Are there any cases where other parts of Ray will call things like import_thread.join() during this window where the import thread isn't started?

I also notice that self.threads_stopped.is_set() is checked after self._do_importing() is called. So in the case where threads_stopped is set before the import thread is started, then the import thread will call _do_importing once before shutting down. Could cause a crash. I'm not sure how this is handled or even if it's likely.
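
To make the ordering concern concrete, here is a hedged sketch of the two loop shapes (the real ImportThread run loop is more involved; this only illustrates why consulting the stop event before the first import avoids a late _do_importing call):

```python
import threading

threads_stopped = threading.Event()


def run_import_before_check(do_importing):
    # Ordering described above: the first import always runs before the
    # stop flag is consulted, so a thread started after shutdown was
    # requested still performs one import.
    while True:
        do_importing()
        if threads_stopped.is_set():
            return
        threads_stopped.wait(timeout=1)  # stand-in for waiting on new exports


def run_check_before_import(do_importing):
    # Check the stop flag first: a thread that starts late exits
    # immediately without importing anything.
    while not threads_stopped.is_set():
        do_importing()
        threads_stopped.wait(timeout=1)  # stand-in for waiting on new exports
```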

Contributor Author

Are there any cases where other parts of Ray will call things like import_thread.join() during this window where the import thread isn't started?

this should be fine since the import_thread.join() will check if import_thread has already started.
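
A rough sketch of the kind of guard being described; the class and attribute names here are illustrative, not the actual ImportThread fields:

```python
import threading


class ImportThreadSketch:
    """Illustration only; not ray._private.import_thread.ImportThread."""

    def __init__(self):
        self.t = threading.Thread(target=lambda: None, daemon=True)
        self._started = False

    def start(self):
        self._started = True
        self.t.start()

    def join_import_thread(self):
        # Joining a thread that was never started raises RuntimeError, so
        # callers arriving during the deferred-start window simply return.
        if self._started:
            self.t.join()
```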

Contributor Author

I also notice that self.threads_stopped.is_set() is checked after self._do_importing() is called. So in the case where threads_stopped is set before the import thread is started, then the import thread will call _do_importing once before shutting down. Could cause a crash. I'm not sure how this is handled or even if it's likely.

good catch.

Member

Are there any cases where other parts of Ray will call things like import_thread.join() during this window where the import thread isn't started?

this should be fine since the import_thread.join() will check if import_thread has already started.

Yep. I was thinking more of ordering issues -- any thread which calls import_thread.join will expect to continue only after import_thread has finished, but that assumption may no longer hold. Not sure how impactful this is; hence why I asked.

@scv119 scv119 merged commit e9689ed into ray-project:master Jan 22, 2023
@rkooo567
Contributor

Hmm, isn't there a possibility of regression? E.g.,

previously the import thread started as soon as the worker started and imported the task/actor definitions (iirc).

Now it is deferred until the first task is sent. That means we will have a longer delay in the first execution (because the import will only happen after the task/actor is submitted for the first time).

@scv119
Contributor Author

scv119 commented Jan 23, 2023

Hmm, isn't there a possibility of regression? E.g.,
previously the import thread started as soon as the worker started and imported the task/actor definitions (iirc).
Now it is deferred until the first task is sent. That means we will have a longer delay in the first execution (because the import will only happen after the task/actor is submitted for the first time).

@rkooo567 good point. I suspect it will probably be fine, since we do an import once on thread creation: https://github.com/ray-project/ray/pull/31838/files#diff-ae2296559ecbe71776d337ff68fff30419b7acb67177065bcc6e49ad631e8e70L47

  1. For a prestarted worker without a job_id: it will not import any dependencies until the job_id shows up anyway.
  2. For a worker started with a job_id: it will import dependencies once before receiving the job_info.

So one thing we can do is opportunistically start the import thread immediately after it is created.
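
A hedged sketch of what that opportunistic start could look like, reusing the illustrative worker sketch from earlier in this thread (the real names and call sites may differ):

```python
import threading


def setup_import_thread_opportunistic(worker, current_job_id):
    """Illustration of the proposal above; not the actual Ray code."""
    worker.import_thread = threading.Thread(
        target=worker._do_importing_loop, daemon=True
    )
    # If the job_id is already known (i.e. this is not a prestarted worker
    # waiting for a job), start importing right away so the first task is
    # not delayed; otherwise the start stays deferred until
    # maybe_initialize_job_config runs.
    if current_job_id is not None:
        worker._import_thread_started = True
        worker.import_thread.start()
```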

scv119 added a commit that referenced this pull request Jan 23, 2023
…#31846)

Why are these changes needed?
Previously the import thread started as soon as the worker started and imported the task/actor definitions. After #31838, it is deferred until the first task is sent, which means a longer delay in the first execution.
To address the problem, we can opportunistically start the import thread right after it is created, if the job_id already exists.
@clarkzinzow
Contributor

@rkooo567 @scv119 Is it possible that this could cause a regression with imports happening concurrently with argument deserialization or even task execution? It looks like a Datasets test became flaky around the time of this PR, failing with an AttributeError: module 'pandas' has no attribute 'core' error, which is typically seen with concurrent importing of Pandas.

https://buildkite.com/ray-project/oss-ci-build-branch/builds/1939#0185dad9-df1c-4be7-a6f4-41f0ff45e936

@clarkzinzow
Contributor

For context, importing Pandas is not reentrant, and if the import thread imports Pandas at the same time as argument deserialization or task execution imports Pandas in the execution thread, one of the threads will crash with this error.
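
As a generic mitigation for this class of problem (a hedged sketch of one option, not necessarily what Ray ended up doing), eager imports on the background thread can be funneled through a lock shared with the code paths that import heavy modules during deserialization or execution:

```python
import importlib
import threading

_heavy_import_lock = threading.Lock()


def locked_import(module_name):
    # Serialize first-time imports so the import thread and the task
    # execution thread never run a heavy module's top-level initialization
    # (e.g. pandas building pandas.core) at the same time.
    with _heavy_import_lock:
        return importlib.import_module(module_name)
```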
