fix distributed writes #1793
Conversation
yes, see #1464 (comment)
The zarr test seems a bit different. I think your issue here is that you are trying to use the synchronous API with the async test harness. I've changed your test and pushed to your branch (hope you don't mind). Relevant docs are here: http://distributed.readthedocs.io/en/latest/develop.html#writing-tests
Async testing is nicer in many ways, but it does require you to be a bit familiar with the async/tornado API. I also suspect that operations like …
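A minimal sketch of the async style those docs describe, for context (illustrative only; the test name, dataset, and assertion are made up, not taken from this PR):

```python
import xarray as xr
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_distributed_roundtrip(c, s, a, b):
    # gen_cluster hands the coroutine a client (c), scheduler (s), and two
    # workers (a, b); inside it we await futures instead of calling
    # blocking, synchronous methods.
    ds = xr.Dataset({'x': ('t', [1, 2, 3])}).chunk({'t': 1})
    future = c.compute(ds.x.data.sum())
    result = await future
    assert result == 6
```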
```python
with self.datastore.ensure_open(autoclose=True):
    data = self.get_array()
    data[key] = value
```
@shoyer, is this what you were describing in #1464 (comment)?
Yes, this looks right to me.
I have a test failing here with a familiar message.
We saw this last week when debugging some pangeo things. Can you remind me what our solution was?
I don't know. I would want to look at the failing case locally. I can try to do this near term, no promises though :/
I've just taken another swing at this and come up empty. I'm open to ideas in the following areas:
The good news here is that only 8 tests are failing after applying the array wrapper, so I suspect we're quite close. I'm hoping @shoyer may have some ideas on (1), since I think he had implemented some scipy workarounds in the past. @mrocklin, I'm hoping you can point me in the right direction. All of these tests are reproducible locally. (BTW, I have a use case that is going to need this functionality, so I'm personally motivated to see it across the finish line.)
xarray/backends/scipy_.py
Outdated
```diff
@@ -55,6 +55,18 @@ def __getitem__(self, key):
             copy = self.datastore.ds.use_mmap
             return np.array(data, dtype=self.dtype, copy=copy)
 
+    def __setitem__(self, key, value):
+        with self.datastore.ensure_open(autoclose=True):
+            data = self.get_array()
```
This needs to be `self.datastore.ds.variables[self.variable_name]` (a `netcdf_variable` object), not `self.get_array()`, which returns the `.data` (a numpy array). You can't enlarge a numpy array, but you can enlarge a scipy netcdf variable.
(This manifests itself in writing a netCDF file with a time dimension of length 0. Xarray then crashes when attempting to decode a length-0 time variable, which is an unrelated bug.)
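Based on that suggestion, the fix presumably looks something like the sketch below (assuming the attribute names shown in the diff above):

```python
def __setitem__(self, key, value):
    with self.datastore.ensure_open(autoclose=True):
        # Assign through the scipy netcdf_variable rather than the numpy
        # array returned by get_array(): the netcdf_variable can grow
        # along the unlimited dimension, a plain ndarray cannot.
        data = self.datastore.ds.variables[self.variable_name]
        data[key] = value
```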
Thanks @shoyer - this appears to fix the scipy issue.
Kudos for pushing this forward. I don't have much help to offer, but I wanted to recognize your effort... this is hard stuff!
Has anyone successfully used …?
I can take a look at the "future not iterable" issue sometime tomorrow.
My guess is that this would be easy with a friendly storage target. I'm not sure though. cc @jakirkham, who has been active on this topic recently.
Yep, using … Our cluster uses NFS for things like one's home directory, so these are accessible across nodes. There are also other types of storage available that are a bit faster and still remain accessible across nodes, so those work pretty well.
Yes, the zarr backend here in xarray is also using …
I have definitely used the distributed scheduler with …, but I cannot recall if I ever got it to work with netCDF.
xref: #798 and dask/dask#2488, which both seem to be relevant to this discussion. I also remember @pwolfram was quite involved with the original distributed integration, so I'm pinging him to see if he is interested in this.
Looking into this a little bit, this looks like a dask-distributed bug to me. Somehow …
```python
if self._autoclose and not self._isopen:

if autoclose is None:
    autoclose = self._autoclose
```
This could probably use some additional thinking.
I've added some additional tests and cleaned up the implementation a bit. I'd like to get reviews from a few folks and hopefully get this merged later this week.
xarray/backends/api.py
Outdated
```python
# Question: Should we be dropping one of these two locks when they are
# basically the same? For instance, when using netcdf4 and dask is not
# installed, locks will be [threading.Lock(), threading.Lock()]
```
I think this is harmless, as long as these are different lock instances. On the other hand, something like `CombinedLock([lock, lock])` would never be satisfied, because you can't acquire the same (non-reentrant) lock twice.
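For illustration only (not code from this PR), the underlying issue is that a plain `threading.Lock` is not reentrant:

```python
import threading

lock = threading.Lock()
lock.acquire()

# A second acquire on the same non-reentrant lock can never succeed while
# the first is held, so a combined lock built from [lock, lock] could never
# be fully acquired.
print(lock.acquire(blocking=False))  # prints False

lock.release()
```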
xarray/backends/api.py
Outdated
```python
    # per file lock
    # Dask locks take a name argument (e.g. filename)
    locks.append(SchedulerLock(path_or_file))
except TypeError:
```
It would be less error prone to pass the name to `get_scheduler_lock` and have it return a lock instance.
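Something along these lines, perhaps (a sketch of the suggestion only; the scheduler-string argument and the fallback lock classes are assumptions, not the PR's final API):

```python
import multiprocessing
import threading


def get_scheduler_lock(scheduler, path_or_file=None):
    # Resolve the lock inside the helper so callers never have to guess
    # whether a particular lock class accepts a name argument.
    if scheduler == 'distributed':
        from dask.distributed import Lock
        return Lock(path_or_file)  # named, per-file lock on the scheduler
    elif scheduler == 'multiprocessing':
        return multiprocessing.Lock()
    else:
        return threading.Lock()
```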
xarray/backends/common.py
Outdated
```python
        return any(lock.locked for lock in self.locks)

    def __repr__(self):
        return "CombinedLock(%s)" % [repr(lock) for lock in self.locks]
```
Nit: I think you could equivalently substitute `"CombinedLock(%r)" % list(self.locks)` here.
xarray/tests/test_distributed.py
Outdated
```python
# Does this belong elsewhere?
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
```
It would be better to use a context manager or decorator on the test, something along the lines of https://stackoverflow.com/questions/2059482/python-temporarily-modify-the-current-processs-environment
Good idea. I think we can actually do this with pytest's monkeypatch.
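For example, something like this (the test name and body are placeholders):

```python
def test_write_with_distributed(monkeypatch):
    # monkeypatch restores the environment automatically when the test
    # finishes, so the HDF5 setting never leaks into other tests.
    monkeypatch.setenv("HDF5_USE_FILE_LOCKING", "FALSE")
    ...  # run the distributed write/read round trip here
```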
This was pretty heavy duty! Nice work Joe!
doc/whats-new.rst
Outdated
```diff
@@ -38,6 +38,13 @@ Documentation
 Enhancements
 ~~~~~~~~~~~~
 
+- Support for writing netCDF files from xarray datastores (scipy and netcdf4 only)
```
Should this be "to xarray datastores"?
Nevermind, I think it makes sense as is.
Maybe "Support for writing xarray datasets to netCDF files..."
It's nice to see you were able to get this to work with SciPy!
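For anyone following along, this is roughly the workflow the PR enables (a hedged sketch; the file name, chunking, and data are made up):

```python
import xarray as xr
from dask.distributed import Client

client = Client()  # the distributed scheduler becomes the default

ds = xr.Dataset({'x': (('t',), [1.0, 2.0, 3.0])}).chunk({'t': 1})
ds.to_netcdf('out.nc', engine='netcdf4')  # the scipy engine works as well

client.close()
```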
```diff
@@ -356,6 +356,8 @@ def prepare_variable(self, name, variable, check_encoding=False,
 
         fill_value = _ensure_valid_fill_value(attrs.pop('_FillValue', None),
                                               dtype)
+        if variable.encoding == {'_FillValue': None} and fill_value is None:
+            variable.encoding = {}
```
Does this fix a specific issue?
IIRC, this crept in from #1869.
Could this fix #1955?
```python
import pytest

dask = pytest.importorskip('dask')  # isort:skip
distributed = pytest.importorskip('distributed')  # isort:skip

from dask import array
from dask.distributed import Client, Lock
```
Everything seems fine to me. What is `dask._config.globals['get']`? Is it the `get` method of a client? To debug, you might consider giving each of your clients a name with `Client(s['address'], name='test-foo') as client` and then seeing which one isn't getting cleaned up. You can also try `distributed.client.default_client()`. We clean things up in the `__exit__()` call though, so as long as you're using context managers or `@gen_cluster`, everything should work fine.
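A small illustration of that debugging suggestion (not from the PR; the in-process client and assertion are just for demonstration):

```python
from dask.distributed import Client
from distributed.client import default_client

# Name each client distinctively so a leaked one is easy to identify.
with Client(processes=False, name='test-foo') as client:
    assert default_client() is client  # the active client registers itself

# After a clean __exit__ no default client remains; a client that leaked
# would still be returned by default_client() here.
```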
""" | ||
|
||
def __init__(self, locks): | ||
self.locks = tuple(set(locks)) # remove duplicates |
Previous test failures were having trouble in `__enter__` when iterating over a `set` of locks; casting to a list/tuple seems to have resolved that.
Huh. I wonder if non-deterministic ordering of set iteration (e.g., after serialization/deserialization) contributed to that.
All the tests are passing here. I would appreciate another round of reviews. @shoyer - all of your previous comments have been addressed.
This all looks good to me now.
Nice work tracking down everything that could go wrong here!
""" | ||
|
||
def __init__(self, locks): | ||
self.locks = tuple(set(locks)) # remove duplicates |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Huh. I would if non-deterministic ordering of set iteration (e.g., after serialization/unserialization) contributed to that.
Any final comments on this? If not, I'll probably merge this in the next day or two.
```diff
-        return nc4_var, variable.data
+        target = NetCDF4ArrayWrapper(name, self)
+
+        return target, variable.data
```
@jhamman This is too late, but I think `nc4_var` is never used. Is that correct?
@fujiisoup - it is used on line 405 (`nc4_var.setncattr(k, v)`).
- Passes `git diff upstream/master **/*py | flake8 --diff`
- `whats-new.rst` for all changes and `api.rst` for new API

Right now, I've just modified the dask distributed integration tests so we can all see the failing tests.
I'm happy to push this further, but I thought I'd see if either @shoyer or @mrocklin have an idea where to start?