fix distributed writes #1793

Merged Mar 10, 2018 · 40 commits (changes shown from 27 commits)

Commits
63abe7f
distributed tests that write dask arrays
Dec 17, 2017
1952173
Change zarr test to synchronous API
mrocklin Dec 19, 2017
dd4bfcf
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Dec 21, 2017
5c7f94c
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 11, 2018
9e70a3a
initial go at __setitem__ on array wrappers
Jan 11, 2018
ec67a54
fixes for scipy
Jan 12, 2018
2a4faa4
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 24, 2018
5497ad1
cleanup after merging with upstream/master
Jan 25, 2018
c2f5bb8
needless duplication of tests to work around pytest bug
Jan 25, 2018
5344fe8
use netcdf_variable instead of get_array()
Jan 25, 2018
7cbd2e5
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 28, 2018
49366bf
use synchronous dask.distributed test harness
mrocklin Feb 2, 2018
323f17b
Merge branch 'master' into feature/distributed_writes
Feb 2, 2018
81f7610
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 2, 2018
199538e
cleanup tests
Feb 2, 2018
d2050e7
per scheduler locks and autoclose behavior for writes
Feb 3, 2018
76675de
HDF5_LOCK and CombinedLock
Feb 6, 2018
9ac0327
integration test for distributed locks
Feb 6, 2018
05e7d54
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 18, 2018
1672968
more tests and set isopen to false when pickling
Feb 18, 2018
a667615
Fixing style errors.
stickler-ci Feb 18, 2018
2c0a7e8
ds property on DataStorePickleMixin
Feb 19, 2018
6bcadfe
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 19, 2018
aba6bdc
stickler-ci
Feb 19, 2018
5702c67
compat fixes for other backends
Feb 19, 2018
a06b683
HDF5_USE_FILE_LOCKING = False in test_distributed
Feb 21, 2018
efe8308
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 21, 2018
6ef31aa
style fix
Feb 21, 2018
00156c3
update tests to only expect netcdf4 to work, docstrings, and some cle…
Feb 22, 2018
3dcfac5
Fixing style errors.
stickler-ci Feb 22, 2018
29edaa7
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 27, 2018
61ee5a8
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 27, 2018
91f3c6a
fix imports
Feb 27, 2018
5cb91ba
fix more import bugs
Feb 27, 2018
2b97d4f
update docs
Feb 27, 2018
2dc514f
fix for pynio
Feb 28, 2018
eff0161
cleanup locks and use pytest monkeypatch for environment variable
Feb 28, 2018
5290484
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Mar 6, 2018
c855284
fix failing test using combined lock
Mar 8, 2018
3c2ffbf
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Mar 8, 2018
2 changes: 2 additions & 0 deletions doc/whats-new.rst
@@ -148,6 +148,8 @@ Bug fixes
- Compatibility fixes to plotting module for Numpy 1.14 and Pandas 0.22
(:issue:`1813`).
By `Joe Hamman <https://github.com/jhamman>`_.
- Fixed to_netcdf when using dask distributed (:issue:`1464`).
By `Joe Hamman <https://github.com/jhamman>`_.
- Bug fix in encoding coordinates with ``{'_FillValue': None}`` in netCDF
metadata (:issue:`1865`).
By `Chris Roth <https://github.com/czr137>`_.
18 changes: 14 additions & 4 deletions xarray/backends/api.py
@@ -10,7 +10,7 @@
import numpy as np

from .. import backends, conventions, Dataset
from .common import ArrayWriter, GLOBAL_LOCK
from .common import ArrayWriter, HDF5_LOCK, CombinedLock
from ..core import indexing
from ..core.combine import auto_combine
from ..core.utils import close_on_error, is_remote_uri
@@ -66,9 +66,9 @@ def _default_lock(filename, engine):
else:
# TODO: identify netcdf3 files and don't use the global lock
# for them
lock = GLOBAL_LOCK
lock = HDF5_LOCK
elif engine in {'h5netcdf', 'pynio'}:
lock = GLOBAL_LOCK
lock = HDF5_LOCK
else:
lock = False
return lock
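
For orientation, a sketch of what the per-engine default lock means in practice (illustrative filenames; behaviour inferred from the hunk above, so treat it as an assumption rather than the full function):

    _default_lock('data.nc', 'netcdf4')   # -> HDF5_LOCK: netCDF4/HDF5 access is serialized
    _default_lock('data.grib', 'pynio')   # -> HDF5_LOCK
    _default_lock('data.nc', 'scipy')     # -> False: the scipy (netCDF3) backend takes the final else branch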
@@ -622,8 +622,18 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
# if a writer is provided, store asynchronously
sync = writer is None

# TODO Move this logic outside of this function
from .common import get_scheduler, get_scheduler_lock
scheduler = get_scheduler()
# I think we want to include the filename here to support concurrent writes
# using save_mfdataset
file_lock = get_scheduler_lock(scheduler)(path_or_file)
lock = CombinedLock([HDF5_LOCK, file_lock])
autoclose = scheduler == 'distributed'

target = path_or_file if path_or_file is not None else BytesIO()
store = store_open(target, mode, format, group, writer)
store = store_open(target, mode, format, group, writer,
autoclose=autoclose, lock=lock)

if unlimited_dims is None:
unlimited_dims = dataset.encoding.get('unlimited_dims', None)
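
For context, a minimal end-to-end sketch of the workflow this hunk is meant to support (assumes a local dask.distributed cluster and placeholder filenames; not code from this PR):

    import xarray as xr
    from dask.distributed import Client

    client = Client()                                    # get_scheduler() now reports 'distributed'
    ds = xr.open_dataset('input.nc', chunks={'time': 10})
    ds.to_netcdf('output.nc')                            # each chunk write holds CombinedLock([HDF5_LOCK, file_lock])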
119 changes: 100 additions & 19 deletions xarray/backends/common.py
@@ -8,17 +8,20 @@
import contextlib
from collections import Mapping, OrderedDict
import warnings
import multiprocessing
import threading

from ..conventions import cf_encoder
from ..core import indexing
from ..core.utils import FrozenOrderedDict, NdimSizeLenMixin
from ..core.pycompat import iteritems, dask_array_type

# Import default lock
try:
from dask.utils import SerializableLock as Lock
from dask.utils import SerializableLock
HDF5_LOCK = SerializableLock()
except ImportError:
from threading import Lock

HDF5_LOCK = threading.Lock()

# Create a logger object, but don't add any handlers. Leave that to user code.
logger = logging.getLogger(__name__)
@@ -27,8 +30,38 @@
NONE_VAR_NAME = '__values__'


# dask.utils.SerializableLock if available, otherwise just a threading.Lock
GLOBAL_LOCK = Lock()
def get_scheduler(get=None, collection=None):
try:
from dask.utils import effective_get
actual_get = effective_get(get, collection)
try:
from dask.distributed import Client
if isinstance(actual_get.__self__, Client):
return 'distributed'
except (ImportError, AttributeError):
try:
import dask.multiprocessing
if actual_get == dask.multiprocessing.get:
return 'multiprocessing'
else:
return 'threaded'
except ImportError:
return 'threaded'
except ImportError:
return None


def get_scheduler_lock(scheduler):
if scheduler == 'distributed':
from dask.distributed import Lock
return Lock
elif scheduler == 'multiprocessing':
return multiprocessing.Lock
elif scheduler == 'threaded':
from dask.utils import SerializableLock
return SerializableLock
else:
return threading.Lock
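
To make the intent concrete, here is how these two helpers compose (this mirrors the to_netcdf hunk above; assumes a dask.distributed scheduler is active and 'output.nc' is a placeholder filename):

    scheduler = get_scheduler()                              # -> 'distributed' when a Client's get is in use
    file_lock = get_scheduler_lock(scheduler)('output.nc')   # a dask.distributed.Lock keyed by the target file
    lock = CombinedLock([HDF5_LOCK, file_lock])              # handed to ArrayWriter so chunk writes are serialized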


def _encode_variable_name(name):
@@ -77,6 +110,39 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,
time.sleep(1e-3 * next_delay)


class CombinedLock(object):
"""A combination of multiple locks.

Like a locked door, a CombinedLock is locked if any of its constituent
locks are locked.
"""

def __init__(self, locks):
self.locks = locks

def acquire(self, *args):
return all(lock.acquire(*args) for lock in self.locks)

def release(self, *args):
for lock in self.locks:
lock.release(*args)

def __enter__(self):
for lock in self.locks:
lock.__enter__()

def __exit__(self, *args):
for lock in self.locks:
lock.__exit__(*args)

@property
def locked(self):
return any(lock.locked for lock in self.locks)

def __repr__(self):
return "CombinedLock(%s)" % [repr(lock) for lock in self.locks]
Review comment (Member): Nit: I think you could equivalently substitute "CombinedLock(%r)" % list(self.locks) here.
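
As a quick illustration of the intended semantics (a sketch, not from the PR): entering a CombinedLock acquires every constituent lock in order, and exiting releases them all.

    import threading

    a, b = threading.Lock(), threading.Lock()
    both = CombinedLock([a, b])
    with both:                           # acquires a, then b
        assert a.locked() and b.locked()
    # leaving the block releases both locks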



class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed):

def __array__(self, dtype=None):
@@ -85,7 +151,9 @@ def __array__(self, dtype=None):


class AbstractDataStore(Mapping):
_autoclose = False
_autoclose = None
_ds = None
_isopen = False

def __iter__(self):
return iter(self.variables)
@@ -168,7 +236,7 @@ def __exit__(self, exception_type, exception_value, traceback):


class ArrayWriter(object):
def __init__(self, lock=GLOBAL_LOCK):
def __init__(self, lock=HDF5_LOCK):
self.sources = []
self.targets = []
self.lock = lock
@@ -178,11 +246,7 @@ def add(self, source, target):
self.sources.append(source)
self.targets.append(target)
else:
try:
target[...] = source
except TypeError:
# workaround for GH: scipy/scipy#6880
target[:] = source
target[...] = source

def sync(self):
if self.sources:
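
The body of sync is collapsed in this view; it roughly amounts to the following (a sketch of how the queued writes are flushed, not the exact code):

    import dask.array as da

    def sync(sources, targets, lock):
        # sources: queued dask arrays; targets: picklable wrappers such as
        # NetCDF4ArrayWrapper; lock: the CombinedLock built in to_netcdf
        if sources:
            da.store(sources, targets, lock=lock)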
@@ -193,9 +257,9 @@


class AbstractWritableDataStore(AbstractDataStore):
def __init__(self, writer=None):
def __init__(self, writer=None, lock=HDF5_LOCK):
if writer is None:
writer = ArrayWriter()
writer = ArrayWriter(lock=lock)
self.writer = writer

def encode(self, variables, attributes):
@@ -239,6 +303,9 @@ def set_variable(self, k, v): # pragma: no cover
raise NotImplementedError

def sync(self):
if self._isopen and self._autoclose:
# datastore will be reopened during write
self.close()
self.writer.sync()

def store_dataset(self, dataset):
@@ -373,27 +440,41 @@ class DataStorePickleMixin(object):

def __getstate__(self):
state = self.__dict__.copy()
del state['ds']
del state['_ds']
del state['_isopen']
if self._mode == 'w':
# file has already been created, don't override when restoring
state['_mode'] = 'a'
return state

def __setstate__(self, state):
self.__dict__.update(state)
self.ds = self._opener(mode=self._mode)
self._ds = None
self._isopen = False

@property
def ds(self):
if self._ds is not None and self._isopen:
return self._ds
ds = self._opener(mode=self._mode)
self._isopen = True
return ds
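
A small sketch of the behaviour this buys (illustrative; assumes a NetCDF4DataStore opened from a real file):

    import pickle

    store = NetCDF4DataStore.open('input.nc')      # holds an open file handle
    restored = pickle.loads(pickle.dumps(store))   # __getstate__ drops _ds and _isopen
    restored.ds                                    # only now does self._opener(mode=self._mode) reopen the file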

@contextlib.contextmanager
def ensure_open(self, autoclose):
def ensure_open(self, autoclose=None):
"""
Helper function to make sure datasets are closed and opened
at appropriate times to avoid too many open file errors.

Use requires `autoclose=True` argument to `open_mfdataset`.
"""
if self._autoclose and not self._isopen:

if autoclose is None:
autoclose = self._autoclose
Review comment (Member Author): This could probably use some additional thinking.


if not self._isopen:
try:
self.ds = self._opener()
self._ds = self._opener()
self._isopen = True
yield
finally:
14 changes: 9 additions & 5 deletions xarray/backends/h5netcdf_.py
@@ -10,7 +10,8 @@
from ..core.utils import FrozenOrderedDict, close_on_error
from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict

from .common import WritableCFDataStore, DataStorePickleMixin, find_root
from .common import (WritableCFDataStore, DataStorePickleMixin, find_root,
HDF5_LOCK)
from .netCDF4_ import (_nc4_group, _encode_nc4_variable, _get_datatype,
_extract_nc4_variable_encoding, BaseNetCDF4Array)

@@ -64,12 +65,12 @@ class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin):
"""

def __init__(self, filename, mode='r', format=None, group=None,
writer=None, autoclose=False):
writer=None, autoclose=False, lock=HDF5_LOCK):
if format not in [None, 'NETCDF4']:
raise ValueError('invalid format for h5netcdf backend')
opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
group=group)
self.ds = opener()
self._ds = opener()
if autoclose:
raise NotImplementedError('autoclose=True is not implemented '
'for the h5netcdf backend pending '
@@ -81,7 +82,7 @@ def __init__(self, filename, mode='r', format=None, group=None,
self._opener = opener
self._filename = filename
self._mode = mode
super(H5NetCDFStore, self).__init__(writer)
super(H5NetCDFStore, self).__init__(writer, lock=lock)

def open_store_variable(self, name, var):
with self.ensure_open(autoclose=False):
@@ -173,7 +174,10 @@ def prepare_variable(self, name, variable, check_encoding=False,

for k, v in iteritems(attrs):
nc4_var.setncattr(k, v)
return nc4_var, variable.data

target = H5NetCDFArrayWrapper(name, self)

return target, variable.data

def sync(self):
with self.ensure_open(autoclose=True):
21 changes: 14 additions & 7 deletions xarray/backends/netCDF4_.py
@@ -16,7 +16,7 @@
from ..core.pycompat import iteritems, basestring, OrderedDict, PY3, suppress

from .common import (WritableCFDataStore, robust_getitem, BackendArray,
DataStorePickleMixin, find_root)
DataStorePickleMixin, find_root, HDF5_LOCK)
from .netcdf3 import (encode_nc3_attr_value, encode_nc3_variable)

# This lookup table maps from dtype.byteorder to a readable endian
@@ -43,6 +43,11 @@ def __init__(self, variable_name, datastore):
dtype = np.dtype('O')
self.dtype = dtype

def __setitem__(self, key, value):
with self.datastore.ensure_open(autoclose=True):
data = self.get_array()
data[key] = value

Review comment (Member Author): @shoyer, is this what you were describing in #1464 (comment)?
Review comment (Member): Yes, this looks right to me.
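
To illustrate why the wrapper needs __setitem__ (a sketch, not from the PR): dask.array.store assigns each chunk into its target with plain item assignment, and ensure_open(autoclose=True) lets that happen on a worker that has to reopen the file first.

    # hypothetical names: a 'temperature' variable in an already-constructed datastore
    wrapper = NetCDF4ArrayWrapper('temperature', datastore)
    wrapper[0:10, :] = chunk_values   # reopens the file if needed, writes the slab, closes it again if it did the reopening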

def get_array(self):
self.datastore.assert_open()
return self.datastore.ds.variables[self.variable_name]
@@ -231,14 +236,14 @@ class NetCDF4DataStore(WritableCFDataStore, DataStorePickleMixin):
"""

def __init__(self, netcdf4_dataset, mode='r', writer=None, opener=None,
autoclose=False):
autoclose=False, lock=HDF5_LOCK):

if autoclose and opener is None:
raise ValueError('autoclose requires an opener')

_disable_auto_decode_group(netcdf4_dataset)

self.ds = netcdf4_dataset
self._ds = netcdf4_dataset
self._autoclose = autoclose
self._isopen = True
self.format = self.ds.data_model
@@ -249,12 +254,12 @@ def __init__(self, netcdf4_dataset, mode='r', writer=None, opener=None,
self._opener = functools.partial(opener, mode=self._mode)
else:
self._opener = opener
super(NetCDF4DataStore, self).__init__(writer)
super(NetCDF4DataStore, self).__init__(writer, lock=lock)

@classmethod
def open(cls, filename, mode='r', format='NETCDF4', group=None,
writer=None, clobber=True, diskless=False, persist=False,
autoclose=False):
autoclose=False, lock=HDF5_LOCK):
import netCDF4 as nc4
if (len(filename) == 88 and
LooseVersion(nc4.__version__) < "1.3.1"):
@@ -274,7 +279,7 @@ def open(cls, filename, mode='r', format='NETCDF4', group=None,
format=format)
ds = opener()
return cls(ds, mode=mode, writer=writer, opener=opener,
autoclose=autoclose)
autoclose=autoclose, lock=lock)

def open_store_variable(self, name, var):
with self.ensure_open(autoclose=False):
@@ -398,7 +403,9 @@ def prepare_variable(self, name, variable, check_encoding=False,
# OrderedDict as the input to setncatts
nc4_var.setncattr(k, v)

return nc4_var, variable.data
target = NetCDF4ArrayWrapper(name, self)

return target, variable.data
Review comment (Member): @jhamman This is too late, but I think nc4_var is never used. Is it correct?
Review comment (Member Author): @fujiisoup - it is used in line 405 (nc4_var.setncattr(k, v)).


def sync(self):
with self.ensure_open(autoclose=True):