Skip to content

Commit

Permalink
Tweak to to_dask_dataframe() (#1667)
Browse files Browse the repository at this point in the history
* Tweak to to_dask_dataframe()

- Add a `dim_order` argument
- Always write columns for each dimension
- Docstring to NumPy format

* Fix windows test failure

* More windows failure

* Fix failing test

* Use da.arange() inside to_dask_dataframe
  • Loading branch information
shoyer authored and Joe Hamman committed Oct 31, 2017
1 parent 20f9ffd commit 7e9193c
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 45 deletions.
100 changes: 58 additions & 42 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,7 @@ def apply(self, func, keep_attrs=False, args=(), **kwargs):
-------
applied : Dataset
Resulting dataset from applying ``func`` over each data variable.
Examples
--------
>>> da = xr.DataArray(np.random.randn(2, 3))
Expand All @@ -2442,7 +2442,7 @@ def apply(self, func, keep_attrs=False, args=(), **kwargs):
Dimensions without coordinates: dim_0, dim_1, x
Data variables:
foo (dim_0, dim_1) float64 0.3751 1.951 1.945 0.2948 0.711 0.3948
bar (x) float64 1.0 2.0
bar (x) float64 1.0 2.0
"""
variables = OrderedDict(
(k, maybe_wrap_array(v, func(v, *args, **kwargs)))
Expand Down Expand Up @@ -2577,63 +2577,79 @@ def from_dataframe(cls, dataframe):
obj[name] = (dims, data)
return obj

def to_dask_dataframe(self, set_index=False):
def to_dask_dataframe(self, dim_order=None, set_index=False):
"""
Convert this dataset into a dask.dataframe.DataFrame.
Both the coordinate and data variables in this dataset form
The dimensions, coordinates and data variables in this dataset form
the columns of the DataFrame.
If set_index=True, the dask DataFrame is indexed by this dataset's
coordinate. Since dask DataFrames to not support multi-indexes,
set_index only works if there is one coordinate dimension.
Arguments
---------
dim_order : list, optional
Hierarchical dimension order for the resulting dataframe. All
arrays are transposed to this order and then written out as flat
vectors in contiguous order, so the last dimension in this list
will be contiguous in the resulting DataFrame. This has a major
influence on which operations are efficient on the resulting dask
dataframe.
If provided, must include all dimensions on this dataset. By
default, dimensions are sorted alphabetically.
set_index : bool, optional
If set_index=True, the dask DataFrame is indexed by this dataset's
coordinate. Since dask DataFrames to not support multi-indexes,
set_index only works if the dataset only contains one dimension.
Returns
-------
dask.dataframe.DataFrame
"""

import dask.array as da
import dask.dataframe as dd

ordered_dims = self.dims
chunks = self.chunks

# order columns so that coordinates appear before data
columns = list(self.coords) + list(self.data_vars)

data = []
for k in columns:
v = self._variables[k]

# consider coordinate variables as well as data varibles
if isinstance(v, xr.IndexVariable):
v = v.to_base_variable()
if dim_order is None:
dim_order = list(self.dims)
elif set(dim_order) != set(self.dims):
raise ValueError(
'dim_order {} does not match the set of dimensions on this '
'Dataset: {}'.format(dim_order, list(self.dims)))

# ensure all variables span the same dimensions
v = v.set_dims(ordered_dims)
ordered_dims = OrderedDict((k, self.dims[k]) for k in dim_order)

# ensure all variables have the same chunking structure
if v.chunks != chunks:
v = v.chunk(chunks)
columns = list(ordered_dims)
columns.extend(k for k in self.coords if k not in self.dims)
columns.extend(self.data_vars)

# reshape variable contents as a 1d array
d = v.data.reshape(-1)
series_list = []
for name in columns:
try:
var = self.variables[name]
except KeyError:
# dimension without a matching coordinate
size = self.dims[name]
data = da.arange(size, chunks=size, dtype=np.int64)
var = Variable((name,), data)

# convert to dask DataFrames
s = dd.from_array(d, columns=[k])
# IndexVariable objects have a dummy .chunk() method
if isinstance(var, IndexVariable):
var = var.to_base_variable()

data.append(s)
dask_array = var.set_dims(ordered_dims).chunk(self.chunks).data
series = dd.from_array(dask_array.reshape(-1), columns=[name])
series_list.append(series)

df = dd.concat(data, axis=1)
df = dd.concat(series_list, axis=1)

if set_index:

if len(ordered_dims) != 1:
raise ValueError(
'set_index=True only is valid for '
'for one-dimensional datasets')

# extract out first (and only) coordinate variable
coord_dim = list(ordered_dims)[0]

if coord_dim in df.columns:
df = df.set_index(coord_dim)
if len(dim_order) == 1:
(dim,) = dim_order
df = df.set_index(dim)
else:
# triggers an error about multi-indexes, even if only one
# dimension is passed
df = df.set_index(dim_order)

return df

Expand Down
44 changes: 41 additions & 3 deletions xarray/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,9 @@ def test_from_dask_variable(self):
coords={'x': range(4)}, name='foo')
self.assertLazyAndIdentical(self.lazy_array, a)


class TestToDaskDataFrame(TestCase):

def test_to_dask_dataframe(self):
# Test conversion of Datasets to dask DataFrames
x = da.from_array(np.random.randn(10), chunks=4)
Expand Down Expand Up @@ -595,12 +598,24 @@ def test_to_dask_dataframe_2D(self):
index=exp_index)
# so for now, reset the index
expected = expected.reset_index(drop=False)

actual = ds.to_dask_dataframe(set_index=False)

self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

@pytest.mark.xfail(raises=NotImplementedError)
def test_to_dask_dataframe_2D_set_index(self):
# This will fail until dask implements MultiIndex support
w = da.from_array(np.random.randn(2, 3), chunks=(1, 2))
ds = Dataset({'w': (('x', 'y'), w)})
ds['x'] = ('x', np.array([0, 1], np.int64))
ds['y'] = ('y', list('abc'))

expected = ds.compute().to_dataframe()
actual = ds.to_dask_dataframe(set_index=True)
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

def test_to_dask_dataframe_coordinates(self):
# Test if coordinate is also a dask array
x = da.from_array(np.random.randn(10), chunks=4)
Expand Down Expand Up @@ -634,13 +649,36 @@ def test_to_dask_dataframe_not_daskarray(self):
assert_frame_equal(expected, actual.compute())

def test_to_dask_dataframe_no_coordinate(self):
# Test if Dataset has a dimension without coordinates
x = da.from_array(np.random.randn(10), chunks=4)
ds = Dataset({'x': ('dim_0', x)})
expected = pd.DataFrame({'x': x.compute()})

expected = ds.compute().to_dataframe().reset_index()
actual = ds.to_dask_dataframe()
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

expected = ds.compute().to_dataframe()
actual = ds.to_dask_dataframe(set_index=True)
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

def test_to_dask_dataframe_dim_order(self):
values = np.array([[1, 2], [3, 4]], dtype=np.int64)
ds = Dataset({'w': (('x', 'y'), values)}).chunk(1)

expected = ds['w'].to_series().reset_index()
actual = ds.to_dask_dataframe(dim_order=['x', 'y'])
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

expected = ds['w'].T.to_series().reset_index()
actual = ds.to_dask_dataframe(dim_order=['y', 'x'])
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

with raises_regex(ValueError, 'does not match the set of dimensions'):
ds.to_dask_dataframe(dim_order=['x'])


@pytest.mark.parametrize("method", ['load', 'compute'])
def test_dask_kwargs_variable(method):
Expand Down

0 comments on commit 7e9193c

Please sign in to comment.