From ed0bf082af3afedf90b2da6ac1c0a47fbb110bb1 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sat, 28 Oct 2017 10:30:42 -0700 Subject: [PATCH 1/5] Tweak to to_dask_dataframe() - Add a `dim_order` argument - Always write columns for each dimension - Docstring to NumPy format --- xarray/core/dataset.py | 95 ++++++++++++++++++++++----------------- xarray/tests/test_dask.py | 45 +++++++++++++++++-- 2 files changed, 95 insertions(+), 45 deletions(-) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 0f9aa4c8229..a6a9060845a 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -2424,7 +2424,7 @@ def apply(self, func, keep_attrs=False, args=(), **kwargs): ------- applied : Dataset Resulting dataset from applying ``func`` over each data variable. - + Examples -------- >>> da = xr.DataArray(np.random.randn(2, 3)) @@ -2442,7 +2442,7 @@ def apply(self, func, keep_attrs=False, args=(), **kwargs): Dimensions without coordinates: dim_0, dim_1, x Data variables: foo (dim_0, dim_1) float64 0.3751 1.951 1.945 0.2948 0.711 0.3948 - bar (x) float64 1.0 2.0 + bar (x) float64 1.0 2.0 """ variables = OrderedDict( (k, maybe_wrap_array(v, func(v, *args, **kwargs))) @@ -2577,63 +2577,76 @@ def from_dataframe(cls, dataframe): obj[name] = (dims, data) return obj - def to_dask_dataframe(self, set_index=False): + def to_dask_dataframe(self, dim_order=None, set_index=False): """ Convert this dataset into a dask.dataframe.DataFrame. - Both the coordinate and data variables in this dataset form + The dimensions, coordinates and data variables in this dataset form the columns of the DataFrame. - If set_index=True, the dask DataFrame is indexed by this dataset's - coordinate. Since dask DataFrames to not support multi-indexes, - set_index only works if there is one coordinate dimension. + Parameters + ---------- + dim_order : list, optional + Hierarchical dimension order for the resulting dataframe. 
All + arrays are transposed to this order and then written out as flat + vectors in contiguous order, so the last dimension in this list + will be contiguous in the resulting DataFrame. This has a major + influence on which operations are efficient on the resulting dask + dataframe. + + If provided, must include all dimensions on this dataset. By + default, dimensions are sorted alphabetically. + set_index : bool, optional + If set_index=True, the dask DataFrame is indexed by this dataset's + coordinate. Since dask DataFrames do not support multi-indexes, + set_index only works if the dataset only contains one dimension. + + Returns + ------- + dask.dataframe.DataFrame """ import dask.dataframe as dd - ordered_dims = self.dims - chunks = self.chunks - - # order columns so that coordinates appear before data - columns = list(self.coords) + list(self.data_vars) - - data = [] - for k in columns: - v = self._variables[k] - - # consider coordinate variables as well as data varibles - if isinstance(v, xr.IndexVariable): - v = v.to_base_variable() + if dim_order is None: + dim_order = list(self.dims) + elif set(dim_order) != set(self.dims): + raise ValueError( + 'dim_order {} does not match the set of dimensions on this ' + 'Dataset: {}'.format(dim_order, list(self.dims))) - # ensure all variables span the same dimensions - v = v.set_dims(ordered_dims) + ordered_dims = OrderedDict((k, self.dims[k]) for k in dim_order) - # ensure all variables have the same chunking structure - if v.chunks != chunks: - v = v.chunk(chunks) + columns = list(ordered_dims) + columns.extend(k for k in self.coords if k not in self.dims) + columns.extend(self.data_vars) - # reshape variable contents as a 1d array - d = v.data.reshape(-1) + data = [] + for name in columns: + try: + var = self.variables[name] + except KeyError: + # dimension without a matching coordinate + var = Variable((name,), np.arange(self.dims[name])) - # convert to dask DataFrames - s = dd.from_array(d, columns=[k]) + # 
IndexVariable objects have a dummy .chunk() method + if isinstance(var, IndexVariable): + var = var.to_base_variable() - data.append(s) + dask_array = var.set_dims(ordered_dims).chunk(self.chunks).data + series = dd.from_array(dask_array.reshape(-1), columns=[name]) + data.append(series) df = dd.concat(data, axis=1) if set_index: - - if len(ordered_dims) != 1: - raise ValueError( - 'set_index=True only is valid for ' - 'for one-dimensional datasets') - - # extract out first (and only) coordinate variable - coord_dim = list(ordered_dims)[0] - - if coord_dim in df.columns: - df = df.set_index(coord_dim) + if len(dim_order) == 1: + (dim,) = dim_order + df = df.set_index(dim) + else: + # triggers an error about multi-indexes, even if only one + # dimension is passed + df = df.set_index(dim_order) return df diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index 98b0da26ae4..ca9188b6e14 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -14,7 +14,7 @@ from xarray import Variable, DataArray, Dataset import xarray.ufuncs as xu from xarray.core.pycompat import suppress, OrderedDict -from . import TestCase, assert_frame_equal +from . 
import TestCase, assert_frame_equal, raises_regex from xarray.tests import mock @@ -547,6 +547,9 @@ def test_from_dask_variable(self): coords={'x': range(4)}, name='foo') self.assertLazyAndIdentical(self.lazy_array, a) + +class TestToDaskDataFrame(TestCase): + def test_to_dask_dataframe(self): # Test conversion of Datasets to dask DataFrames x = da.from_array(np.random.randn(10), chunks=4) @@ -595,12 +598,24 @@ def test_to_dask_dataframe_2D(self): index=exp_index) # so for now, reset the index expected = expected.reset_index(drop=False) - actual = ds.to_dask_dataframe(set_index=False) self.assertIsInstance(actual, dd.DataFrame) assert_frame_equal(expected, actual.compute()) + @pytest.mark.xfail(raises=NotImplementedError) + def test_to_dask_dataframe_2D_set_index(self): + # This will fail until dask implements MultiIndex support + w = da.from_array(np.random.randn(2, 3), chunks=(1, 2)) + ds = Dataset({'w': (('x', 'y'), w)}) + ds['x'] = ('x', np.array([0, 1], np.int64)) + ds['y'] = ('y', list('abc')) + + expected = ds.compute().to_dataframe() + actual = ds.to_dask_dataframe(set_index=True) + self.assertIsInstance(actual, dd.DataFrame) + assert_frame_equal(expected, actual.compute()) + def test_to_dask_dataframe_coordinates(self): # Test if coordinate is also a dask array x = da.from_array(np.random.randn(10), chunks=4) @@ -634,13 +649,35 @@ def test_to_dask_dataframe_not_daskarray(self): assert_frame_equal(expected, actual.compute()) def test_to_dask_dataframe_no_coordinate(self): - # Test if Dataset has a dimension without coordinates x = da.from_array(np.random.randn(10), chunks=4) ds = Dataset({'x': ('dim_0', x)}) - expected = pd.DataFrame({'x': x.compute()}) + + expected = ds.compute().to_dataframe().reset_index() + actual = ds.to_dask_dataframe() + self.assertIsInstance(actual, dd.DataFrame) + assert_frame_equal(expected, actual.compute()) + + expected = ds.compute().to_dataframe() actual = ds.to_dask_dataframe(set_index=True) + self.assertIsInstance(actual, 
dd.DataFrame) + assert_frame_equal(expected, actual.compute()) + + def test_to_dask_dataframe_dim_order(self): + ds = Dataset({'w': (('x', 'y'), [[1, 2], [3, 4]])}).chunk(1) + + expected = ds['w'].to_series().reset_index() + actual = ds.to_dask_dataframe(dim_order=['x', 'y']) + self.assertIsInstance(actual, dd.DataFrame) + assert_frame_equal(expected, actual.compute()) + + expected = ds['w'].T.to_series().reset_index() + actual = ds.to_dask_dataframe(dim_order=['y', 'x']) + self.assertIsInstance(actual, dd.DataFrame) assert_frame_equal(expected, actual.compute()) + with raises_regex(ValueError, 'does not match the set of dimensions'): + ds.to_dask_dataframe(dim_order=['x']) + @pytest.mark.parametrize("method", ['load', 'compute']) def test_dask_kwargs_variable(method): From 1b723fbdd565dfffabd0e82fec5948ab8e380217 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sat, 28 Oct 2017 13:35:46 -0700 Subject: [PATCH 2/5] Fix windows test failure --- xarray/tests/test_dask.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xarray/tests/test_dask.py b/xarray/tests/test_dask.py index ca9188b6e14..d1330cfcffe 100644 --- a/xarray/tests/test_dask.py +++ b/xarray/tests/test_dask.py @@ -663,7 +663,8 @@ def test_to_dask_dataframe_no_coordinate(self): assert_frame_equal(expected, actual.compute()) def test_to_dask_dataframe_dim_order(self): - ds = Dataset({'w': (('x', 'y'), [[1, 2], [3, 4]])}).chunk(1) + values = np.array([[1, 2], [3, 4]], dtype=np.int64) + ds = Dataset({'w': (('x', 'y'), values)}).chunk(1) expected = ds['w'].to_series().reset_index() actual = ds.to_dask_dataframe(dim_order=['x', 'y']) From 8ee4023bff7609b6fb0ab026af9e68f266331e02 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sat, 28 Oct 2017 13:57:29 -0700 Subject: [PATCH 3/5] More windows failure --- xarray/core/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index a6a9060845a..5df04772f5a 100644 --- 
a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -2627,7 +2627,7 @@ def to_dask_dataframe(self, dim_order=None, set_index=False): var = self.variables[name] except KeyError: # dimension without a matching coordinate - var = Variable((name,), np.arange(self.dims[name])) + var = Variable((name,), np.arange(self.dims[name], np.int64)) # IndexVariable objects have a dummy .chunk() method if isinstance(var, IndexVariable): From 922cff57628bb0db7c3d46bb1051c5cccc0dffb4 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Sun, 29 Oct 2017 11:55:52 -0700 Subject: [PATCH 4/5] Fix failing test --- xarray/core/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 5df04772f5a..203c16bc72a 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -2627,7 +2627,8 @@ def to_dask_dataframe(self, dim_order=None, set_index=False): var = self.variables[name] except KeyError: # dimension without a matching coordinate - var = Variable((name,), np.arange(self.dims[name], np.int64)) + values = np.arange(self.dims[name], dtype=np.int64) + var = Variable((name,), values) # IndexVariable objects have a dummy .chunk() method if isinstance(var, IndexVariable): From c4f166b865c875dbbb392e6b4ec21d0c41c8bf05 Mon Sep 17 00:00:00 2001 From: Stephan Hoyer Date: Mon, 30 Oct 2017 15:38:24 -0700 Subject: [PATCH 5/5] Use da.arange() inside to_dask_dataframe --- xarray/core/dataset.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/xarray/core/dataset.py b/xarray/core/dataset.py index 203c16bc72a..422e9fe38fa 100644 --- a/xarray/core/dataset.py +++ b/xarray/core/dataset.py @@ -2606,6 +2606,7 @@ def to_dask_dataframe(self, dim_order=None, set_index=False): dask.dataframe.DataFrame """ + import dask.array as da import dask.dataframe as dd if dim_order is None: @@ -2621,14 +2622,15 @@ def to_dask_dataframe(self, dim_order=None, set_index=False): columns.extend(k for k in self.coords 
if k not in self.dims) columns.extend(self.data_vars) - data = [] + series_list = [] for name in columns: try: var = self.variables[name] except KeyError: # dimension without a matching coordinate - values = np.arange(self.dims[name], dtype=np.int64) - var = Variable((name,), values) + size = self.dims[name] + data = da.arange(size, chunks=size, dtype=np.int64) + var = Variable((name,), data) # IndexVariable objects have a dummy .chunk() method if isinstance(var, IndexVariable): @@ -2636,9 +2638,9 @@ def to_dask_dataframe(self, dim_order=None, set_index=False): dask_array = var.set_dims(ordered_dims).chunk(self.chunks).data series = dd.from_array(dask_array.reshape(-1), columns=[name]) - data.append(series) + series_list.append(series) - df = dd.concat(data, axis=1) + df = dd.concat(series_list, axis=1) if set_index: if len(dim_order) == 1: