diff --git a/ci/requirements-py27-netcdf4-dev.yml b/ci/requirements-py27-netcdf4-dev.yml
index 4ce193a2a82..85c5dcbf34d 100644
--- a/ci/requirements-py27-netcdf4-dev.yml
+++ b/ci/requirements-py27-netcdf4-dev.yml
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+  - conda-forge
 dependencies:
   - python=2.7
   - cython
diff --git a/ci/requirements-py27-pydap.yml b/ci/requirements-py27-pydap.yml
index e391eee514f..e8d4c3088ed 100644
--- a/ci/requirements-py27-pydap.yml
+++ b/ci/requirements-py27-pydap.yml
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+  - conda-forge
 dependencies:
   - python=2.7
   - dask
diff --git a/ci/requirements-py35.yml b/ci/requirements-py35.yml
index c6641598fca..df69d89c520 100644
--- a/ci/requirements-py35.yml
+++ b/ci/requirements-py35.yml
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+  - conda-forge
 dependencies:
   - python=3.5
   - cython
diff --git a/doc/dask.rst b/doc/dask.rst
index fc2361f5947..5c992ee7ff7 100644
--- a/doc/dask.rst
+++ b/doc/dask.rst
@@ -225,7 +225,7 @@ larger chunksizes.
 
     import os
     os.remove('example-data.nc')
-    
+
 Optimization Tips
 -----------------
diff --git a/doc/whats-new.rst b/doc/whats-new.rst
index 2a71f362478..c4211f8eb32 100644
--- a/doc/whats-new.rst
+++ b/doc/whats-new.rst
@@ -62,7 +62,7 @@ Breaking changes
   By `Guido Imperiale `_ and `Stephan Hoyer `_.
 
 - Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer
-  caches its values into memory before pickling :issue:`1128`. Instead, pickle
+  caches its values into memory before pickling (:issue:`1128`). Instead, pickle
   stores file paths and restores objects by reopening file references. This
   enables preliminary, experimental use of xarray for opening files with
   `dask.distributed `_.
@@ -227,6 +227,10 @@ Bug fixes
 - Fixed a bug with facetgrid (the ``norm`` keyword was ignored, :issue:`1159`).
   By `Fabien Maussion `_.
 
+- Resolved a concurrency bug that could cause Python to crash when
+  simultaneously reading and writing netCDF4 files with dask (:issue:`1172`).
+  By `Stephan Hoyer `_.
+
 - Fix to make ``.copy()`` actually copy dask arrays, which will be relevant
   for future releases of dask in which dask arrays will be mutable
   (:issue:`1180`).
diff --git a/xarray/backends/api.py b/xarray/backends/api.py
index bc2afa4b373..c69fc63acec 100644
--- a/xarray/backends/api.py
+++ b/xarray/backends/api.py
@@ -3,7 +3,6 @@ from __future__ import print_function
 import gzip
 import os.path
-import threading
 from distutils.version import StrictVersion
 from glob import glob
 from io import BytesIO
@@ -12,7 +11,7 @@ import numpy as np
 
 from .. import backends, conventions
-from .common import ArrayWriter
+from .common import ArrayWriter, GLOBAL_LOCK
 from ..core import indexing
 from ..core.combine import auto_combine
 from ..core.utils import close_on_error, is_remote_uri
@@ -55,9 +54,6 @@ def _normalize_path(path):
         return os.path.abspath(os.path.expanduser(path))
 
 
-_global_lock = threading.Lock()
-
-
 def _default_lock(filename, engine):
     if filename.endswith('.gz'):
         lock = False
@@ -71,9 +67,9 @@ def _default_lock(filename, engine):
             else:
                 # TODO: identify netcdf3 files and don't use the global lock
                 # for them
-                lock = _global_lock
+                lock = GLOBAL_LOCK
     elif engine in {'h5netcdf', 'pynio'}:
-        lock = _global_lock
+        lock = GLOBAL_LOCK
     else:
         lock = False
     return lock
diff --git a/xarray/backends/common.py b/xarray/backends/common.py
index 27291e65e3a..a7cce03a33a 100644
--- a/xarray/backends/common.py
+++ b/xarray/backends/common.py
@@ -2,7 +2,6 @@ from __future__ import division
 from __future__ import print_function
 import numpy as np
-import itertools
 import logging
 import time
 import traceback
@@ -12,7 +11,12 @@ from ..conventions import cf_encoder
 from ..core.utils import FrozenOrderedDict
-from ..core.pycompat import iteritems, dask_array_type, OrderedDict
+from ..core.pycompat import iteritems, dask_array_type
+
+try:
+    from dask.utils import SerializableLock as Lock
+except ImportError:
+    from threading import Lock
 
 # Create a logger object, but don't add any handlers. Leave that to user code.
 logger = logging.getLogger(__name__)
@@ -21,6 +25,10 @@ NONE_VAR_NAME = '__values__'
 
 
+# dask.utils.SerializableLock if available, otherwise just a threading.Lock
+GLOBAL_LOCK = Lock()
+
+
 def _encode_variable_name(name):
     if name is None:
         name = NONE_VAR_NAME
@@ -150,7 +158,7 @@ def sync(self):
             import dask.array as da
             import dask
             if StrictVersion(dask.__version__) > StrictVersion('0.8.1'):
-                da.store(self.sources, self.targets, lock=threading.Lock())
+                da.store(self.sources, self.targets, lock=GLOBAL_LOCK)
             else:
                 da.store(self.sources, self.targets)
             self.sources = []
diff --git a/xarray/test/test_backends.py b/xarray/test/test_backends.py
index 8c1e00a698c..c7867dc6d3a 100644
--- a/xarray/test/test_backends.py
+++ b/xarray/test/test_backends.py
@@ -1034,20 +1034,6 @@ def preprocess(ds):
             with open_mfdataset(tmp, preprocess=preprocess) as actual:
                 self.assertDatasetIdentical(expected, actual)
 
-    def test_lock(self):
-        original = Dataset({'foo': ('x', np.random.randn(10))})
-        with create_tmp_file() as tmp:
-            original.to_netcdf(tmp, format='NETCDF3_CLASSIC')
-            with open_dataset(tmp, chunks=10) as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertIsInstance(task[-1], type(Lock()))
-            with open_mfdataset(tmp) as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertIsInstance(task[-1], type(Lock()))
-            with open_mfdataset(tmp, engine='scipy') as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertNotIsInstance(task[-1], type(Lock()))
-
     def test_save_mfdataset_roundtrip(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         datasets = [original.isel(x=slice(5)),
diff --git a/xarray/test/test_distributed.py b/xarray/test/test_distributed.py
index a807f72387a..2ab7b0e2ffc 100644
--- a/xarray/test/test_distributed.py
+++ b/xarray/test/test_distributed.py
@@ -28,9 +28,7 @@ def test_dask_distributed_integration_test(loop, engine):
     original = create_test_data()
     with create_tmp_file() as filename:
         original.to_netcdf(filename, engine=engine)
-        # TODO: should be able to serialize locks
-        restored = xr.open_dataset(filename, chunks=3, lock=False,
-                                   engine=engine)
+        restored = xr.open_dataset(filename, chunks=3, engine=engine)
         assert isinstance(restored.var1.data, da.Array)
         computed = restored.compute()
         assert_dataset_allclose(original, computed)
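
Reviewer note: the heart of this patch is replacing the module-private ``_global_lock`` (a plain ``threading.Lock``) with a shared ``GLOBAL_LOCK`` built from dask's ``SerializableLock`` when dask is installed. Because a serializable lock can be pickled along with the dask graph, ``dask.distributed`` can ship read and write tasks to workers, which is why the ``lock=False`` workaround is dropped from ``test_distributed.py``. A minimal sketch of the pattern, mirroring ``xarray/backends/common.py`` above; the ``store_with_lock`` helper is purely illustrative and not part of xarray:

# Prefer dask's picklable SerializableLock; fall back to threading.Lock when
# dask is not installed (mirrors the try/except added in backends/common.py).
try:
    from dask.utils import SerializableLock as Lock
except ImportError:
    from threading import Lock

GLOBAL_LOCK = Lock()  # one lock shared by netCDF4/HDF5 readers and writers


def store_with_lock(sources, targets):
    """Illustrative helper: write dask arrays while holding the global lock,
    the same way ArrayWriter.sync() passes it to da.store in this patch."""
    import dask.array as da
    da.store(sources, targets, lock=GLOBAL_LOCK)

Because the same lock guards both ``_default_lock()`` in ``api.py`` (reads) and ``ArrayWriter.sync()`` (writes), concurrent reads and writes of netCDF4/HDF5 files now serialize through one lock, which is the crash fix recorded in the whats-new entry (:issue:`1172`).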