Worker memory not being freed when tasks complete #2757

Open
TomAugspurger opened this issue Jun 7, 2019 · 26 comments

Comments

@TomAugspurger
Member

I'm still investigating, but in the meantime I wanted to get this issue started.

I'm noticing that after executing a task graph with large inputs and a small output, my worker memory stays high. In the example below we:

  1. Generate data (large byte strings)
  2. filter data (slice)
  3. reduce many tasks (sum)

So the final result returned to the client is small, a single Python int. The only large objects should be the initially generated bytestrings.

The console output below shows:

  1. per-worker memory usage before the computation (~30 MB)
  2. per-worker memory usage right after the computation (~ 230 MB)
  3. per-worker memory usage 5 seconds after, in case things take some time to settle down. (~ 230 MB)
Memory usage [before]
{'tcp://192.168.7.20:50533': '30.92 MB', 'tcp://192.168.7.20:50534': '30.95 MB'}
running
Memory usage [after]
{'tcp://192.168.7.20:50533': '231.97 MB',
 'tcp://192.168.7.20:50534': '232.63 MB'}
Memory usage [after]
{'tcp://192.168.7.20:50533': '232.05 MB',
 'tcp://192.168.7.20:50534': '232.63 MB'}

In an effort to test whether the scheduler or worker is holding a reference to the data, I submit a bunch of tiny inc tasks to one of the workers. I notice that the memory on that worker does settle down:

Memory usage [final]
{'tcp://192.168.7.20:52114': '232.77 MB',
 'tcp://192.168.7.20:52115': '49.73 MB'}

That's at least consistent with the worker or scheduler holding a reference to the data, but there could be many other causes. I'm still debugging.

The number of inc tasks, 2731, seems to be significant. With 2730 inc tasks, I don't see any memory reduction on that worker.

import time
from dask.utils import parse_bytes, format_bytes
import pprint
import string
import toolz
from distributed import Client, wait

N = parse_bytes("100 Mb")
I = 20


def inc(x):
    return x + 1


def f(x, n=N):
    time.sleep(0.05)
    return string.ascii_letters[x % 52].encode() * n


def g(x):
    time.sleep(0.02)
    return x[:5]


def h(*args):
    return sum(x[0] for x in args)


def get_mem(dask_worker):
    return dask_worker.monitor.proc.memory_info().rss


def main():
    dsk = {}
    for i in range(I):
        dsk[f'a-{i}'] = (f, i, N)
        dsk[f'b-{i}'] = (g, f'a-{i}')
    dsk['c-0'] = (h,) + tuple(f'b-{i}' for i in range(I))

    with Client(n_workers=2, threads_per_worker=1, memory_limit='500Mb', processes=True) as client:
        print("Memory usage [before]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        print("running")
        client.get(dsk, keys=["c-0"])
        time.sleep(2)  # let things settle

        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))

        time.sleep(5)  # settle some more?
        print("Memory usage [after]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))


        print("clear things?")
        futures = client.map(inc, range(2731), pure=False)
        wait(futures)
        del futures

        print("Memory usage [final]")
        pprint.pprint(toolz.valmap(format_bytes, client.run(get_mem)))


if __name__ == '__main__':
    main()
@mrocklin
Member

mrocklin commented Jun 7, 2019

Another thing that might be worth doing is to add all values placed into Worker.data also into a weakref.WeakValueDictionary

def __init__(self, ...):
    self.weak_data = weakref.WeakValueDictionary()

def put_key_in_memory(self, key, value):
    self.data[key] = value
    self.weak_data[key] = value

Then, after seeing things flush through, you could check on the references to the items in the weak_data dictionary. This could even become a test fairly easily if we were to implement a custom mapping.
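
A minimal sketch of the custom-mapping idea, written as plain Python rather than as a patch to Worker. The class names `WeakTrackingDict` and `Payload` are hypothetical and do not exist in distributed. One caveat worth stating: built-in types such as bytes, int, str and tuple do not support weak references, so values like the byte strings in the original script cannot be tracked this way without a wrapper object.

```python
import gc
import weakref
from collections.abc import MutableMapping


class WeakTrackingDict(MutableMapping):
    """Hypothetical mapping: holds values strongly, mirrors them weakly."""

    def __init__(self):
        self._data = {}
        self.weak_data = weakref.WeakValueDictionary()

    def __setitem__(self, key, value):
        self._data[key] = value
        try:
            self.weak_data[key] = value
        except TypeError:
            # bytes, int, str, tuple, ... cannot be weakly referenced
            pass

    def __getitem__(self, key):
        return self._data[key]

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


class Payload:
    """Stand-in for a task result; plain class instances are weak-referenceable."""

    def __init__(self, n):
        self.blob = b"x" * n


store = WeakTrackingDict()
store["a"] = Payload(1000)
store["b"] = Payload(1000)
stray = store["b"]          # simulate a reference held somewhere else
del store["a"], store["b"]  # the mapping itself lets go of both values
gc.collect()

# Anything still listed here outlived its removal from the mapping.
leaked_keys = sorted(store.weak_data.keys())
print(leaked_keys)
```

Keys that remain in `weak_data` after the mapping has dropped them point at objects that something else is keeping alive, which is the condition a test could assert on.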

@mrocklin
Member

mrocklin commented Jun 7, 2019

Regardless, thanks for investigating here. Issues like this have come up over the years without much resolution.

@TomAugspurger
Member Author

TomAugspurger commented Jun 7, 2019

Another thing that might be worth doing is to add all values placed into Worker.data also into a weakref.WeakValueDictionary

Surprisingly(?) this exacerbates the memory leak. My computation doesn't even complete because the workers are killed for exceeding the memory limit. I guess if we are holding on to a reference somewhere, then we wouldn't expect it to be released from weak_data either...

@mrocklin
Member

mrocklin commented Jun 7, 2019

I guess if we are holding on to a reference somewhere, then we wouldn't expect it to be released from weak_data either..

Right, but you would be able to look at the objects that have stayed around in weak_data, and then track their references

import gc

if worker.weak_data:
    # pick any surviving object and list whatever still refers to it
    obj = list(worker.weak_data.values())[0]
    print(gc.get_referrers(obj))
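
A small standalone illustration of the referrer-tracking step, assuming nothing about Dask. The names `Payload` and `forgotten_cache` are made up for the example. Note that `gc.get_referrers` only reports containers that the garbage collector tracks, so the result is a starting point rather than a complete list.

```python
import gc


class Payload:
    """Stand-in for a task result that should have been released."""


# Simulate a stray reference: a cache that nobody remembered to clear.
forgotten_cache = {"stale-key": Payload()}
obj = forgotten_cache["stale-key"]

# Keep only dict referrers; frames and module namespaces also show up.
holders = [r for r in gc.get_referrers(obj) if isinstance(r, dict)]
held_by_cache = any(r is forgotten_cache for r in holders)
print(held_by_cache)
```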

@TomAugspurger
Member Author

Will give that a shot.

Another (failed) attempt: I made a custom object Foo

class Foo:
    def __init__(self, x, n=N):
        self.thing = string.ascii_letters[x % 52].encode() * n

    def __getitem__(self, index):
        return self.thing[index]

    def __sizeof__(self):
        return sys.getsizeof(self.thing) + sys.getsizeof(object())

and ran objgraph.by_type on the workers, nannies, and scheduler. It didn't find any :/

@mrocklin
Member

mrocklin commented Jun 7, 2019

You should probably also be aware of dask/dask#3530

@TomAugspurger
Member Author

You should probably also be aware of dask/dask#3530

Yes, that's a possible culprit.

Interestingly, I'm not seeing the memory lingering when running my original script on a linux system.

@bnaul
Contributor

bnaul commented Jun 13, 2019

I don't have any insights to offer but I'm eager to hear any ideas about this; I'm often running into situations where I perform some kind of aggregation over a large dataset only to find I've permanently leaked 2TB of RAM 🤕

Screen Shot 2019-06-13 at 4 50 54 PM
Screen Shot 2019-06-13 at 4 51 17 PM

@dhirschfeld
Contributor

dhirschfeld commented Jun 14, 2019

I've run into issues previously with numpy arrays requiring an explicit gc.collect() before being released. I'm curious - does a gc.collect() work for you @bnaul to free up the memory?

I guess the below would work:

def collect():
    import gc
    gc.collect()

client.run(collect)

Edit: I should mention that this wasn't with dask but the symptoms sound familiar...
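
For context on why an explicit gc.collect() can matter at all: objects caught in a reference cycle are not freed by reference counting and wait for the cyclic collector. The sketch below shows that in isolation, with a hypothetical `Node` class and the automatic collector switched off so the effect is deterministic. It does not claim that cycles are the cause of the behaviour in this issue.

```python
import gc
import weakref


class Node:
    def __init__(self):
        self.me = self  # reference cycle: the object refers to itself


gc.disable()            # keep the automatic collector out of the picture
try:
    n = Node()
    ref = weakref.ref(n)
    del n               # refcount never reaches zero because of the cycle
    alive_before = ref() is not None
    gc.collect()        # the cyclic collector finds and frees the object
    alive_after = ref() is not None
finally:
    gc.enable()

print(alive_before, alive_after)
```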

@bnaul
Contributor

bnaul commented Jun 14, 2019

@dhirschfeld I have tried manually garbage collecting after reading the other issue Matt linked above and didn't see an improvement. Appreciate the suggestion though. This is using pandas read_csv for all the IO but I'm fairly confident I see the same behavior w/ other methods.

@mrocklin
Member

My guess here is that Dask isn't tracking any of the leaked data, and that we're in a situation where the next thing to do is to use normal Python methods to detect memory leaks (like the gc module). This is made more complex by the fact that we have to do this investigation in other processes. Using the client.run function is probably helpful here:

def f(dask_worker):
    return len(dask_worker.data)

client.run(f)

def f():
    # import inside the function so it also works when run on a worker
    import gc
    import pandas as pd
    return len([obj for obj in gc.get_objects() if isinstance(obj, pd.DataFrame)])

client.run(f)

...
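
Along the same lines, a generic census of live objects by type can help when it is not yet clear which type to look for. This is a sketch using only the standard library; the function name `most_common_types` and the `Marker` class are invented for the example. Keep in mind that `gc.get_objects()` only lists objects tracked by the collector, so plain bytes or str values will not appear in the counts.

```python
import gc
from collections import Counter


def most_common_types(limit=10):
    """Count gc-tracked objects by type name and return the largest groups."""
    counts = Counter(type(obj).__name__ for obj in gc.get_objects())
    return counts.most_common(limit)


class Marker:
    """Throwaway class so the demo has something recognisable to count."""


markers = [Marker() for _ in range(50)]
census = dict(most_common_types(limit=1_000_000))
print(census["Marker"])
```

Because the function takes no required arguments, it could be passed to client.run in the same way as the snippets above, and comparing the result before and after a computation shows which types grew.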

@jsanjay63

Any progress on this issue? Thanks.

@TomAugspurger
Member Author

@jsanjay63 this GitHub issue should reflect the current state of things.

@kevinpauli

kevinpauli commented Sep 2, 2020

What is the workaround for this in the real world? Do people not use clusters with long-running workers? Or, are people okay with the worker eventually dying and the task getting retried?

@jsanjay63

Small sample example to reproduce this issue: https://stackoverflow.com/questions/64046973/dask-memory-leakage-issue-with-json-and-requests

@gioxc88

gioxc88 commented Nov 25, 2020

same problem here.
Is there a way to overcome this while we wait for the fix?

at the moment I am doing client.restart() every time

@quasiben
Member

You might be interested in testing out this PR #4221 . Reporting success or failures would be welcome. Note, this PR is still in flux and subject to change

@gioxc88

gioxc88 commented Dec 8, 2020

You might be interested in testing out this PR #4221 . Reporting success or failures would be welcome. Note, this PR is still in flux and subject to change

Done but unfortunately no success. Thanks for the suggestion

@quasiben EDIT:
I was wrong. I was using the wrong branch when I tested the PR you suggested. It does half the job because it successfully kills the workers when they reach 80% of the memory limit but still the compute fails with an error

760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/venv/lib/python3.8/site-packages/distributed/client.py in _gather()
   1849                             exc = CancelledError(key)
   1850                         else:
-> 1851                             raise exception.with_traceback(traceback)
   1852                         raise exc
   1853                     if errors == "skip":

KilledWorker: ('_fit_one_estimator-e733af06-4296-44e5-8a1d-28a876c9f9a0', <Worker 'tcp://10.27.2.239:37489', name: 45, memory: 0, processing: 46>)

@gioxc88

gioxc88 commented Dec 8, 2020

This problem is indeed a big one, preventing me from using Dask in production, where I have a very long-running task and 200 GB of memory get used in no time.
Is there anything at all you can suggest to investigate the issue or try to mitigate it?

I already tried the suggested PR without success.
My structure is made by nested multiprocessing on different layers like:

level A                            A1
level B                  B1                 B2
level C             C1        C2       C3        C4
level D           D1  D2    D3  D4   D5  D6    D7  D8

A1 runs in parallel (B1, B2)

B1 runs in parallel (C1, C2)
B2 runs in parallel (C3, C4)

C1 runs in parallel (D1, D2)
C2 runs in parallel (D3, D4)
C3 runs in parallel (D6, D5)
C4 runs in parallel (D7, D8)

And everything works fine except that once the tasks in the innermost layer D are completed, the memory never gets released and it accumulates until the kernel dies.

In this situation of nested processes I cannot even restart the client in the inner layers because this will end up affecting the whole computation. So for me there is really no solution here.

Any help would be much appreciated.

@vladaburian

I'm also experiencing some kind of memory leak, though it might not be related. I'm using Dask distributed only as a job scheduler, not even passing any substantial data. The input is just a filename and there is no return value, and the job itself calls only bare pandas and numpy. This way I'm processing 4000 files (almost equally sized) on a 40-core machine in about 45 minutes.

With Dask distributed the memory usage continuously increases until the work is done. At that point it's consuming 40 GB and the memory is not freed. I see distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? in logs, which seems alright because Dask is not involved in any data manipulation.

The strange thing is that I don't experience a memory leak when using multiprocessing.Pool to dispatch those jobs. (And I also get better CPU usage.) It also consumes about 10 GB which is not freed entirely, but the usage is almost constant all the time.

So it seems that Dask is not directly involved, yet it makes the difference somehow.

I'm running Debian buster, python3.7 and latest libraries (dask==2020.12.0, numpy==1.19.5, pandas==1.2.0). (Python3.8 seems to make no difference.)

htop screenshots are attached: leak-screenshots.zip.

See the code...

import time

import fsspec
import pandas as pd
from distributed import Client, LocalCluster, wait


def compute_profile_from_file(filepath):
    df = pd.read_csv(filepath, compression="gzip", sep=";", names=("eid","t","src","spd"))
    # I observe memory leak even if just reading the data.
    # As I add more processing steps more memory is leaked.
    ...
    df.to_parquet(...)


def main_dask():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")

    client = Client(LocalCluster(n_workers=40, threads_per_worker=1))
    results = set()

    for filepath in filepaths:
        if len(results) == 40:
            _, results = wait(results, return_when='FIRST_COMPLETED')

        job = client.submit(compute_profile_from_file, filepath)
        results.add(job)

    client.gather(results)
    del results
    
    time.sleep(24*3600)


def main_mp():
    fs = fsspec.filesystem("file")
    filepaths = fs.glob("/data/*.csv.gz")

    import multiprocessing as mp
    mp.set_start_method('spawn')

    pool = mp.Pool(40)
    pool.map(compute_profile_from_file, filepaths)

    time.sleep(24*3600)


if __name__ == "__main__":
    main_dask()  # or main_mp(), to compare against multiprocessing.Pool

@jrdzha

jrdzha commented Feb 2, 2021

Any update here? We are also trying to use Dask in production, but this is causing some major issues for us.

@TomAugspurger
Member Author

This issue, and probably a few others scattered across dask / distributed, should have the current state of things.

FWIW, I'm not able to reproduce my original issue now, at least not on a different machine. Previous attempts were on macOS, but on Linux under WSL2 I see

Memory usage [before]
{'tcp://127.0.0.1:33421': '54.06 MB', 'tcp://127.0.0.1:40071': '56.51 MB'}
running
Memory usage [after]
{'tcp://127.0.0.1:33421': '55.10 MB', 'tcp://127.0.0.1:40071': '56.91 MB'}
Memory usage [after]
{'tcp://127.0.0.1:33421': '55.50 MB', 'tcp://127.0.0.1:40071': '57.72 MB'}
clear things?
Memory usage [final]
{'tcp://127.0.0.1:33421': '65.99 MB', 'tcp://127.0.0.1:40071': '68.57 MB'}

Previously the [after] was in the hundreds of megabytes.


I also learned about sys._debugmallocstats(), which might be worth looking into if this is a memory fragmentation issue.

@rolo-ml

rolo-ml commented Mar 20, 2021

Is there a simple, effective way to kill all Dask processes/workers hogging memory, whether from the command line or directly in Python? I thought that this was done automatically when a Python command-line call completes or its execution is interrupted, but I guess not. I executed this set of commands several times in VS Code while debugging and wasn't aware that memory wasn't being freed on every iteration. Now 73% of the RAM is blocked and I have no idea how to free it. Can someone please help?

COMMANDS:
import dask.dataframe as dd
df = dd.read_sql_table(parameterdict["table"],connstring,index_col='payee',columns=parameterdict["selectlist"])
df = df.compute()

MY CONFIGURATION:

  • Azure Data Science Virtual Machine, DS13 v2 instance (so it's not a cluster, just a single node I guess)
  • 56GB RAM
  • I'm using VS code on Python 3.6 with Dask 2.19 managed by Conda

Please help?

image

@mangecoeur

mangecoeur commented Mar 24, 2021

Just to chime in that I'm having a similar issue in long-running jobs (a very big job made of many small tasks). Worker logs show something like 'memory full but no data to spill to disc'. It's very hard to diagnose because it only really has an impact after many hours: a 7-hour job completed but a 14-hour one got stuck as all the workers' memory filled, so they stopped accepting or processing tasks, but since there's nothing to spill to disc they couldn't free their memory either.

Interestingly, I first had the problem appear much sooner when I was using Numba to accelerate my algorithm. Then I switched to Cython instead and it seemed to cure the problem, but when I ran a much bigger job the problem still appeared :( Whatever it is must still be happening but much more slowly.

@gjoseph92
Collaborator

Just happened to notice this issue, which is still open and has a number of up-votes. I wonder if this is an instance of memory not being released back to the OS, which now has its own section in the docs? Additionally, Nannies now set the MALLOC_TRIM_THRESHOLD_ environment variable before startup, so Linux users should see more realistic memory measurement automatically.
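
For readers who want to test the "not released back to the OS" hypothesis by hand, here is a hedged sketch of asking glibc to trim its heap via ctypes. `malloc_trim` is a glibc function, so this assumes Linux with glibc; the guards that return None on other platforms are an addition for this example, and the function name `trim_memory` is chosen here for illustration.

```python
import ctypes
import ctypes.util


def trim_memory():
    """Ask glibc to hand freed heap pages back to the OS.

    Returns malloc_trim's result (1 if memory was released, 0 if not),
    or None when glibc's malloc_trim is not available on this platform.
    """
    libc_name = ctypes.util.find_library("c")
    if libc_name is None:
        return None
    try:
        libc = ctypes.CDLL(libc_name)
    except OSError:
        return None
    if not hasattr(libc, "malloc_trim"):
        return None  # e.g. macOS or a musl-based system
    return libc.malloc_trim(0)


result = trim_memory()
print(result)
```

If worker RSS drops noticeably after running this on each worker (for example through client.run), the memory was free inside the process but had not been returned to the operating system, which points at allocator behaviour rather than a leaked reference.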

I'm wondering if we should close this now?

cc @crusaderky xref #6681 #6780 #4874

@dmcgoldrick

I am running jobs on a fairly fragile SBC cluster using Dask SSH and have been playing around with memory tricks a little...

  1. Upfront job management: I use dataframes with efficient types and only call client.map(f, df_list) over chunks smaller than the SBC worker board's allowed memory. I optimize types (for example, category is smaller than object) and check df.memory_usage().sum() on the dataframe, then keep chunks under 1 GB, which is the limit for this SBC.
  2. Chunking the dataframe into quadrants and using them as separate jobs (or use Dask dataframes from disk and map the partitions, but I don't like to write to SSD if I can avoid it, due to the limited lifetime of SSDs). Here I can hold the entire dataframe on a larger-memory machine, send chunks over to the SBC cluster, and join the results back. That way I can engineer all the memory so that it never goes over the worker's limit, and I know the rough size of the chunked jobs and the join. I can parallelize the join the same way and reduce-join the last frames on the one large-memory machine.
  3. You can call cancel() to flush batches of mapped futures and release their memory (after they are gathered) before the next chunk is sent to a board. This hard-releases the prior chunk's memory on the worker. You can also use client.cancel(df); see https://distributed.dask.org/en/stable/memory.html

For example:

futures = notebook_client.map(function, list)

# grabs the results returned in a df_list
df_list = notebook_client.gather(futures)

# force clear memory for the completed futures on the SBC
[x.cancel() for x in futures]

  4. I start my scheduler and workers independently, so I can restart them from the master scheduler with a start_worker shell, or you can use client.restart() to kill the worker processes and reclaim the memory, all at once or sequentially, so the cluster never goes down.

Hopefully I never send a job that exceeds memory, and I know that limit; I force and check clearing of worker memory after the worker reports back as part of the submission; and I sequentially restart the clients if they accumulate memory I don't understand.
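
The chunk-sizing step described above can be sketched roughly as follows. This is an illustration only: the helper name `row_ranges` is hypothetical, and it assumes rows are of roughly uniform size, which real dataframes with object columns may not satisfy.

```python
def row_ranges(n_rows, total_bytes, budget_bytes):
    """Yield (start, stop) row ranges whose estimated size fits the budget."""
    if n_rows == 0:
        return
    # Ceiling division, so the per-row estimate errs on the large side.
    bytes_per_row = max(1, -(-total_bytes // n_rows))
    rows_per_chunk = max(1, budget_bytes // bytes_per_row)
    for start in range(0, n_rows, rows_per_chunk):
        yield start, min(start + rows_per_chunk, n_rows)


# 10 rows, 1000 bytes in total, at most 300 bytes per chunk
ranges = list(row_ranges(n_rows=10, total_bytes=1000, budget_bytes=300))
print(ranges)
```

With pandas, `total_bytes` could come from `df.memory_usage(deep=True).sum()` and each range could be materialised with `df.iloc[start:stop]` before being handed to client.map.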

Are there any fork bombs out there we don't know about, or any more ideas I missed? I'm sure experts can find the leaks?

So far I haven't broken anything quite in a while - all the best, cheers.
