Memory prioritization on workers #5250
I like the proposal above for a couple of reasons:
First approach: #5251. This only does prioritization, and doesn't yet handle pausing (I anticipate that pausing will take more time).
OK, I've added pausing, although it's ugly and not very future-reader friendly. This will have to be cleaned up, but it is probably simple enough for people to take a look at if they're interested.
Reprioritizing tasks based on learned memory use seems like a great idea. That's information that graph ordering doesn't have. Overall, I like the idea of being proactive and avoiding memory over-use in the first place, rather than only being reactive and spilling, pausing, restarting, rebalancing, etc. once we've already used too much memory.
When we see workers producing too much memory, it's usually not because of problems in graph ordering. (I say this based on having looked at ordering in #2602, #4864, shuffle-service tasks, etc.) Pausing these memory-producing tasks is a nice, direct way to address the symptom of this scheduling problem. But it does feel a little odd that we would start with a good ordering, then try to head in the wrong direction, then add this other mechanism to push us back towards the ordering we already knew we wanted to begin with. That said, a mechanism for workers to avoid doing things that are likely to produce a lot of memory seems like a useful thing to have in general.

Because #5251 involves pausing, I think we should be careful to consider how it could cause deadlocks. There are certainly some obvious cases (like where every task is memory-producing) that would have previously OOMed but would now deadlock, but I'm particularly interested in cases where the computation would have worked if we'd just kept running the memory-producing tasks.
As a basic no-tasks-release-memory example, this works on main but deadlocks on #5251. We're trying to persist an array that fits in memory, but is larger than the 50% pause threshold from #5251. Workers go over 50% memory, but the only tasks available to run are more memory-producing root tasks, so they pause, hoping the scheduler will give them something else to do. Since there aren't any memory-freeing tasks in existence, they'll be paused forever.

In [1]: import distributed
In [2]: import dask.array as da
In [3]: from dask.utils import parse_bytes
In [4]: client = distributed.Client(n_workers=4, memory_limit="1gib", memory_spill_fraction=None, memory_target_fraction=None, memory_pause_fraction=None)
In [5]: a1 = da.random.random(parse_bytes("1.5gib") // 8, chunks='1mib') # small tasks so it takes longer than `memory_monitor_interval`
In [6]: a2 = da.random.random(parse_bytes("1.5gib") // 8, chunks='1mib')
In [7]: a = da.stack([a1, a2])
In [8]: p = client.persist(a)
In [9]: _ = distributed.wait(p, timeout=4)
distributed.worker - WARNING - Worker is at 60% memory usage. Resuming worker. Process memory: 620.96 MiB -- Worker memory limit: 1.00 GiB
distributed.worker - WARNING - Worker is at 58% memory usage. Resuming worker. Process memory: 598.85 MiB -- Worker memory limit: 1.00 GiB
...
TimeoutError:

This is contrived, in that if the array were a little larger, the behavior on main would be bad too (workers would just keep OOMing and restarting until something broke). The ideal behavior would probably be for dask to raise an error saying "Your computation is going to take more memory than you have in the cluster". But it's interesting to think about how we could discover that error from the worker within this memory-prioritization logic, since that's equivalent to detecting when we're in a deadlock versus when we're reasonably paused, waiting for the scheduler to give us a better task to run in the near future.

As things stand right now I don't think it's possible. The worker would have to know something more about the global task graph (do any other tasks exist? do any dependencies of my tasks exist? which ones?), which currently it only learns through being given new tasks. It could:
Pausing

I would propose to exclude the pausing mechanism entirely for the moment. IMHO pausing is broken and we should take a step back and redesign it before we pile logic on top of it. Right now it is simple enough that we can throw it out and replace it with something more robust. If we now include complex logic, this might not be as easy anymore.
Speculative task assignment (STA)

Once I finish #5046 it should be relatively easy to incorporate STA on the worker side. At least we should be able to start off a prototype. I'm wondering how urgent this memory-producing prioritization would be if we had STA. I'm a bit on the fence regarding the memory producing/consuming reprioritization suggested here. I am worried about the imprecise, local knowledge on the worker side, especially since the memory producer/consumer toggle would entirely overwrite any topological ordering. I can also see situations which would let us become biased towards already-learned tasks even though a topologically better one would be available but we haven't learned it yet. That would further skew us towards breadth-first execution. I realize that if our memory producer/consumer heuristic is good enough, the skewed and biased decision might still be a good enough decision. Thoughts?

Heap reprioritizing

With the proposed TaskPrefix we could implement a heuristic to trigger a heap reprioritization once a task prefix's memory-producer/consumer toggle flips. That should not happen frequently, should it?
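For illustration, such a heuristic could look roughly like this (the bytes_produced/bytes_consumed fields, the 5x ratio, and the recompute_priority hook are assumptions, not the proposed implementation):

```python
import heapq


def classify(prefix, ratio=5):
    """+1 for a learned memory producer, -1 for a consumer, 0 for neutral."""
    if prefix.bytes_produced > ratio * max(prefix.bytes_consumed, 1):
        return 1
    if prefix.bytes_consumed > ratio * max(prefix.bytes_produced, 1):
        return -1
    return 0


def record_finished_task(prefix, nbytes_in, nbytes_out, ready_heap, recompute_priority):
    """Update a prefix's learned byte totals; rebuild the ready heap only when
    the producer/consumer classification actually flips (which should be rare)."""
    before = classify(prefix)
    prefix.bytes_consumed += nbytes_in
    prefix.bytes_produced += nbytes_out
    if classify(prefix) != before:
        # Recompute priorities for everything still queued on this worker,
        # then restore the heap invariant in one O(n) pass.
        ready_heap[:] = [(recompute_priority(ts), ts) for _, ts in ready_heap]
        heapq.heapify(ready_heap)
```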
Thank you for the example; this is helpful. In this case the worker would actually write to disk. This is usually not great, but still probably better than just pausing everything.
If speculative task assignment is easy then great, that would definitely solve our short-term pain here. That being said, I would be surprised to learn that STA is cheap, and even more surprised to learn that it remained cheap after we got into the topic. Maybe the state machine rewrite changes things though. I am curious to learn more. Do you have a time estimate on this?
I wouldn't worry too much about imprecise and local knowledge. We can also implement this on the scheduler if we want it there. This only overrides topological ordering in significant cases. There are also good reasons to override graph ordering from time to time. Graph ordering doesn't have this information.
There have been a couple of situations where we've wanted pausing for memory-producing tasks.
In both cases we don't actually want to pause the worker indefinitely; we want to pause it for a brief interval, so that the rest of the system can catch up. If we think that STA will take a while, and that we want something else in the meantime, then "pause for a bit" might be an interesting compromise. In labor terms (yesterday was Labor Day in the US) this is a bit like a work slowdown rather than a general strike.
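As a rough sketch of what "pause for a bit" could mean on the worker (the callables, threshold, and interval below are illustrative assumptions, not an existing API):

```python
import asyncio


async def maybe_slow_down(memory_fraction, task_is_memory_producing,
                          pause_threshold=0.5, interval=0.5):
    """Hypothetical "pause for a bit" hook, run before executing a task.

    `memory_fraction` and `task_is_memory_producing` are caller-supplied
    callables standing in for the worker's RSS and learned-memory checks;
    they are not part of distributed's API.
    """
    if task_is_memory_producing() and memory_fraction() > pause_threshold:
        # A work slowdown rather than a general strike: give spilling and the
        # scheduler one interval to catch up, then run the task anyway.
        await asyncio.sleep(interval)
```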
I do not have a good handle on how big the changes on the scheduler side would need to be to enable this, which makes a good estimate difficult right now. I suspect bigger changes are necessary on the scheduler side, but I haven't spent any time or thought on this yet. Disabling work stealing, relaxing a few sanity checks, and abusing the processing state might get us up and running with a prototype in a few days. Stable operation would take much longer, since there are many edge cases we'd need to account for. If we need this fully operational, it will easily take another month (after all, I need to finish the refactoring first).
This could work. I think focusing more narrowly on exactly what we want to solve (root task overproduction) like this is a good approach. This might not even have to use pausing per se; it could be a specific condition that
I'm inclined to try this before the "pause for a bit" option. We could learn a lot from hacking a prototype together in a few days. It would be nice to understand the shape of the STA problem before we look for a quicker way around it. Depending on how things shake out, I could see "pause for a bit" being useful even with STA, or just an unnecessary temporary fix.
Also, I should note that the implementation in #5251 doesn't work that reliably on root tasks either. Possibly because it relies on the memory monitor's RSS measurement, which only updates every 500ms by default, it's easy for root tasks to over-produce memory faster than we check for it? On my machine this just runs out of memory without seeming to try to pause, just like it would on main:

In [1]: import distributed
In [2]: import dask.array as da
In [3]: from dask.utils import parse_bytes
In [4]: client = distributed.Client(n_workers=4, memory_limit="1gib", memory_spill_fraction=None, memory_target_fraction=None, memory_pause_fraction=None)
In [5]: a = da.random.random(parse_bytes("6gib") // 8)
In [6]: p = client.persist(a)

I've noticed this acutely with the shuffle service. Every time I start a job, I have a nail-biting experience watching workers quickly fill up with managed memory, wondering if they'll stop in time or if they'll run out of memory and I'll have to wait 5 minutes to recreate the cluster. For whatever reason, tonight I haven't been able to get a single shuffle to not run out of memory when using TCP comms on small clusters (it works fine with TLS). So even if we went with "pause for a bit", we'd need a different implementation that can determine when to pause effectively instantly. I guess we could look at the amount of data currently stored (though it's an underestimate of RSS).
It may be reasonable to update RSS after every task is computed.
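For example, a rough sketch of what that could look like using psutil directly (the hook itself is hypothetical; the real worker would presumably reuse its existing memory-monitor machinery):

```python
import psutil

_proc = psutil.Process()


def refresh_memory_after_task(memory_limit):
    """Hypothetical hook run right after each task finishes: refresh our view
    of process memory instead of waiting up to memory_monitor_interval
    (500ms by default) for the periodic memory monitor."""
    rss = _proc.memory_info().rss  # resident set size, in bytes
    return rss / memory_limit      # fraction that pausing logic can act on immediately
```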
@fjetter, what is your backlog like these days? How long will it be before you have a few days to work on this? I think that we should prioritize clearing out some of your pending work instead of adding to it.
All other side projects of mine are rather low priority, and I would be able to pick this up by the middle of next week, I suppose.
Did this include PR #5208? Wondering if it is as simple as us doing a bunch of copies that eventually cause things to fall over. Or would that still run into issues for other reasons?
It did include #5208. I think there's a different issue I need to debug. But it was interesting that TCP seemed to make memory worse.
For reference, there has been an earlier ticket which, I believe, covers the same problem: #4891
(Some context for this is in #2602.)
Summary
Workers should start taking memory generation into account in their local scheduling policies. This affects both task prioritization and pausing.
Background on Prioritization
Currently, task prioritization is based on four factors, in order:

1. User-defined priorities, set with the dask.annotate(priority=...) context manager
2. Graph generation, i.e. which .compute/.persist call the task came from (earlier calls run first)
3. Graph structure, as determined by dask.order
4. The order in which tasks arrive

In common practice, 1 and 2 don't fluctuate much (users rarely use priorities and most comparative decisions are between tasks in the same .compute call), and so 3, graph structure, is the most prominent. 4 is only used today when the user makes lots of submit calls without graph structure.

However, sometimes we learn more about the computation at runtime. One critical piece of information that we learn is the size and type of each result. This can tell us useful things like "read_parquet tasks tend to allocate more memory than they free", which, assuming that we're in a memory-constrained environment (common), can be more important than graph structure.
Graph ordering is usually great. Sometimes it isn't, and mostly this is because of information about memory. When should we override graph ordering prioritization?
Background on Pausing
Workers sometimes choose to pause execution. This mostly happens when they are out of memory and hope that some other process in the cluster is going to save them. This might have a few different causes:
But oftentimes pausing is also bad, and results in a deadlocked cluster. Sometimes we actually need this worker to run certain tasks so that we can get ourselves out of a jam. In these cases we don't so much want to pause all work as we want to pause the work that will lead to more medium-term memory use.
Background: current data collection
We currently track memory use on a per-TaskState basis on both the scheduler and the worker (both have a TaskState class), and on a per-TaskGroup basis on the scheduler.
Some decisions we have to make
So we should probably look at storing more aggregate information (aggregations are good for generalizing and making scheduling decisions). We should look both at the memory that any particular group/prefix produces and at the memory it consumes.
Scheduler/Worker
Additionally, we need to determine whether we want to make this decision on the scheduler or the worker side.
Pro-scheduler:
But then the information is farther away from where the decision is implemented. This can be hard for two reasons:
Pro-worker:
When do we override graph structure priorities?
Do we want to always do this? Only when we find tasks that are particularly memory consuming/producing? Only when we're low on memory?
Proposal: prioritization
To start things off, I'll suggest a relatively simple approach.
We add a worker.py:TaskPrefix class. It tracks the total bytes of all inputs and the total bytes of all outputs of tasks with that prefix.
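As a rough sketch of what such a class could track (the field names and update method here are illustrative assumptions, not the actual code from the proposal):

```python
from dataclasses import dataclass


@dataclass
class TaskPrefix:
    """Learned memory behaviour of all tasks sharing a key prefix
    (e.g. "read_parquet"). Field names are illustrative."""
    name: str
    bytes_consumed: int = 0  # total bytes of all inputs seen so far
    bytes_produced: int = 0  # total bytes of all outputs seen so far

    def update(self, nbytes_in: int, nbytes_out: int) -> None:
        """Record the input and output sizes of one finished task."""
        self.bytes_consumed += nbytes_in
        self.bytes_produced += nbytes_out
```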
Then, when computing worker priorities, we do the following: if the prefix associated with a new task produces/consumes five times as much data as it consumes/produces, then we override the graph priority. Otherwise this level of the prioritization stays at zero, and we defer to subsequent prioritizations. This strikes a balance between belief in graph prioritization and taking control with memory prioritization. In most of the cases where this is an issue, production is high and consumption is zero (like read_parquet), or consumption is high and production is orders of magnitude lower (like x.sum()).

Unfortunately, for rootish tasks we'll have lots of tasks assigned at once before we know their memory priority. To resolve this we could check the priority as we pop things off of the ready heap. If the priority coming off of the heap is obviously stale, then we could re-prioritize the entire heap. This would happen infrequently and is pretty cheap to do anyway, assuming we have fewer than millions of tasks per worker.
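For illustration, a hedged sketch of how that memory level of the priority could be computed from the TaskPrefix sketched above (the function name and the sign convention are assumptions, not the actual implementation):

```python
def memory_priority_term(prefix, ratio=5):
    """Extra level for the worker's priority tuple, assuming smaller values
    sort first: -1 runs sooner, +1 runs later, 0 defers to graph ordering.

    `prefix` is assumed to carry bytes_produced/bytes_consumed totals as in
    the TaskPrefix sketch above; the function itself is hypothetical.
    """
    if prefix.bytes_produced > ratio * max(prefix.bytes_consumed, 1):
        return 1   # memory producer (e.g. read_parquet): deprioritize
    if prefix.bytes_consumed > ratio * max(prefix.bytes_produced, 1):
        return -1  # memory consumer (e.g. x.sum()): prioritize
    return 0
```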
Proposal: pausing

Currently we pause with a periodic callback once memory use gets to something like 80%. I propose that we also pause when pulling a task off of the ready heap if that task is memory-producing and our memory is at something like 50%. This should be more responsive. We'll want to unpause immediately if we get a non-memory-producing task.
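A minimal sketch of that check, assuming per-prefix byte totals like those sketched above (the threshold value, attribute names, and function are illustrative, not distributed's API):

```python
def should_pause_for(memory_fraction, prefix, ratio=5, producer_pause_at=0.5):
    """Hypothetical check made while popping a task off the ready heap: pause
    only if the next task's prefix looks memory-producing and process memory
    is already above the (lower) producer threshold.

    Because this runs at task-pop time rather than in a periodic callback, it
    reacts immediately, and the worker unpauses as soon as a
    non-memory-producing task is next.
    """
    is_producer = prefix.bytes_produced > ratio * max(prefix.bytes_consumed, 1)
    return is_producer and memory_fraction > producer_pause_at
```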