Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak to to_dask_dataframe() #1667

Merged
merged 6 commits into from
Oct 31, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 55 additions & 41 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2424,7 +2424,7 @@ def apply(self, func, keep_attrs=False, args=(), **kwargs):
-------
applied : Dataset
Resulting dataset from applying ``func`` over each data variable.

Examples
--------
>>> da = xr.DataArray(np.random.randn(2, 3))
Expand All @@ -2442,7 +2442,7 @@ def apply(self, func, keep_attrs=False, args=(), **kwargs):
Dimensions without coordinates: dim_0, dim_1, x
Data variables:
foo (dim_0, dim_1) float64 0.3751 1.951 1.945 0.2948 0.711 0.3948
bar (x) float64 1.0 2.0
bar (x) float64 1.0 2.0
"""
variables = OrderedDict(
(k, maybe_wrap_array(v, func(v, *args, **kwargs)))
Expand Down Expand Up @@ -2577,63 +2577,77 @@ def from_dataframe(cls, dataframe):
obj[name] = (dims, data)
return obj

def to_dask_dataframe(self, set_index=False):
def to_dask_dataframe(self, dim_order=None, set_index=False):
"""
Convert this dataset into a dask.dataframe.DataFrame.

Both the coordinate and data variables in this dataset form
The dimensions, coordinates and data variables in this dataset form
the columns of the DataFrame.

If set_index=True, the dask DataFrame is indexed by this dataset's
coordinate. Since dask DataFrames do not support multi-indexes,
set_index only works if there is one coordinate dimension.
Arguments
---------
dim_order : list, optional
Hierarchical dimension order for the resulting dataframe. All
arrays are transposed to this order and then written out as flat
vectors in contiguous order, so the last dimension in this list
will be contiguous in the resulting DataFrame. This has a major
influence on which operations are efficient on the resulting dask
dataframe.

If provided, must include all dimensions on this dataset. By
default, dimensions are sorted alphabetically.
set_index : bool, optional
If set_index=True, the dask DataFrame is indexed by this dataset's
coordinate. Since dask DataFrames do not support multi-indexes,
set_index only works if the dataset only contains one dimension.

Returns
-------
dask.dataframe.DataFrame
"""

import dask.dataframe as dd

ordered_dims = self.dims
chunks = self.chunks

# order columns so that coordinates appear before data
columns = list(self.coords) + list(self.data_vars)

data = []
for k in columns:
v = self._variables[k]

# consider coordinate variables as well as data variables
if isinstance(v, xr.IndexVariable):
v = v.to_base_variable()
if dim_order is None:
dim_order = list(self.dims)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't seem to remember but is this always a sorted tuple/dict?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Dataset, it's a SortedKeysDict (i.e., the dimensions in alphabetical order).

elif set(dim_order) != set(self.dims):
raise ValueError(
'dim_order {} does not match the set of dimensions on this '
'Dataset: {}'.format(dim_order, list(self.dims)))

# ensure all variables span the same dimensions
v = v.set_dims(ordered_dims)
ordered_dims = OrderedDict((k, self.dims[k]) for k in dim_order)

# ensure all variables have the same chunking structure
if v.chunks != chunks:
v = v.chunk(chunks)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jmunroe was there a reason why you didn't just chunk everything here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the assumption (probably mistaken) that there was a cost to calling .chunk(chunks) on a variable that already had that chunking structure. If that assumption was not correct, then, yes, everything could just be chunked.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rechunk is nearly free if chunks are unchanged -- it actually returns the same dask array object.

columns = list(ordered_dims)
columns.extend(k for k in self.coords if k not in self.dims)
columns.extend(self.data_vars)

# reshape variable contents as a 1d array
d = v.data.reshape(-1)
data = []
for name in columns:
try:
var = self.variables[name]
except KeyError:
# dimension without a matching coordinate
values = np.arange(self.dims[name], dtype=np.int64)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we initialize this as a dask array, to avoid creating the array when it will not be used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good idea, will do

var = Variable((name,), values)

# convert to dask DataFrames
s = dd.from_array(d, columns=[k])
# IndexVariable objects have a dummy .chunk() method
if isinstance(var, IndexVariable):
var = var.to_base_variable()

data.append(s)
dask_array = var.set_dims(ordered_dims).chunk(self.chunks).data
series = dd.from_array(dask_array.reshape(-1), columns=[name])
data.append(series)

df = dd.concat(data, axis=1)

if set_index:

if len(ordered_dims) != 1:
raise ValueError(
'set_index=True only is valid for '
'for one-dimensional datasets')

# extract out first (and only) coordinate variable
coord_dim = list(ordered_dims)[0]

if coord_dim in df.columns:
df = df.set_index(coord_dim)
if len(dim_order) == 1:
(dim,) = dim_order
df = df.set_index(dim)
else:
# triggers an error about multi-indexes, even if only one
# dimension is passed
df = df.set_index(dim_order)

return df

Expand Down
46 changes: 42 additions & 4 deletions xarray/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from xarray import Variable, DataArray, Dataset
import xarray.ufuncs as xu
from xarray.core.pycompat import suppress, OrderedDict
from . import TestCase, assert_frame_equal
from . import TestCase, assert_frame_equal, raises_regex

from xarray.tests import mock

Expand Down Expand Up @@ -547,6 +547,9 @@ def test_from_dask_variable(self):
coords={'x': range(4)}, name='foo')
self.assertLazyAndIdentical(self.lazy_array, a)


class TestToDaskDataFrame(TestCase):

def test_to_dask_dataframe(self):
# Test conversion of Datasets to dask DataFrames
x = da.from_array(np.random.randn(10), chunks=4)
Expand Down Expand Up @@ -595,12 +598,24 @@ def test_to_dask_dataframe_2D(self):
index=exp_index)
# so for now, reset the index
expected = expected.reset_index(drop=False)

actual = ds.to_dask_dataframe(set_index=False)

self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

@pytest.mark.xfail(raises=NotImplementedError)
def test_to_dask_dataframe_2D_set_index(self):
# This will fail until dask implements MultiIndex support
w = da.from_array(np.random.randn(2, 3), chunks=(1, 2))
ds = Dataset({'w': (('x', 'y'), w)})
ds['x'] = ('x', np.array([0, 1], np.int64))
ds['y'] = ('y', list('abc'))

expected = ds.compute().to_dataframe()
actual = ds.to_dask_dataframe(set_index=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than using xfail above, use raises_regex to make sure we raise an error on the correct line.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning on using xfail was that it makes this test more robust. If/when dask implements MultiIndex, we'll just get an unexpected xfail rather than a failing test. NotImplementedError seems specific enough (unlike, e.g., ValueError) that I'm not concerned about grepping for the exact error message.

self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

def test_to_dask_dataframe_coordinates(self):
# Test if coordinate is also a dask array
x = da.from_array(np.random.randn(10), chunks=4)
Expand Down Expand Up @@ -634,13 +649,36 @@ def test_to_dask_dataframe_not_daskarray(self):
assert_frame_equal(expected, actual.compute())

def test_to_dask_dataframe_no_coordinate(self):
# Test if Dataset has a dimension without coordinates
x = da.from_array(np.random.randn(10), chunks=4)
ds = Dataset({'x': ('dim_0', x)})
expected = pd.DataFrame({'x': x.compute()})

expected = ds.compute().to_dataframe().reset_index()
actual = ds.to_dask_dataframe()
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

expected = ds.compute().to_dataframe()
actual = ds.to_dask_dataframe(set_index=True)
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

def test_to_dask_dataframe_dim_order(self):
values = np.array([[1, 2], [3, 4]], dtype=np.int64)
ds = Dataset({'w': (('x', 'y'), values)}).chunk(1)

expected = ds['w'].to_series().reset_index()
actual = ds.to_dask_dataframe(dim_order=['x', 'y'])
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

expected = ds['w'].T.to_series().reset_index()
actual = ds.to_dask_dataframe(dim_order=['y', 'x'])
self.assertIsInstance(actual, dd.DataFrame)
assert_frame_equal(expected, actual.compute())

with raises_regex(ValueError, 'does not match the set of dimensions'):
ds.to_dask_dataframe(dim_order=['x'])


@pytest.mark.parametrize("method", ['load', 'compute'])
def test_dask_kwargs_variable(method):
Expand Down