---
I've almost exclusively used & contributed to xarray for in-memory workloads, and I've just started using it more seriously with dask, in an effort to see whether it can replace spark for some areas. It's been harder than I expected. While it's mostly working now, I'm not sure I'm doing the correct thing, and I've blown up my memory or had to restart the dask scheduler many times. I'm a bit nervous that others with more spark than dask/xarray experience may struggle with the same debugging. Part of the improvement would be to calibrate the dask settings, but part may also be that I'm not following best practices in xarray. So, here's the case:
So far my approach has been, conceptually, to a) run each function with dask distributed, writing each array to a distributed filesystem in a format that xarray+dask can read, b) read each array into a dask-backed dataset and concat them into a single dataset, and c) write this whole dataset in dask as a single object which can later be read. I've put a full example below. While it's not exactly minimal, hopefully its fullness will make up for that (and since the question here is about the overall approach, I think it does require the narrative).

### Run each function & write the result

Toy function for the example:

```python
import xarray as xr
import numpy as np


def func(v):
    return xr.Dataset(
        dict(
            a=xr.DataArray(np.ones((5000, 5000)), dims=["x", "y"]) * v,
            b=xr.DataArray(np.ones((5000, 5000)), dims=["x", "y"]) * v * -1,
        ),
        coords=dict(v=[v]),
    )
```

A function to pull an array and write it to a zarr file:

```python
DATA_PATH = "/mnt/dask-test/"


def write_zarr(v):
    ds = func(v)
    ds.chunk(dict(x=-1)).to_zarr(
        f"{DATA_PATH}/chunks/{v}.zarr", mode="w", consolidated=True
    )
```

Then run this over all 10K values:
```python
import dask
import socket
from dask.distributed import Client

client = Client(f"tcp://{socket.gethostname()}:8786")

tasks = [dask.delayed(write_zarr)(v) for v in range(10_000)]
futures = client.compute(tasks)
```

### Read the arrays into dask-backed arrays and concat them into a single dataset

Either (a) in a single call:

```python
ds = xr.open_mfdataset(f"{DATA_PATH}/chunks/*.zarr", engine="zarr", chunks="auto")
```

or (b) in a loop:

```python
dss = {}
for v in range(10_000):
    dss[v] = xr.open_zarr(
        f"{DATA_PATH}/chunks/{v}.zarr", consolidated=True, chunks="auto"
    ).persist()
ds = xr.concat(list(dss.values()), dim="v", data_vars=["a", "b"])
```
### Write this whole dataset in dask as a single object

```python
future = ds.to_zarr(
    f"{DATA_PATH}/single/X.zarr", mode="w", consolidated=True, compute=False
)
client.compute(future)
```

...and as a test, read it back out again...

```python
ds = xr.open_zarr(f"{DATA_PATH}/single/X.zarr")
```

...and it works! But:

```python
ds.persist()
```

...fails with a memory error, after attempting to bring the whole dataset into memory in the dask scheduler on my local machine. (It also sometimes failed with a …)

### Salient questions
I realize this is very long; any help with any part of it is greatly appreciated, and there's definitely no need to respond to them all.

Other links:
Versions:
---
I made some progress here which significantly narrows the problem:
The main outstanding questions are:
---
One persistent dask problem is that when read and write tasks are decoupled (as in your case), dask tends to run a lot of read tasks at once, resulting in large memory use. The only consistent solution I know is to couple them together, e.g. by sticking the `to_zarr` bit in the delayed function.
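For concreteness, here's a hypothetical sketch of one way to do that coupling for the final combined-store write, reusing `DATA_PATH` and the concatenated `ds` from the question. It writes the combined store's metadata up front, then has each delayed task read one small store and immediately write it into its own `region` of the combined store. The `region` approach and the `copy_one` helper are illustrative, not the only way to couple them, and this assumes the combined store is chunked with size 1 along `v`:

```python
import dask
import xarray as xr
from dask.distributed import Client

client = Client("tcp://localhost:8786")  # hypothetical scheduler address

# Write only the metadata (and in-memory coordinates) for the combined
# store; compute=False means no array data is computed or written yet:
ds.to_zarr(f"{DATA_PATH}/single/X.zarr", mode="w", consolidated=True, compute=False)


@dask.delayed
def copy_one(i):
    # Read one small store and write it straight into its own slice of the
    # combined store, so every read is paired with a write in the same task.
    chunk = xr.open_zarr(f"{DATA_PATH}/chunks/{i}.zarr", consolidated=True)
    # Give the data variables an explicit `v` dimension so they match the
    # combined store's layout, then write only this task's slice along `v`:
    chunk = chunk.drop_vars("v").expand_dims(v=[i])
    chunk.to_zarr(f"{DATA_PATH}/single/X.zarr", region={"v": slice(i, i + 1)})


futures = client.compute([copy_one(i) for i in range(10_000)])
```

With this shape of graph, a read task's memory is freed as soon as its paired write finishes, so memory use stays roughly proportional to the number of concurrently running tasks rather than to the whole dataset.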
On your local machine, `.persist` is basically the same as `.compute` in that everything gets loaded into local memory. On a distributed cluster, `.persist` loads into distributed RAM.
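A quick sketch of that distinction (with `ds` standing in for any dask-backed dataset):

```python
# .compute() materializes the whole dataset as numpy arrays in the calling
# process and returns a new, fully in-memory Dataset:
local_ds = ds.compute()

# .persist() returns a dataset that is still dask-backed, but computation is
# kicked off and the finished chunks are cached. On a distributed cluster the
# chunks live in the workers' memory; with the local scheduler the only place
# for them is the local process, so it behaves much like .compute():
persisted = ds.persist()
```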
Yes. It will raise on both …
Not sure what this means. Do you have a small example?
Yeah I think …