to_netcdf() doesn't work with multiprocessing scheduler #3781

Open · bcbnz opened this issue Feb 19, 2020 · 4 comments
@bcbnz (Contributor) commented Feb 19, 2020

If I create a chunked, lazily-computed array, to_netcdf() computes and writes it to disk with the threading and distributed schedulers, but not with the multiprocessing scheduler. The only reference I've found when searching for the exception message is this StackOverflow question.

MCVE Code Sample

import dask
import dask.distributed
import numpy as np
import xarray as xr

if __name__ == "__main__":
    # Simple worker function.
    def inner(ds):
        if sum(ds.dims.values()) == 0:
            return ds
        return ds**2

    # Some random data to work with.
    ds = xr.Dataset(
            {"test": (("a", "b"), np.random.uniform(size=(1000, 1000)))},
            {"a": np.arange(1000), "b": np.arange(1000)}
    )

    # Chunk it and apply the worker to each chunk.
    ds_chunked = ds.chunk({"a": 100, "b": 200})
    ds_squared = ds_chunked.map_blocks(inner)

    # Thread pool scheduler can compute while writing.
    dask.config.set(scheduler="threads")
    print("Writing thread pool test to disk.")
    ds_squared.to_netcdf("test-threads.nc")

    # Local cluster with distributed works too.
    c = dask.distributed.Client()
    dask.config.set(scheduler=c)
    print("Writing local cluster test to disk.")
    ds_squared.to_netcdf("test-localcluster.nc")

    # Process pool scheduler can compute.
    dask.config.set(scheduler="processes")
    print("Computing with process pool scheduler.")
    ds_squared.compute()

    # But it cannot compute while writing.
    print("Trying to write process pool test to disk.")
    ds_squared.to_netcdf("test-process.nc")

Expected Output

Complete netCDF files should be created by all three schedulers.

Problem Description

The thread pool and distributed local cluster schedulers result in a complete output. The process pool scheduler fails when trying to write (note that test-process.nc is created with the header and coordinate information, but no actual data is written). The traceback is:

Traceback (most recent call last):
  File "bug.py", line 54, in <module>
    ds_squared.to_netcdf("test-process.nc")
  File "/usr/lib/python3.8/site-packages/xarray/core/dataset.py", line 1535, in to_netcdf
    return to_netcdf(
  File "/usr/lib/python3.8/site-packages/xarray/backends/api.py", line 1097, in to_netcdf
    writes = writer.sync(compute=compute)
  File "/usr/lib/python3.8/site-packages/xarray/backends/common.py", line 198, in sync
    delayed_store = da.store(
  File "/usr/lib/python3.8/site-packages/dask/array/core.py", line 923, in store
    result.compute(**kwargs)
  File "/usr/lib/python3.8/site-packages/dask/base.py", line 165, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/lib/python3.8/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/lib/python3.8/site-packages/dask/multiprocessing.py", line 212, in get
    result = get_async(
  File "/usr/lib/python3.8/site-packages/dask/local.py", line 494, in get_async
    fire_task()
  File "/usr/lib/python3.8/site-packages/dask/local.py", line 460, in fire_task
    dumps((dsk[key], data)),
  File "/usr/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 62, in dumps
    cp.dump(obj)
  File "/usr/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py", line 538, in dump
    return Pickler.dump(self, obj)
  File "/usr/lib/python3.8/multiprocessing/synchronize.py", line 101, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.8/multiprocessing/context.py", line 363, in assert_spawning
    raise err
RuntimeError: Lock objects should only be shared between processes through inheritance

With a bit of editing of the system multiprocessing module I was able to determine that the lock being reported by this exception was the first lock created. I then added a breakpoint to the Lock constructor to get a traceback of what was creating it:

File                  Line  Function
core/dataset.py       1535  Dataset.to_netcdf
backends/api.py       1071  to_netcdf
backends/netCDF4_.py   350  open
backends/locks.py      114  get_write_lock
backends/locks.py       39  _get_multiprocessing_lock

This last function creates the offending multiprocessing.Lock() object. Note that six Locks are constructed in total, so it's possible that the later-created ones would also cause an issue.
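
To reproduce this trace without editing the multiprocessing module itself, a rough equivalent (my own sketch, not what I actually ran) is to monkeypatch multiprocessing.Lock before calling to_netcdf() and print a stack trace whenever a lock is created:

import multiprocessing
import traceback

_original_lock = multiprocessing.Lock

def traced_lock(*args, **kwargs):
    # Print where each multiprocessing.Lock is created, so the offending
    # caller (backends/locks.py in this case) shows up in the output.
    traceback.print_stack()
    return _original_lock(*args, **kwargs)

multiprocessing.Lock = traced_lock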

The h5netcdf backend has the same problem with Lock. However, the SciPy backend raises a NotImplementedError for this case:

ds_squared.to_netcdf("test-process.nc", engine="scipy")
Traceback (most recent call last):
  File "bug.py", line 54, in <module>
    ds_squared.to_netcdf("test-process.nc", engine="scipy")
  File "/usr/lib/python3.8/site-packages/xarray/core/dataset.py", line 1535, in to_netcdf
    return to_netcdf(
  File "/usr/lib/python3.8/site-packages/xarray/backends/api.py", line 1056, in to_netcdf
    raise NotImplementedError(
NotImplementedError: Writing netCDF files with the scipy backend is not currently supported with dask's multiprocessing scheduler

I'm not sure how simple it would be to get this working with the multiprocessing scheduler, or how important it is given that the distributed scheduler works. If nothing else, it would be good to raise the same NotImplementedError as with the SciPy backend.
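
For illustration, a guard along the lines of the existing SciPy check could look something like the sketch below; detecting the scheduler via dask.base.get_scheduler() and the exact placement are my assumptions, not xarray's actual internals:

import dask.base
import dask.multiprocessing

def _raise_if_multiprocessing_scheduler(engine):
    # Illustrative sketch: refuse to write when dask's process pool
    # scheduler is active, mirroring the message used for the scipy backend.
    scheduler = dask.base.get_scheduler()
    if scheduler is dask.multiprocessing.get:
        raise NotImplementedError(
            f"Writing netCDF files with the {engine} backend is not "
            "currently supported with dask's multiprocessing scheduler"
        )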

Output of xr.show_versions()

commit: None
python: 3.8.1 (default, Jan 22 2020, 06:38:00)
[GCC 9.2.0]
python-bits: 64
OS: Linux
OS-release: 5.5.4-arch1-1
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: en_NZ.UTF-8
LOCALE: en_NZ.UTF-8
libhdf5: 1.10.5
libnetcdf: 4.7.3

xarray: 0.15.0
pandas: 1.0.1
numpy: 1.18.1
scipy: 1.4.1
netCDF4: 1.5.3
pydap: None
h5netcdf: 0.7.4
h5py: 2.10.0
Nio: None
zarr: None
cftime: 1.1.0
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.10.1
distributed: 2.10.0
matplotlib: 3.1.3
cartopy: 0.17.0
seaborn: None
numbagg: None
setuptools: 45.2.0
pip: 19.3
conda: None
pytest: 5.3.5
IPython: 7.12.0
sphinx: 2.4.2

@lvankampenhout commented Oct 1, 2020

I think I ran into a similar problem when combining dask-chunked Datasets (originating from open_mfdataset) with Python's native multiprocessing package. I get no error message; the headers of the files are created, but then the script hangs indefinitely. The use case is combining and resampling variables into ~1000 different NetCDF files, which I want to distribute over different processes using multiprocessing.

MCVE Code Sample

import xarray as xr
from multiprocessing import Pool
import os

if False:
    """
    Load data without using dask
    """
    ds = xr.open_dataset("http://www.esrl.noaa.gov/psd/thredds/dodsC/Datasets/ncep.reanalysis/surface/air.sig995.1960.nc")
else:
    """
    Load data using dask
    """
    ds = xr.open_dataset("http://www.esrl.noaa.gov/psd/thredds/dodsC/Datasets/ncep.reanalysis/surface/air.sig995.1960.nc", chunks={})

print(ds.nbytes / 1e6, 'MB')

print('chunks', ds.air.chunks) # chunks is empty without dask

outdir = '/glade/scratch/lvank' # change this to some temporary directory on your system

def do_work(n):
    print(n)
    ds.to_netcdf(os.path.join(outdir, f'{n}.nc'))

tasks = range(10)

with Pool(processes=2) as pool:
    pool.map(do_work, tasks)
    
print('done')

Expected Output
The NetCDF copies in outdir named 0.nc to 9.nc should be created for both cases (with and without Dask).

Problem Description
In the Dask case (when the if-statement evaluates to False and the chunked dataset is used), the files are not created and the program hangs.

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.8.5 (default, Sep 4 2020, 07:30:14)
[GCC 7.3.0]
python-bits: 64
OS: Linux
OS-release: 3.10.0-1127.13.1.el7.x86_64
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: en_US.UTF-8
LANG: en_US.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.4
libnetcdf: 4.7.3

xarray: 0.16.1
pandas: 1.1.1
numpy: 1.19.1
scipy: 1.5.2
netCDF4: 1.5.3
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: None
cftime: 1.2.1
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.27.0
distributed: 2.28.0
matplotlib: 3.3.1
cartopy: None
seaborn: None
numbagg: None
pint: None
setuptools: 49.6.0.post20200925
pip: 20.2.2
conda: None
pytest: None
IPython: 7.18.1
sphinx: None


@Chrismarsh

I am also hitting the problem as described by @bcbnz

@tsupinie commented Feb 9, 2021

@lvankampenhout, I ran into your problem. The OP's issue seems like it's actually in to_netcdf(), but I think yours (and mine) is in Dask's lazy loading and therefore unrelated.

In short, ds will have some Dask arrays whose contents don't actually get loaded until you call to_netcdf(). By default, Dask loads in parallel, and the default Dask parallel scheduler chokes when you do your own parallelism on top. In my case, I was able to get around it by doing

ds.load(scheduler='sync')

at some point. If it's called outside do_work(), I think you can skip the scheduler='sync' part, but inside do_work() it's required. This bypasses Dask's parallelism, which is probably what you want if you're doing your own parallelism anyway.
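
Applied to the earlier MCVE, the workaround would look roughly like this (my adaptation, untested against the original script):

def do_work(n):
    print(n)
    # Load the dask-backed variables synchronously inside the worker
    # process, so dask's own scheduler isn't invoked from a
    # multiprocessing child.
    ds.load(scheduler='sync')
    ds.to_netcdf(os.path.join(outdir, f'{n}.nc'))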

@cjauvin (Contributor) commented Sep 25, 2021

I'm currently studying this problem in depth, and I noticed that while the threaded scheduler uses a lock that is keyed on the file name:

_FILE_LOCKS: MutableMapping[Any, threading.Lock] = weakref.WeakValueDictionary()

def _get_threaded_lock(key):
    try:
        lock = _FILE_LOCKS[key]
    except KeyError:
        lock = _FILE_LOCKS[key] = threading.Lock()
    return lock

the process-based scheduler throws away the key:

def _get_multiprocessing_lock(key):
    # TODO: make use of the key -- maybe use locket.py?
    # https://github.com/mwilliamson/locket.py
    del key  # unused
    return multiprocessing.Lock()

I'm not sure yet what the consequences and logical interpretation of that are, but I would like to re-raise @bcbnz's question from above: should this scenario simply raise a NotImplementedError because it cannot be supported?
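
To make the contrast concrete, a keyed per-file variant (purely hypothetical, and note that it would not by itself fix the pickling error, since a multiprocessing lock still cannot be sent to workers by cloudpickle) might look like:

import multiprocessing

_MP_FILE_LOCKS = {}

def _get_multiprocessing_lock_keyed(key):
    # Hypothetical: reuse one multiprocessing.Lock per file name, mirroring
    # _FILE_LOCKS for the threaded scheduler. The lock would still need to
    # be inherited by worker processes rather than pickled to them.
    try:
        return _MP_FILE_LOCKS[key]
    except KeyError:
        lock = _MP_FILE_LOCKS[key] = multiprocessing.Lock()
        return lock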

carueda added a commit to mbari-org/pbp that referenced this issue Oct 22, 2023
allows skipping the usual .nc generation behavior in case the caller wants better control over file creation, e.g. after multi-day parallel processing

see pydata/xarray#3781