-
-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Discussion] Run all tasks in subprocesses #6889
Comments
We should probably do a lot of benchmarking around the costs of plasma/lmdb/other shm system. I have a hunch that it is significant versus our usual per-task overhead, and only becomes ignorable versus large memory transfers. I may be wrong. I was meaning to do these measurements, but they become much more important in the scenario here. Also, we should try to measure the non memcopy costs, and how often objects cannot be turned into pickle5 buffers in real workflows. |
With today's code:
I expect that 2 will be much higher than 1, and it will still be lower than your proposal, since you don't have any pickle/unpickle costs when a dependency->dependant chain runs on the same worker (which is something the scheduler tries very hard to achieve). Other than that, I'm eager to play around with the "shared plasma" prototype to get performance metrics. I suspect that after it the most performant solution may become small-ish workers (4~8 threads) on giant VMs (32~128 CPUs, 4~32 workers per VM) |
Thanks for writing this up
I'm interested in investigating processes more thoroughly but I'm somewhat reluctant to enable this by default even if it was performing excellently. This would actually break a couple of features which we should consider carefully (e.g. worker_client relies on accessing the worker, some synchronization primitives are also accessing the worker). That's not a showstopper but enabling this by default will impact non performance related aspects as well. Regarding the performance gains I'm curious what kind of workloads are actually impacted and how much performance we could gain. Sure, not all operations release the GIL but we should look at amortized cost, i.e. if 70-80% of the workload does release the GIL, the potential impact we can have with this is strongly reduced (I'm aware that the GIL has non trivial side effects impacting not only task execution but also communication and other things. Numbers above are random and the 80% is not related to the 20%) FWIW I've been running for a very long time on a platform where we were using only one threaded workers and it worked great. |
I think that a very good time investment would be to meter how much time typical workloads spend on numpy/pandas functions that don't release the GIL and then push PRs upstream, starting from the ones with the biggest reward/benefits. |
I think reason 3 is even stronger than it is stated here. If a worker could near-instantaneously pause a running task, it would only need to do that pausing when it is very close to OOMing to essentially prevent ever OOMing. In memory-constrained workloads, this would mostly eliminate the "fudge factor" Dask currently has to use -- for example, you wouldn't need to spill until you were actually out of memory, instead of "defensively" spilling ahead of time. For workloads with tasks that have memory explosion (I've found this is unfortunately common in my work), I frequently lose an entire worker -- having to recalculate hundreds of tasks -- unless I am even more conservative than Dask's defaults, since one task that rapidly allocates memory can OOM a worker even if it uses much less memory than that worker has available. |
We have to run some upstream code that we have no control over that relies on global mutable state and it would be helpful if we had this feature to avoid redundant race conditions. |
I'd like to discuss a somewhat major change: instead of workers using threads by default, I think they should use subprocesses, and zero-copy shared memory between the processes #4497. I'm opening an issue since this has come up many times in different places, and I'd like to be able to refer back to it.
Primary motivations:
A lot of real-world code holds the GIL, even if it feels like it shouldn't (since it's "just NumPy").
np.concatenate
holds the GIL. A lot of pandas functions hold the GIL at least part of the time. And a lot of real-world use (or "business logic") is going to involve Python code, intermixed with NumPy and pandas. Users don't want to—and shouldn't have to—think about processes vs threads to get good performance.The primary reasons to not use subprocesses are a) startup overhead and b) data transfer costs between processes. Startup overhead is trivial in a typical dask cluster, compared to the lifetime of the worker. Data transfer cost would be reduced with shared memory and zero-copy serialization, somewhat like Ray does. On top of that, we'd add optimizations to the worker for locality-based scheduling to reduce serialization and copies to shared memory except when unavoidable, and CPU pinning to reduce L1/L2 cache eviction.
So if the startup and IPC costs are alleviated, then I would expect subprocesses to give better performance in nearly all cases. Plus, writing process-safe code is much easier than writing thread-safe code (and plenty of libraries aren't thread-safe).
Isolating the Worker from user code would improve reliability.
The worker is currently completely un-isolated from user code. Side effects of user code (holding the GIL, garbage collection) impact worker performance and stability Worker event loop performance hampered by GIL/convoy effect #6325.
Users are even offered unrestricted access to poke at internals of the entire Worker itself, via
get_worker()
. This is not a great experience for either users or dask devs. The worker API is largely undocumented, and certainly not stable; its internals should not be relied on in user code. Dask is sometimes advertised as being "hackable" in this way, but for developers, the idea of some state being hacked out from under you is not appealing. Restricting the ability for user code to interact with an extremely delicate state machine and an unstable API that shouldn't be relied upon doesn't seem like a bad thing to me.Per-task memory usage could be measured and limited.
This would open the door to all sorts of scheduling, autoscaling, analytics, and cluster-autosizing possibilities for both memory-aware scheduling to avoid OOMs, and auto cluster sizing to ensure a cluster is big enough (but not too big) for a given workload. Platforms like Coiled could store and mange historical memory-consumption data.
Note that Ray is considering this, which is possible since their tasks all run in subprocesses Memory-aware task scheduling to avoid OOMs under memory pressure ray-project/ray#20495.
Dask tasks are already expected to be pure, to not interact with global state, and to not mutate their inputs, so isolating them to separate processes should not make a difference.
I think the vast, vast majority of dask use would be unaffected. Users that are accidentally relying on mutation or global variables, etc. would likely benefit from subprocesses by default, because it would surface their error immediately when running locally, as opposed to only getting nondeterministic errors (depending on scheduling) only once running on a multi-machine cluster.
The small number of users that are intentionally relying on the fact that tasks run in the same process to share state between them are already doing something unsupported, so it shouldn't be surprising if that behavior changes. We could instead offer explicit suggestions (or APIs) for sharing state between tasks if necessary.
Tasks could be cancelled via signals even while running non-Python code, without using the C API Allow workers to cancel running task? #4694, reducing the complexity of our most complex and error-prone worker state Remove cancelled, resumed, and long-running states #6844.
The initial questions to answer would be around how much overhead shared memory and any serialization would add, compared to the current typical overhead of the GIL. The crux of the implementation would be in the system for managing the shared memory (might look a lot like/be plasma #6503, and be designed with #5996 in mind), overhauling and simplifying the serialization code (likely just arrow + pickle5), and implementing locality-based scheduling within the worker (so tasks would try to run on a process where their input data was already in memory).
cc @fjetter @crusaderky @jakirkham @martindurant
The text was updated successfully, but these errors were encountered: