Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Explode Series with Dask-cuDF #8872

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions python/cudf/cudf/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -6034,6 +6034,18 @@ def explode(self, ignore_index=False):
3 5
dtype: int64
"""
if is_struct_dtype(self._column.dtype):
cols = [key for key in self.dtype.fields]
results = []
for row in self.to_arrow():
row_results = [str(row[col]) for col in cols]
results.append(row_results)

out = cudf.DataFrame(results, columns=cols)
Comment on lines +6040 to +6044
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the structure of the out you want here? The device to host transfer + nested loop may be quite slow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea was that it would essentially look the same as the struct.explode() functionality that's already been implemented, so like:
image

Copy link
Member

@beckernick beckernick Jul 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may be able to accomplish that without going to the CPU:

import cudfs = cudf.Series([
    {"a":5, "b":10},
    {"a":3, "b":7},
    {"a":-3, "b":11},
])
​
results = []
for key in s.dtype.fields:
    results.append(s.struct.field(key))
​
out = cudf.concat(results, axis=1)
out.columns = s.dtype.fields
print(out)
   a   b
0  5  10
1  3   7
2 -3   11

Separately, we may want to think more broadly about what we want the behavior to be for functionality that "kind of" exists in pandas. Pandas doesn't have struct columns, but does allow exploding of an object column containing dictionaries. However, the explode does not behave like exploding a struct column in Hive, Spark, etc. Instead, it behaves like exploding a list column (which it doesn't technically have either), where every element becomes a new row in a single column. This is a traditional, rather than a lateral, explode.

Because of that, we might need to special case an explode operator in dask-cuDF anyway, rather than rely on Dask to appropriately delegate to the cuDF explode from the existing one in Dask.DataFrame.

For now, I'd suggest we consider holding off on series.explode() natively doing a "lateral explode" for struct columns, and instead building the lateral view explode functionality as dask_series.struct.explode() once we land #8658

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @shwina @VibhuJawa (as they might disagree)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, I don't think we want the Series.explode() to support this just yet (unless Pandas does so). dask_series.struct.explode() can work off of series.struct.explode().

Copy link
Contributor Author

@sarahyurick sarahyurick Jul 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in Pandas, you just get something like:

import pandas as pd

s = pd.Series([{"a": 1, "b": "x"},
                {"a": 2, "b": "y"},
                {"a": 3, "b": "z"},
                {"a": 4, "b": "a"}])
s.explode()
0    a
0    b
1    a
1    b
2    a
2    b
3    a
3    b
dtype: object

Copy link
Member

@beckernick beckernick Jul 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the example 👍 .

This is the "traditional explode" mentioned above and what we do with lists. The "lateral explode" is particularly common for structs, as the field names often map to actual features.

We may want to support this kind of traditional explode for structs, but in general this would be less common

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good - I'll close this PR then, as it has a different functionality than the desired dask_series.struct.explode()

for col in cols:
out[col] = out[col].astype(self.dtype.fields[col])
return out

if not is_list_dtype(self._column.dtype):
data = self._data.copy(deep=True)
idx = None if ignore_index else self._index.copy(deep=True)
Expand Down
24 changes: 24 additions & 0 deletions python/cudf/cudf/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import pandas as pd
import pytest

import dask_cudf

import cudf
from cudf.testing._utils import (
DATETIME_TYPES,
Expand Down Expand Up @@ -1230,3 +1232,25 @@ def test_explode(data, ignore_index, p_index):
def test_nested_series_from_sequence_data(data, expected):
actual = cudf.Series(data)
assert_eq(actual, expected)


@pytest.mark.parametrize(
"data, npartitions",
[
(
[
{"a": 1, "b": "x"},
{"a": 2, "b": "y"},
{"a": 3, "b": "z"},
{"a": 4, "b": "a"},
],
2,
)
],
)
def test_dask_explode(data, npartitions):
s = cudf.Series(data)
assert_eq(s.struct.explode(), s.explode())

sd = dask_cudf.from_cudf(s, npartitions=npartitions)
assert_eq(s.explode(), sd.compute().explode())