Switch to shared Lock (SerializableLock if possible) for reading/writing #1179

Merged · 6 commits · Jan 4, 2017
ci/requirements-py27-netcdf4-dev.yml (2 additions, 0 deletions)

```diff
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+  - conda-forge
 dependencies:
   - python=2.7
   - cython
```

ci/requirements-py27-pydap.yml (2 additions, 0 deletions)

```diff
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+  - conda-forge
 dependencies:
   - python=2.7
   - dask
```

ci/requirements-py35.yml (2 additions, 0 deletions)

```diff
@@ -1,4 +1,6 @@
 name: test_env
+channels:
+  - conda-forge
 dependencies:
   - python=3.5
   - cython
```

doc/dask.rst (1 addition, 1 deletion; only context lines were captured)

```diff
@@ -225,7 +225,7 @@ larger chunksizes.
 
     import os
     os.remove('example-data.nc')
 
 Optimization Tips
 -----------------
 
```

doc/whats-new.rst (5 additions, 1 deletion)

```diff
@@ -62,7 +62,7 @@ Breaking changes
   By `Guido Imperiale <https://github.com/crusaderky>`_ and
   `Stephan Hoyer <https://github.com/shoyer>`_.
 - Pickling a ``Dataset`` or ``DataArray`` linked to a file on disk no longer
-  caches its values into memory before pickling :issue:`1128`. Instead, pickle
+  caches its values into memory before pickling (:issue:`1128`). Instead, pickle
   stores file paths and restores objects by reopening file references. This
   enables preliminary, experimental use of xarray for opening files with
   `dask.distributed <https://distributed.readthedocs.io>`_.
@@ -227,6 +227,10 @@ Bug fixes
 - Fixed a bug with facetgrid (the ``norm`` keyword was ignored, :issue:`1159`).
   By `Fabien Maussion <https://github.com/fmaussion>`_.
 
+- Resolved a concurrency bug that could cause Python to crash when
+  simultaneously reading and writing netCDF4 files with dask (:issue:`1172`).
+  By `Stephan Hoyer <https://github.com/shoyer>`_.
+
 - Fix to make ``.copy()`` actually copy dask arrays, which will be relevant for
   future releases of dask in which dask arrays will be mutable (:issue:`1180`).
```

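The whats-new entry above is the behavior that motivates a picklable lock: file-backed objects are restored by reopening file references, so everything embedded in the dask graph, locks included, must survive serialization. A minimal sketch of that behavior, assuming a pre-existing netCDF file at the illustrative path `example-data.nc` with a hypothetical variable `foo`:

```python
import pickle
import xarray as xr

# Open lazily with dask chunks; no values are loaded into memory.
ds = xr.open_dataset('example-data.nc', chunks={'x': 10})

# Pickle stores the file path instead of caching values; unpickling
# restores the dataset by reopening the file reference.
restored = pickle.loads(pickle.dumps(ds))
print(restored['foo'].mean().compute())  # 'foo' is a hypothetical variable
```
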
xarray/backends/api.py (3 additions, 7 deletions)

```diff
@@ -3,7 +3,6 @@
 from __future__ import print_function
 import gzip
 import os.path
-import threading
 from distutils.version import StrictVersion
 from glob import glob
 from io import BytesIO
@@ -12,7 +11,7 @@
 import numpy as np
 
 from .. import backends, conventions
-from .common import ArrayWriter
+from .common import ArrayWriter, GLOBAL_LOCK
 from ..core import indexing
 from ..core.combine import auto_combine
 from ..core.utils import close_on_error, is_remote_uri
@@ -55,9 +54,6 @@ def _normalize_path(path):
     return os.path.abspath(os.path.expanduser(path))
 
 
-_global_lock = threading.Lock()
-
-
 def _default_lock(filename, engine):
     if filename.endswith('.gz'):
         lock = False
@@ -71,9 +67,9 @@ def _default_lock(filename, engine):
             else:
                 # TODO: identify netcdf3 files and don't use the global lock
                 # for them
-                lock = _global_lock
+                lock = GLOBAL_LOCK
         elif engine in {'h5netcdf', 'pynio'}:
-            lock = _global_lock
+            lock = GLOBAL_LOCK
         else:
             lock = False
     return lock
```

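For context, a hedged sketch of how the reworked default surfaces in the public API: ``_default_lock`` hands back ``GLOBAL_LOCK`` for engines backed by non-thread-safe libraries (netCDF4 on local files, h5netcdf, pynio) and ``False`` otherwise, while the existing ``lock`` keyword of ``open_dataset`` still overrides it. The file path is illustrative:

```python
import xarray as xr
from xarray.backends.common import GLOBAL_LOCK

# Engine-based default: the shared (serializable, if dask is new enough) lock.
ds = xr.open_dataset('example-data.nc', engine='netcdf4', chunks={'x': 10})

# Explicit override, e.g. disabling locking for a thread-safe backend,
# or passing the shared lock by hand:
ds_unlocked = xr.open_dataset('example-data.nc', chunks={'x': 10}, lock=False)
ds_locked = xr.open_dataset('example-data.nc', chunks={'x': 10},
                            lock=GLOBAL_LOCK)
```
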
xarray/backends/common.py (11 additions, 3 deletions)

```diff
@@ -2,7 +2,6 @@
 from __future__ import division
 from __future__ import print_function
 import numpy as np
-import itertools
 import logging
 import time
 import traceback
@@ -12,7 +11,12 @@
 
 from ..conventions import cf_encoder
 from ..core.utils import FrozenOrderedDict
-from ..core.pycompat import iteritems, dask_array_type, OrderedDict
+from ..core.pycompat import iteritems, dask_array_type
+
+try:
+    from dask.utils import SerializableLock as Lock
+except ImportError:
+    from threading import Lock
 
 # Create a logger object, but don't add any handlers. Leave that to user code.
 logger = logging.getLogger(__name__)
@@ -21,6 +25,10 @@
 NONE_VAR_NAME = '__values__'
 
 
+# dask.utils.SerializableLock if available, otherwise just a threading.Lock
+GLOBAL_LOCK = Lock()
+
+
 def _encode_variable_name(name):
     if name is None:
         name = NONE_VAR_NAME
@@ -150,7 +158,7 @@ def sync(self):
         import dask.array as da
         import dask
         if StrictVersion(dask.__version__) > StrictVersion('0.8.1'):
-            da.store(self.sources, self.targets, lock=threading.Lock())
+            da.store(self.sources, self.targets, lock=GLOBAL_LOCK)
         else:
             da.store(self.sources, self.targets)
         self.sources = []
```

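The try/except import above is the heart of the change: ``dask.utils.SerializableLock`` behaves like ``threading.Lock`` within a process, but it can also be pickled, so a dask graph that embeds ``GLOBAL_LOCK`` can be shipped to distributed workers. A small sketch of that property:

```python
import pickle
from dask.utils import SerializableLock

lock = SerializableLock()
with lock:
    pass  # ordinary in-process mutual exclusion, like threading.Lock

# threading.Lock raises TypeError on pickle; SerializableLock round-trips,
# and copies carrying the same token guard the same underlying lock.
clone = pickle.loads(pickle.dumps(lock))
assert clone.token == lock.token
```
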
xarray/test/test_backends.py (0 additions, 14 deletions)

```diff
@@ -1034,20 +1034,6 @@ def preprocess(ds):
         with open_mfdataset(tmp, preprocess=preprocess) as actual:
             self.assertDatasetIdentical(expected, actual)
 
-    def test_lock(self):
-        original = Dataset({'foo': ('x', np.random.randn(10))})
-        with create_tmp_file() as tmp:
-            original.to_netcdf(tmp, format='NETCDF3_CLASSIC')
-            with open_dataset(tmp, chunks=10) as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertIsInstance(task[-1], type(Lock()))
-            with open_mfdataset(tmp) as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertIsInstance(task[-1], type(Lock()))
-            with open_mfdataset(tmp, engine='scipy') as ds:
-                task = ds.foo.data.dask[ds.foo.data.name, 0]
-                self.assertNotIsInstance(task[-1], type(Lock()))
-
     def test_save_mfdataset_roundtrip(self):
         original = Dataset({'foo': ('x', np.random.randn(10))})
         datasets = [original.isel(x=slice(5)),
```

xarray/test/test_distributed.py (1 addition, 3 deletions)

```diff
@@ -28,9 +28,7 @@ def test_dask_distributed_integration_test(loop, engine):
     original = create_test_data()
     with create_tmp_file() as filename:
         original.to_netcdf(filename, engine=engine)
-        # TODO: should be able to serialize locks
-        restored = xr.open_dataset(filename, chunks=3, lock=False,
-                                   engine=engine)
+        restored = xr.open_dataset(filename, chunks=3, engine=engine)
         assert isinstance(restored.var1.data, da.Array)
         computed = restored.compute()
         assert_dataset_allclose(original, computed)
```

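Finally, a hedged sketch of the workflow this unlocks, mirroring the integration test above: with a serializable default lock, datasets opened with dask chunks can be computed on a ``dask.distributed`` cluster without passing ``lock=False``. The file path is illustrative, and distributed support was still experimental at this point:

```python
import xarray as xr
from distributed import Client

client = Client()  # start a local scheduler and workers

# The read lock embedded in the dask graph now pickles cleanly, so tasks
# can be sent to remote workers as-is.
ds = xr.open_dataset('example-data.nc', chunks={'x': 3})
print(ds.mean().compute())
```
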