Scheduler memory leak / large worker footprint on simple workload #3898
I also see a lot of this in the logs
For memory issues on the worker I recommend that you look through dask/dask#3530.

20ish seconds for 200k tasks is a bit long, but not terribly so. Task overhead is in the range of 200us per task, so this is in line with expectations. There are efforts to accelerate this, but they're probably a few months out at best.

I suspect that your minimal example is intentionally simplified, but your chunk sizes are quite small. A good way to avoid task overhead is to have fewer, larger tasks.

I don't know why your scheduler memory would stay at 1GB after a restart, except perhaps that your OS has decided to hold onto the memory for a while (this is common).
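As an illustration of the "fewer, larger tasks" advice, here is a minimal sketch with arbitrary sizes (not taken from the workload in this thread):

```python
import dask.array as da

x = da.random.random((10000, 10000), chunks=(256, 256))  # ~1,600 small blocks
y = x.rechunk((2000, 2000))                               # 25 larger blocks
z = (y + 1).sum()
z.compute()
```

Fewer blocks means fewer tasks for the scheduler to track, at the cost of more memory per task.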
Thanks for following up. I did start at #3530, but was advised that since I was using a single-threaded scheduler, the problem there was not mine.

The large graph is partly due to the chunk size, but also because overlap computations are part of the mix. I can use larger chunks for some things, but this is one use case where smaller chunks are required: I'm aligning images, and the alignment varies across the larger image. Basically, I'm micro-aligning 512x512 chunks. I suppose I could go the inception path: a map_overlap within a map_overlap, using scheduler=synchronous on the inside.

As an aside, my real chunks are more like ~5x200x512x512 uint16, which is ~500MB. But I wanted an example with the same number of tasks that is quickly runnable and self-contained.

I'm still curious why a purely parallel computation (other than the overlap) is not proceeding in a way that minimizes memory consumption... it seems like it's trying to read the whole thing into all the workers' memory at once.

How long could the OS hold onto things? I run both locally and on GCP on Ubuntu images, and I've never seen the memory go away. Each time I add a graph, it eats up more space, so it doesn't seem to be re-using a heap or anything like that. I run my dask-gateway scheduler with 15GB of memory, but generally restart before I hit that because things get touchy. (I had to turn off autoscaling, because adding that on top of this seemed to bog down the scheduler.)
To be clear, in the example above you're using the distributed scheduler. When we talk about the single-threaded scheduler we mean when you don't create a client, and you call
I figured, and I appreciate it.
It should. If you're interested you can try
Some OSes hold onto memory until someone else needs it. If your machine is genuinely running out of RAM, though, then that's not what's happening here.
In short, I personally don't know what's going on. I would probably have to dive in more deeply to figure out what the problem is, but it's unlikely that I'll have time to do that any time in the near future.
Instead of
I do both - I cancel all futures, then restart the client. The memory still persists.

The color="order" is super neat. With optimize_graph=True, collapse_outputs=True, it is pretty fun to use.

One thing I've noticed is that the designations for overlap tasks have fractions. Is there any comparison of floating points that might happen?

From playing around with a few small examples, the orderings look fine to me. Is order fully respected globally in some way (or even just on a worker)? I could imagine that if some task is waiting due to some upstream bottleneck, later-ordered but ready-to-go tasks could be processed. For example, data can always be read in and consume memory.

And with overlap tasks, is it safe to assume that just the necessary neighbor data is sliced before being transferred (and a full transfer of the neighbor chunks is not done)?
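For readers who want to try the ordering visualization being discussed, here is a minimal sketch on a small toy array (it assumes graphviz is installed; the array and depth are made up):

```python
import dask.array as da

x = da.ones((1000, 1000), chunks=(100, 100))
y = x.map_overlap(lambda block: block + 1, depth=1)

# Color tasks by the static ordering dask.order assigns; optimize_graph and
# collapse_outputs keep the rendered graph readable for larger graphs.
y.visualize("order.png", color="order", optimize_graph=True, collapse_outputs=True)
```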
Ordering is the primary mechanism to determine the order of tasks, but as you point out the workers may choose to do other things if a task is blocked on something like communication.

Yes, data is sliced before being transferred. Hopefully we only move around small pieces.
You'll want to double check, but I think bytes stored refers to the contents of `Worker.data`, which might include data that has been spilled to disk.
On Mon, Jun 22, 2020 at 6:31 PM Chris Roat wrote:

> Not directly related to this topic, but related to the "read in data" task running: I've set DASK_DISTRIBUTED__WORKER__MEMORY__TARGET to 0.3 on a 21GB worker. The bokeh dashboard indicates workers have 16-22GB of "bytes stored", and the GCP dashboard for one worker shows 14GB used. Does "bytes stored" equate to memory, and if so, under what conditions would that go above the memory target?
I was curious if any of the leak was due to fractional keys, and found there is some code in the overlap module that is being reported as leaking. Here are the top 20 lines reported, which total ~400MB (of the 1GB growth I see), with dask/distributed 2.19.0. (Tagging @jakirkham from the other thread.) Perhaps those knowledgeable about dask internals can verify whether tracemalloc has found something real, and suggest what possible mitigations would be.

Report:
Code:
I re-tested this in 2.25.0 and found the memory increase on the scheduler (after reset) is now even higher - 2.2GB. The code above just shows possible leaks in the main process, which is roughly the same. I'm not sure yet how to figure out where the scheduler memory is going. My workaround is to bring up a new cluster for each calculation.
Thanks for keeping up with this, @chrisroat. I'm not sure I fully understand the printout that you gave a few months ago. What do the sizes mean there? Data that is no longer being tracked by Python? If so, I'm curious how this is being discovered. Some of those lines, like

It's my first time seeing data like this. Can you help interpret it?
Those are coming from tracemalloc, which is supposed to detect memory leaks. I'm realizing now, however, that the 2GB scheduler memory increase is likely not traced here, as it's likely in a child process. I think memory-profiler can profile child processes, but I haven't tried that yet.
The scheduler is run in the same process by default with the local cluster. I guess I'm surprised that finding memory leaks is that easy. I'll have to learn more about how tracemalloc works.
Not sure if related, but I have observed in the past a significant memory buildup, especially on the scheduler, from the various logs we keep (transition_log, events log, etc.). While these logs are implemented as a deque and are limited by design, the limit is sometimes too large for small pod sizes. I've seen this take up several GBs of memory on occasion. Try running

def clear_logs(dask_scheduler):
    dask_scheduler.log.clear()
    dask_scheduler.transition_log.clear()
    dask_scheduler.events.clear()

client.run_on_scheduler(clear_logs)

and see if memory drops. These collections are not cleared when triggering a restart (in fact, an event is logged :) ). We keep similar structures on the worker, but I haven't investigated the impact over there since it is used more rarely and there are fewer
Thanks for the tip on the scheduler logs. Unfortunately, that doesn't seem to be the problem here. I ran

The scheduler jumps up early to 1GB of memory and pegs at 60%+ CPU. This is with 2020.1.0 dask/distributed.
With the most recent HEAD, this seems to have gotten larger, at least according to the system graph on the scheduler's system monitor. The leak has gone from 1GB in 2021.3.0 to 3GB at dask 2021.03.0+30.g76a3f866 and distributed 2021.03.0+17.g206c6f84.

Running tracemalloc directly on the scheduler as suggested in #4571 (comment), I was able to get what seems to be a much more believable set of memory leaks than those listed above. But the total still doesn't quite add up to the total seen in the system viewer.

Tagging @madsbk because they seem to have recently been working on this code and might know if this is real and what might cause it.
Output (deleted path prefixes):
Lowering
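Assuming the setting referred to here is the scheduler's bounded transition/event logs (an assumption, based on the reply below about "a lower log length"), lowering them via dask config might look like this:

```python
import dask

# Config keys as they existed in the distributed versions discussed in this
# thread; names may differ in newer releases.
dask.config.set({
    "distributed.scheduler.transition-log-length": 10_000,
    "distributed.admin.log-length": 1_000,
})
```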
I have used HEAD, and am in fact digging into the scheduler code this week in an effort to understand this leak (and other performance issues I am having). My current versions are dask 2021.03.1+3.gcb34b298 and distributed 2021.03.1+5.g77a1fd19.

Using a lower log length does remove some of the memory usage -- the same amount I can remove by clearing the logs at the end of the processing, which makes sense to me. When you say "worked for me", did you mean you don't see the memory jump in this use case, or in your own use case?

To help anyone attempting to jump in, and without all the tracemalloc material I posted above, I've simplified my case to a simple map_overlap (see the sketch below):
The scheduler initially uses ~100MB of memory. Running the above once causes it to jump to ~1.3GB. Subsequent runs (without creating a new client) cause the memory to jump ~200MB each time and trigger a few gc warnings. I think the big initial jump might be some memory that gets allocated and re-used in subsequent runs, and the true leak is just ~200MB per execution.

I have no concrete evidence, but I worry that map_overlap uses floating-point keys and perhaps some comparison is messed up somewhere because of that. I'm only at the very beginning of digging into the scheduler, which seems pretty large and complex.
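A stand-in of roughly the shape described above (many small chunks plus a map_overlap; sizes are arbitrary and this is not the original snippet, so the memory figures quoted above do not apply to it exactly):

```python
import dask.array as da
from dask.distributed import Client

client = Client()  # local cluster; the scheduler runs in this process by default

# Many small chunks so the graph is large; the per-chunk work is trivial.
x = da.zeros((100, 8, 512, 512), chunks=(1, 1, 512, 512), dtype="uint16")
y = x.map_overlap(lambda block: block, depth=(0, 0, 16, 16))
y.sum().compute()

# Repeat the compute (without recreating the client) and watch the scheduler's
# memory in the dashboard between runs.
```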
I spent some more time investigating this. I've also tested with the recent dask/dask#7525 and #4677 (@jakirkham @madsbk). The memory leaks appear without the need for complicated graphs. Simply:
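As an illustration of a deliberately simple workload of this kind (a sketch with made-up sizes, not the commenter's snippet):

```python
from dask.distributed import Client

client = Client()

def inc(x):
    return x + 1

# Submit a large batch of trivial tasks, gather, release, and repeat, watching
# the scheduler's memory between rounds.
for _ in range(5):
    futures = client.map(inc, range(100_000))
    client.gather(futures)
    del futures
```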
I'm not sure how helpful it is to add anecdotes rather than detail, but I've recently hit this specifically with

With 100 workers each producing a ~250MB array and attempting xarray's

Here's the repro, from pydata/xarray#5499:

import dask
import distributed
from dask.distributed import Client
import socket
from pathlib import Path

import numpy as np
import xarray as xr

client = Client("tcp://localhost:8786")

size = 5000

def func(v):
    return xr.Dataset(
        dict(
            a=xr.DataArray(np.ones((size, size)), dims=["x", "y"]) * v,
            b=xr.DataArray(np.ones((size, size)), dims=["x", "y"]) * v * -1,
        ),
        coords=dict(v=[v]),
    )

DATA_PATH = Path("~/workspace/dask-test/").expanduser()
DATA_PATH.mkdir(exist_ok=True)

def write_zarr(v):
    ds = func(v)
    ds.chunk(dict(x=-1)).to_zarr(
        f"{DATA_PATH}/chunks/{v}.zarr", mode="w", consolidated=True
    )

futures = client.map(write_zarr, list(range(20)))

Running this creates 8.5GB of unmanaged memory, though it only leaves 3.8GB of unmanaged memory after a few minutes. On that instance it did work. Often it fails with
In case this is of help to folks here: we've been adding to our documentation about unmanaged memory, which some might find useful. In particular, when running on UNIX systems there are various possibilities to reduce the unmanaged memory: https://distributed.dask.org/en/latest/worker.html#memory-not-released-back-to-the-os
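One of the options described on that page is manually trimming the glibc heap on the workers (Linux/glibc only); roughly:

```python
import ctypes

def trim_memory() -> int:
    # Ask glibc to release free heap pages back to the OS.
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

client.run(trim_memory)  # run on every worker; assumes an existing `client`
```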
@fjetter Could someone explain why we have to mess around with
@j3pic see dask/dask#3530 (comment) for some explanation. I'm not actually sure why the glibc default of 128 KiB doesn't work; I think the setting doesn't quite behave the way you'd expect. According to
@j3pic FWIW we recently introduced some default parameters for this; see distributed/distributed/distributed.yaml, lines 157 to 160, as of f10957d.
@crusaderky has been looking into this the most and may be able to provide more info.
Indeed, explicitly setting MALLOC_TRIM_THRESHOLD_ to its supposed default will alter the glibc behaviour. I'm unsure why; it sounds like a bug in either the documentation or glibc itself.
Just found the following snippet in the
Perhaps it helps in understanding this?
@zklaus it does. This issue should be resolved by setting MALLOC_TRIM_THRESHOLD_. I'm closing this; please reopen if, after properly setting MALLOC_TRIM_THRESHOLD_, you still experience issues.
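For reference, one way to set it is through the nanny's environment config, so the variable is in place before worker processes spawn (a sketch; the config key and value follow the distributed.yaml lines referenced above and may differ across versions):

```python
import dask
from dask.distributed import Client, LocalCluster

# Must be set before worker processes start, hence the nanny environment rather
# than os.environ inside an already-running worker.
dask.config.set({"distributed.nanny.environ": {"MALLOC_TRIM_THRESHOLD_": 65536}})

cluster = LocalCluster()
client = Client(cluster)
```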
I tried setting the variable in the shell before I came across the nanny version. In my case it didn't help, but I may be suffering from a completely unrelated leak, so I am not gonna reopen this issue.

I do wonder if there is something else to be done here: the dynamic mmap thresholding means that as soon as large blocks are freed, the heap will see much more use. The
FYI, if you're running a recent version, there's a known worker memory leak, especially if your graph contains large-ish objects: #5960
I don't think this should be closed. At the very least, we should re-run the original post's code and verify the problem no longer exists, or demonstrate a workaround that avoids the problem. It's the scheduler where the leak might be, so the current worker leak may not be related. I'm recopying the code here:
I did try the fixes proposed throughout this thread. I generally see hundreds of extra MB each time the graph is submitted.

My solution, which may or may not be a good one, was to have our libraries check the scheduler's memory usage prior to submitting a graph. If the scheduler memory is above a few GB, the scheduler is killed and restarted. It means we can't share schedulers among processes -- each scheduler is "owned" by one process. (And anyway, submitting large graphs from many processes slows down the scheduler, in addition to the memory issues.)
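A rough sketch of that kind of guard (names like `scheduler_rss` and the 4GB threshold are made up; it assumes psutil is importable on the scheduler and that your deployment lets you tear a cluster down and recreate it):

```python
import psutil
from dask.distributed import Client

def scheduler_rss(dask_scheduler=None):
    # Runs inside the scheduler process; report its resident set size in bytes.
    return psutil.Process().memory_info().rss

def fresh_client_if_bloated(client, limit=4e9):
    if client.run_on_scheduler(scheduler_rss) > limit:
        client.shutdown()   # tears down the scheduler and workers it manages
        client = Client()   # or re-deploy the cluster however your setup does it
    return client
```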
We worked around this by using KubeCluster.
Thanks @chrisroat. The #5960 thread is confusing because there are actually two issues being discussed there (a leak on the workers and a leak on the scheduler). I consider that thread to be only about the worker leak (and we have found the underlying problem there, though it's not fixed). I just saw "nanny" in @zklaus's message, so I assumed that was about workers too. I think @djhoese may be talking about similar scheduler memory growth as you. #5971 is not relevant to that, because the scheduler doesn't run within a Nanny.

Knowing if setting
If the thread is confusing because of too many commingled discussions, I wonder if we should raise new issues about these specific pieces and close this one out.
If it did, wouldn't it be a red flag for large numpy arrays* sitting on the scheduler inside the dask graph instead of being scattered directly to the workers?

* or any other objects featuring contiguous buffers, e.g. not basic Python types
Hi. I was investigating this again for another project, and the memory bloat seems to still be present. (This means we have to kill and restart our pool after a couple of large overlapping calculations, as the scheduler just grinds to a halt.) I did a fair amount of debugging in the past to try to narrow things down; not sure if anything earlier in this thread is helpful.
There is some internal logging that has been causing a memory increase over time, and we're about to reduce the size of those internal logs in #8173.

On #8164 there is a short analysis of how the memory increase behaves over time. From this analysis, after the internal logs are cut, the remaining memory increase is related to the actual computation graph and is on the order of ~16KiB per task in the graph, distributed over all workers (i.e. a graph of 1M tasks would have an overhead of ~16GiB; if you have 100 workers, that'd be roughly 160MiB per worker).

If somebody is observing an increase that is larger than what I am describing here, please let us know. If you manage to produce a reproducer, that would be ideal, of course.
You are in luck -- the reproducer from the first post of this thread still shows the issue. Throughout the thread you can see the various things I tried, e.g. memory-allocation environment variables and aggressively clearing logs. I also dug through a lot of the communication code; it's not my area of expertise, but it smelled like something was leaking at that level. I used to use a Linux system, but yesterday I checked with my Mac -- the issue is still present.

Note the memory leak here is not related to the workers. As per the title, it is the scheduler that jumps ~1GB on the first computation. Additional, similar computations keep increasing the memory by ~200MB each. Eventually, the scheduler becomes unresponsive. See this comment with a graph of the scheduler memory's initial jump.

Through trial and error I found the memory level at which the scheduler became unusable. In my old work, I just checked the scheduler memory before submitting a graph. If the memory was too large, I killed it, started a new scheduler, and submitted the graph on a fresh cluster.
The reproducer from the first post looks like this on my machine, running on current main. I ran the reproducer in a loop (the screenshot was taken after three runs). Three observations:
This was produced on an M1 MacBook. Memory behaves a little funny on OSX, and I suspect that the weird peak and intermediate low memory is just a relic of OSX memory compression. I'll reproduce this on Linux shortly.

Recently I did a scheduler memory profiling for a large graph (see #7998 (comment)) that shows how the scheduler memory usage breaks down. Indeed, most of it is internal state that should be released, or at the very least be reusable if it cannot be released due to fragmentation.
What happened:
Running an embarrassingly parallel map_overlap workload may be causing a memory leak in the scheduler. Upon completion, releasing the tasks and restarting the client does not reclaim the memory. The example below, with 200k tasks, shows a jump in scheduler memory from 100MB to 1.3GB while running the graph. After client.restart, it remains at 1.1GB.

In addition, the memory of the workers climbs into the "yellow", where I believe swapping to disk begins to happen. Given the parallel nature of this workload, workers ought to be able to discard pieces when they are done with them.

From a performance perspective, during client.compute the scheduler becomes unresponsive (it takes 20ish seconds to start), presumably because it's loading a large graph. I have seen this cause already-running computations to start erroring. I've seen lost keys and KilledWorkers.

And finally, anecdotally, it sometimes happens that one worker runs hot, getting 10x the tasks of other workers. Eventually, forward progress halts. I now watch for this and then kill that worker, which redistributes the work and finishes the job. (I'm using dask-gateway on K8s.)
What you expected to happen:
Minimal Complete Verifiable Example:
Environment: