From 53c5199423a29854e3f9c1f5d6d3658b8ea95049 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Thu, 31 Oct 2019 09:52:11 -0600 Subject: [PATCH] __dask_tokenize__ (#3446) * Implement __dask_tokenize__ * Fix window test * Code review * Test change in IndexVariable --- doc/whats-new.rst | 20 ++++++-- xarray/core/dataarray.py | 3 ++ xarray/core/dataset.py | 3 ++ xarray/core/variable.py | 9 ++++ xarray/tests/test_dask.py | 94 +++++++++++++++++++++++++++++++++++++ xarray/tests/test_sparse.py | 22 ++++++++- 6 files changed, 146 insertions(+), 5 deletions(-) diff --git a/doc/whats-new.rst b/doc/whats-new.rst index 36ba0681ea2..47e2e58e988 100644 --- a/doc/whats-new.rst +++ b/doc/whats-new.rst @@ -21,17 +21,20 @@ v0.14.1 (unreleased) Breaking changes ~~~~~~~~~~~~~~~~ -- Minimum cftime version is now 1.0.3. By `Deepak Cherian `_. +- Broken compatibility with cftime < 1.0.3. + By `Deepak Cherian `_. .. note:: - cftime version 1.0.4 is broken (`cftime/126 `_), use version 1.0.4.2 instead. + cftime version 1.0.4 is broken + (`cftime/126 `_); + please use version 1.0.4.2 instead. - All leftover support for dates from non-standard calendars through netcdftime, the module included in versions of netCDF4 prior to 1.4 that eventually became the cftime package, has been removed in favor of relying solely on the standalone - cftime package (:pull:`3450`). By `Spencer Clark - `_. + cftime package (:pull:`3450`). + By `Spencer Clark `_. New Features ~~~~~~~~~~~~ @@ -52,6 +55,14 @@ New Features for now. Enable it with :py:meth:`xarray.set_options(display_style="html")`. (:pull:`3425`) by `Benoit Bovy `_ and `Julia Signell `_. +- Implement `dask deterministic hashing + `_ + for xarray objects. Note that xarray objects with a dask.array backend already used + deterministic hashing in previous releases; this change implements it when whole + xarray objects are embedded in a dask graph, e.g. when :meth:`DataArray.map` is + invoked. (:issue:`3378`, :pull:`3446`) + By `Deepak Cherian `_ and + `Guido Imperiale `_. Bug fixes ~~~~~~~~~ @@ -96,6 +107,7 @@ Internal Changes - Use Python 3.6 idioms throughout the codebase. (:pull:3419) By `Maximilian Roos `_ + .. _whats-new.0.14.0: v0.14.0 (14 Oct 2019) diff --git a/xarray/core/dataarray.py b/xarray/core/dataarray.py index 82be3989b27..b61f83bcb1c 100644 --- a/xarray/core/dataarray.py +++ b/xarray/core/dataarray.py @@ -752,6 +752,9 @@ def reset_coords( dataset[self.name] = self.variable return dataset + def __dask_tokenize__(self): + return (type(self), self._variable, self._coords, self._name) + def __dask_graph__(self): return self._to_temp_dataset().__dask_graph__() diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 6e94d35df40..2b89051e84e 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -651,6 +651,9 @@ def load(self, **kwargs) -> "Dataset": return self + def __dask_tokenize__(self): + return (type(self), self._variables, self._coord_names, self._attrs) + def __dask_graph__(self): graphs = {k: v.__dask_graph__() for k, v in self.variables.items()} graphs = {k: v for k, v in graphs.items() if v is not None} diff --git a/xarray/core/variable.py b/xarray/core/variable.py index b7abdc7c462..117ab85ae65 100644 --- a/xarray/core/variable.py +++ b/xarray/core/variable.py @@ -390,6 +390,11 @@ def compute(self, **kwargs): new = self.copy(deep=False) return new.load(**kwargs) + def __dask_tokenize__(self): + # Use v.data, instead of v._data, in order to cope with the wrappers + # around NetCDF and the like + return type(self), self._dims, self.data, self._attrs + def __dask_graph__(self): if isinstance(self._data, dask_array_type): return self._data.__dask_graph__() @@ -1963,6 +1968,10 @@ def __init__(self, dims, data, attrs=None, encoding=None, fastpath=False): if not isinstance(self._data, PandasIndexAdapter): self._data = PandasIndexAdapter(self._data) + def __dask_tokenize__(self): + # Don't waste time converting pd.Index to np.ndarray + return (type(self), self._dims, self._data.array, self._attrs) + def load(self): # data is already loaded into memory for IndexVariable return self diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index 50517ae3c9c..c4323d1d317 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -1,5 +1,6 @@ import operator import pickle +import sys from contextlib import suppress from distutils.version import LooseVersion from textwrap import dedent @@ -21,12 +22,16 @@ assert_frame_equal, assert_identical, raises_regex, + requires_scipy_or_netCDF4, ) +from .test_backends import create_tmp_file dask = pytest.importorskip("dask") da = pytest.importorskip("dask.array") dd = pytest.importorskip("dask.dataframe") +ON_WINDOWS = sys.platform == "win32" + class CountingScheduler: """ Simple dask scheduler counting the number of computes. @@ -1135,3 +1140,92 @@ def test_make_meta(map_ds): for variable in map_ds.data_vars: assert variable in meta.data_vars assert meta.data_vars[variable].shape == (0,) * meta.data_vars[variable].ndim + + +@pytest.mark.parametrize( + "obj", [make_da(), make_da().compute(), make_ds(), make_ds().compute()] +) +@pytest.mark.parametrize( + "transform", + [ + lambda x: x.reset_coords(), + lambda x: x.reset_coords(drop=True), + lambda x: x.isel(x=1), + lambda x: x.attrs.update(new_attrs=1), + lambda x: x.assign_coords(cxy=1), + lambda x: x.rename({"x": "xnew"}), + lambda x: x.rename({"cxy": "cxynew"}), + ], +) +def test_token_changes_on_transform(obj, transform): + with raise_if_dask_computes(): + assert dask.base.tokenize(obj) != dask.base.tokenize(transform(obj)) + + +@pytest.mark.parametrize( + "obj", [make_da(), make_da().compute(), make_ds(), make_ds().compute()] +) +def test_token_changes_when_data_changes(obj): + with raise_if_dask_computes(): + t1 = dask.base.tokenize(obj) + + # Change data_var + if isinstance(obj, DataArray): + obj *= 2 + else: + obj["a"] *= 2 + with raise_if_dask_computes(): + t2 = dask.base.tokenize(obj) + assert t2 != t1 + + # Change non-index coord + obj.coords["ndcoord"] *= 2 + with raise_if_dask_computes(): + t3 = dask.base.tokenize(obj) + assert t3 != t2 + + # Change IndexVariable + obj.coords["x"] *= 2 + with raise_if_dask_computes(): + t4 = dask.base.tokenize(obj) + assert t4 != t3 + + +@pytest.mark.parametrize("obj", [make_da().compute(), make_ds().compute()]) +def test_token_changes_when_buffer_changes(obj): + with raise_if_dask_computes(): + t1 = dask.base.tokenize(obj) + + if isinstance(obj, DataArray): + obj[0, 0] = 123 + else: + obj["a"][0, 0] = 123 + with raise_if_dask_computes(): + t2 = dask.base.tokenize(obj) + assert t2 != t1 + + obj.coords["ndcoord"][0] = 123 + with raise_if_dask_computes(): + t3 = dask.base.tokenize(obj) + assert t3 != t2 + + +@pytest.mark.parametrize( + "transform", + [lambda x: x, lambda x: x.copy(deep=False), lambda x: x.copy(deep=True)], +) +@pytest.mark.parametrize("obj", [make_da(), make_ds(), make_ds().variables["a"]]) +def test_token_identical(obj, transform): + with raise_if_dask_computes(): + assert dask.base.tokenize(obj) == dask.base.tokenize(transform(obj)) + assert dask.base.tokenize(obj.compute()) == dask.base.tokenize( + transform(obj.compute()) + ) + + +@requires_scipy_or_netCDF4 +def test_normalize_token_with_backend(map_ds): + with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as tmp_file: + map_ds.to_netcdf(tmp_file) + read = xr.open_dataset(tmp_file) + assert not dask.base.tokenize(map_ds) == dask.base.tokenize(read) diff --git a/xarray/tests/test_sparse.py b/xarray/tests/test_sparse.py index 73c4b9b8c74..8e2d4b8e064 100644 --- a/xarray/tests/test_sparse.py +++ b/xarray/tests/test_sparse.py @@ -11,7 +11,7 @@ from xarray.core.npcompat import IS_NEP18_ACTIVE from xarray.core.pycompat import sparse_array_type -from . import assert_equal, assert_identical +from . import assert_equal, assert_identical, requires_dask param = pytest.param xfail = pytest.mark.xfail @@ -849,3 +849,23 @@ def test_chunk(): dsc = ds.chunk(2) assert dsc.chunks == {"dim_0": (2, 2)} assert_identical(dsc, ds) + + +@requires_dask +def test_dask_token(): + import dask + + s = sparse.COO.from_numpy(np.array([0, 0, 1, 2])) + a = DataArray(s) + t1 = dask.base.tokenize(a) + t2 = dask.base.tokenize(a) + t3 = dask.base.tokenize(a + 1) + assert t1 == t2 + assert t3 != t2 + assert isinstance(a.data, sparse.COO) + + ac = a.chunk(2) + t4 = dask.base.tokenize(ac) + t5 = dask.base.tokenize(ac + 1) + assert t4 != t5 + assert isinstance(ac.data._meta, sparse.COO)