
resources annotations for "array creation" steps are ignored #4491

Open
phildong opened this issue Feb 8, 2021 · 3 comments

Comments

phildong commented Feb 8, 2021

What happened

So I was very excited about the new annotation feature and wanted to see whether it improves the performance of my workflow.
Basically, I have an array that is initially loaded in big chunks by a very memory-consuming function. I then want to split it into smaller chunks and store them to zarr. Something like the following:

import dask.array as darr
arr = darr.random.random((1e5, 200, 200), chunks=(1e3, 200, 200))
arr = arr.rechunk((100, -1, -1))
arr.to_zarr(...)

where the initial random sampling is equivalent to my memory-consuming array creation function.

I want this to run in a memory-limited environment while still taking advantage of as many CPUs as possible. So my idea is to limit the number of array-creation tasks that can execute simultaneously on a worker, and the new resources annotation seems perfect for this job. So I have tried:

approach 1: annotating the "array creation" steps

According to the documentation, the following seems to be the best way to go and should work:

import dask as da
from dask import array as darr
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1,
    memory_limit="8GB",
    threads_per_worker=16,
    resources={"MEM": 4},
)
client = Client(cluster)

with da.annotate(resources={'MEM': 1}):
    arr = darr.random.random((1e5, 200, 200), chunks=(1e3, 200, 200))
    arr = arr.rechunk((100, -1, -1))
with da.config.set(array_optimize=None):
    arr.to_zarr("./arr.zarr", overwrite=True)

However, it seems the annotations are not respected. I can see 16 concurrent "random_sample" operations on my dashboard instead of 4, and the worker quickly gets killed due to memory overload.
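
As an aside, here is a minimal sketch for sanity-checking that the annotations are at least attached to the graph before any optimization runs. This is based on my understanding of the 2021.x HighLevelGraph API -- the layers / annotations attributes below are assumptions, not something verified in this report:

import dask
import dask.array as darr

with dask.annotate(resources={"MEM": 1}):
    arr = darr.random.random((1000, 200, 200), chunks=(100, 200, 200))

# Inspect the un-optimized high-level graph; if the annotation was recorded at
# graph-construction time, the annotated layers should carry
# {'resources': {'MEM': 1}}.
hlg = arr.__dask_graph__()
for name, layer in hlg.layers.items():
    print(name, layer.annotations)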

approach 2: annotating the storing steps

import dask as da
from dask import array as darr
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1,
    memory_limit="8GB",
    threads_per_worker=16,
    resources={"MEM": 4},
)
client = Client(cluster)

arr = darr.random.random((1e5, 200, 200), chunks=(1e3, 200, 200))
arr = arr.rechunk((100, -1, -1))
with da.annotate(resources={'MEM': 1}):
    arr.to_zarr("./arr.zarr", overwrite=True)

This works sort of as expected: at any given time I only have 4 "active" threads, regardless of the task type. However, this is not ideal, since I only want the initial "random_sample" tasks to be limited. Moreover, this seems to change the scheduler behavior -- all the "random_sample" tasks are carried out before any "store" can happen, so the results of the random samples are constantly written to and read back from disk. According to the task ordering documentation this should not happen in the normal case, and I expect each "store" to run immediately after its dependencies finish.

approach 3: hard-limit the number of threads

import dask as da
from dask import array as darr
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1,
    memory_limit="8GB",
    threads_per_worker=4, #reduced from 16 to 4
)
client = Client(cluster)

arr = darr.random.random((1e5, 200, 200), chunks=(1e3, 200, 200))
arr = arr.rechunk((100, -1, -1))
arr.to_zarr("./arr.zarr", overwrite=True)

This works as expected, and the task ordering seems correct as well. However, this is not ideal since I want to be specific about which tasks should be limited because of their memory footprint.

What I expect

I expect approach 1 to work, and/or approach 2 to behave "normally" in terms of task ordering.

Environment

  • Dask version: 2021.2.0
  • Python version: 3.8.1
  • Operating System: linux-64, ubuntu/18.04.4
  • Install method (conda, pip, source): conda
@jakirkham (Member) commented:

cc @sjperkins (in case you have thoughts here 🙂)

@sjperkins (Member) commented:

Unfortunately, task fusion needs to be disabled, otherwise annotations are destroyed:

with dask.config.set(optimization__fuse__active=False):
    result.compute() 

It only needs to be disabled when the compute step is called -- it doesn't change any behaviour during graph construction.
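
For reference, a sketch of what approach 1 from the original report might look like with fusion disabled at compute time (untested here; it assumes that turning off optimization.fuse.active is enough for the resource annotations to survive optimization):

import dask
from dask import array as darr
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(
    n_workers=1,
    memory_limit="8GB",
    threads_per_worker=16,
    resources={"MEM": 4},
)
client = Client(cluster)

# annotate only the memory-hungry creation/rechunk steps
with dask.annotate(resources={"MEM": 1}):
    arr = darr.random.random((100_000, 200, 200), chunks=(1_000, 200, 200))
    arr = arr.rechunk((100, -1, -1))

# disable task fusion so the annotations are not dropped during optimization
with dask.config.set(optimization__fuse__active=False):
    arr.to_zarr("./arr.zarr", overwrite=True)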

@gjoseph92 (Collaborator) commented:

Just noting this is essentially a duplicate of dask/dask#7036
