Make sure all dask-cudf supported aggs are handled in _tree_node_agg #9487

Merged: 6 commits, Nov 9, 2021

Changes from all commits
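In effect, the PR replaces several duplicated hard-coded aggregation whitelists with a single `SUPPORTED_AGGS` constant and makes the tree-reduction combine step (`_tree_node_agg`) accept every aggregation in it. A minimal sketch of the user-facing behavior this enables (hypothetical data; assumes a CUDA environment with cudf and dask_cudf installed):

```python
import cudf
import dask_cudf

df = cudf.DataFrame({"x": [0, 1] * 500, "y": list(range(1000))})
ddf = dask_cudf.from_cudf(df, npartitions=8)

# Before this change, the combine step only recognized
# count/sum/min/max/collect, so aggregations such as "first" and
# "last" could fail with "Unexpected aggregation" when partial
# per-partition results were merged.
out = ddf.groupby("x").agg({"y": ["first", "last", "mean"]}).compute()
print(out)
```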
115 changes: 73 additions & 42 deletions python/dask_cudf/dask_cudf/groupby.py
@@ -19,6 +19,19 @@

import cudf

+SUPPORTED_AGGS = (
+    "count",
+    "mean",
+    "std",
+    "var",
+    "sum",
+    "min",
+    "max",
+    "collect",
+    "first",
+    "last",
+)


class CudfDataFrameGroupBy(DataFrameGroupBy):
    def __init__(self, *args, **kwargs):
@@ -60,23 +73,24 @@ def mean(self, split_every=None, split_out=1):
            as_index=self.as_index,
        )

+    def collect(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.index,
+            {c: "collect" for c in self.obj.columns if c not in self.index},
+            split_every=split_every,
+            split_out=split_out,
+            dropna=self.dropna,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+        )

    def aggregate(self, arg, split_every=None, split_out=1):
        if arg == "size":
            return self.size()
        arg = _redirect_aggs(arg)

-        _supported = {
-            "count",
-            "mean",
-            "std",
-            "var",
-            "sum",
-            "min",
-            "max",
-            "collect",
-            "first",
-            "last",
-        }
        if (
            isinstance(self.obj, DaskDataFrame)
            and (
@@ -86,7 +100,7 @@ def aggregate(self, arg, split_every=None, split_out=1):
                    and all(isinstance(x, str) for x in self.index)
                )
            )
-            and _is_supported(arg, _supported)
+            and _is_supported(arg, SUPPORTED_AGGS)
        ):
            if isinstance(self._meta.grouping.keys, cudf.MultiIndex):
                keys = self._meta.grouping.keys.names
@@ -129,33 +143,62 @@ def mean(self, split_every=None, split_out=1):
            as_index=self.as_index,
        )[self._slice]

+    def std(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.index,
+            {self._slice: "std"},
+            split_every=split_every,
+            split_out=split_out,
+            dropna=self.dropna,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+        )[self._slice]

+    def var(self, split_every=None, split_out=1):
Inline review comment (PR author):
I added in overrides for var and std because it seems like trying to use upstream Dask's implementations here fails:

python/dask_cudf/dask_cudf/tests/test_groupby.py ............F
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

aggregation = 'std'

    @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
    def test_groupby_basic_series(aggregation):
        pdf = pd.DataFrame(
            {
                "x": np.random.randint(0, 5, size=10000),
                "y": np.random.normal(size=10000),
            }
        )
    
        gdf = cudf.DataFrame.from_pandas(pdf)
    
        ddf = dask_cudf.from_cudf(gdf, npartitions=5)
    
        a = getattr(gdf.groupby("x").x, aggregation)()
>       b = getattr(ddf.groupby("x").x, aggregation)().compute()

python/dask_cudf/dask_cudf/tests/test_groupby.py:61: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../dask/dask/dataframe/groupby.py:1445: in std
    v = self.var(ddof, split_every=split_every, split_out=split_out)
../dask/dask/dataframe/groupby.py:1439: in var
    result = result[self._slice]
../dask/dask/dataframe/core.py:4062: in __getitem__
    meta = self._meta[_extract_meta(key)]
../compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/contextlib.py:75: in inner
    return func(*args, **kwds)
python/cudf/cudf/core/dataframe.py:1007: in __getitem__
    return self._get_columns_by_label(arg, downcast=True)
python/cudf/cudf/core/dataframe.py:1863: in _get_columns_by_label
    new_data = super()._get_columns_by_label(labels, downcast)
python/cudf/cudf/core/frame.py:507: in _get_columns_by_label
    return self._data.select_by_label(labels)
python/cudf/cudf/core/column_accessor.py:344: in select_by_label
    return self._select_by_label_grouped(key)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = ColumnAccessor(multiindex=False, level_names=[None])
y: float64, key = 'x'

    def _select_by_label_grouped(self, key: Any) -> ColumnAccessor:
>       result = self._grouped_data[key]
E       KeyError: 'x'

python/cudf/cudf/core/column_accessor.py:406: KeyError

Instead of adding these functions individually, is it possible / would it make sense to add them programmatically based on SUPPORTED_AGGS?
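For illustration, such programmatic generation might look roughly like the sketch below (not code from this PR; `_make_series_agg` is an invented name, and a real version would decide which aggregations need bespoke handling):

```python
def _make_series_agg(agg):
    # Build a CudfSeriesGroupBy method that funnels one single-column
    # aggregation through groupby_agg, mirroring the hand-written overrides.
    def _agg_method(self, split_every=None, split_out=1):
        return groupby_agg(
            self.obj,
            self.index,
            {self._slice: agg},
            split_every=split_every,
            split_out=split_out,
            dropna=self.dropna,
            sep=self.sep,
            sort=self.sort,
            as_index=self.as_index,
        )[self._slice]

    _agg_method.__name__ = agg
    return _agg_method


# Attach one method per supported aggregation to the class
# (a real version might skip names that need custom handling).
for _agg in SUPPORTED_AGGS:
    setattr(CudfSeriesGroupBy, _agg, _make_series_agg(_agg))
```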

+        return groupby_agg(
+            self.obj,
+            self.index,
+            {self._slice: "var"},
+            split_every=split_every,
+            split_out=split_out,
+            dropna=self.dropna,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+        )[self._slice]

+    def collect(self, split_every=None, split_out=1):
+        return groupby_agg(
+            self.obj,
+            self.index,
+            {self._slice: "collect"},
+            split_every=split_every,
+            split_out=split_out,
+            dropna=self.dropna,
+            sep=self.sep,
+            sort=self.sort,
+            as_index=self.as_index,
+        )[self._slice]

    def aggregate(self, arg, split_every=None, split_out=1):
        if arg == "size":
            return self.size()
        arg = _redirect_aggs(arg)

-        _supported = {
-            "count",
-            "mean",
-            "std",
-            "var",
-            "sum",
-            "min",
-            "max",
-            "collect",
-            "first",
-            "last",
-        }
+        if not isinstance(arg, dict):
+            arg = {self._slice: arg}

        if (
            isinstance(self.obj, DaskDataFrame)
            and isinstance(self.index, (str, list))
-            and _is_supported({self._slice: arg}, _supported)
+            and _is_supported(arg, SUPPORTED_AGGS)
        ):
            return groupby_agg(
                self.obj,
                self.index,
-                {self._slice: arg},
+                arg,
                split_every=split_every,
                split_out=split_out,
                dropna=self.dropna,
@@ -201,21 +244,9 @@ def groupby_agg(
"""
# Assert that aggregations are supported
aggs = _redirect_aggs(aggs_in)
-    _supported = {
-        "count",
-        "mean",
-        "std",
-        "var",
-        "sum",
-        "min",
-        "max",
-        "collect",
-        "first",
-        "last",
-    }
-    if not _is_supported(aggs, _supported):
+    if not _is_supported(aggs, SUPPORTED_AGGS):
        raise ValueError(
-            f"Supported aggs include {_supported} for groupby_agg API. "
+            f"Supported aggs include {SUPPORTED_AGGS} for groupby_agg API. "
            f"Aggregations must be specified with dict or list syntax."
        )
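For reference, `_is_supported` (imported by the tests below) checks whether every aggregation named in a dict- or list-style spec belongs to the given set. A simplified sketch of what such a check does (assumed normalization; not the actual implementation):

```python
def _is_supported_sketch(arg, supported):
    # Flatten a list spec, or a dict spec mapping column -> agg | [aggs],
    # into a set of aggregation names.
    if isinstance(arg, list):
        aggs = set(arg)
    elif isinstance(arg, dict):
        aggs = set()
        for value in arg.values():
            aggs |= set(value) if isinstance(value, list) else {value}
    else:
        return False  # bare strings must use dict/list syntax here
    return bool(aggs) and aggs.issubset(set(supported))
```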

@@ -475,7 +506,7 @@ def _tree_node_agg(dfs, gb_cols, split_out, dropna, sort, sep):
        agg = col.split(sep)[-1]
        if agg in ("count", "sum"):
            agg_dict[col] = ["sum"]
elif agg in ("min", "max", "collect"):
elif agg in SUPPORTED_AGGS:
            agg_dict[col] = [agg]
        else:
            raise ValueError(f"Unexpected aggregation: {agg}")
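The combine step above works on partial-aggregation columns named like `f"y{sep}count"`: partial counts and sums merge by summation, while every other supported aggregation merges with itself. A rough standalone illustration of that idea (simplified; not the actual dask_cudf code path):

```python
import cudf

sep = "__"

# Two per-partition partial results, shaped like what _tree_node_agg sees.
part1 = cudf.DataFrame(
    {"x": [0, 1], f"y{sep}count": [3, 2], f"y{sep}min": [0.1, 0.5]}
)
part2 = cudf.DataFrame(
    {"x": [0, 1], f"y{sep}count": [1, 4], f"y{sep}min": [0.2, 0.3]}
)

# Partial counts combine by summing; partial mins combine by taking min.
combined = cudf.concat([part1, part2]).groupby("x").agg(
    {f"y{sep}count": "sum", f"y{sep}min": "min"}
)
print(combined)
```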
46 changes: 13 additions & 33 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
@@ -11,11 +11,12 @@
from cudf.core._compat import PANDAS_GE_120

import dask_cudf
-from dask_cudf.groupby import _is_supported
+from dask_cudf.groupby import SUPPORTED_AGGS, _is_supported


@pytest.mark.parametrize("aggregation", ["sum", "mean", "count", "min", "max"])
def test_groupby_basic_aggs(aggregation):
@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
@pytest.mark.parametrize("series", [False, True])
def test_groupby_basic(series, aggregation):
    pdf = pd.DataFrame(
        {
            "x": np.random.randint(0, 5, size=10000),
@@ -24,19 +25,23 @@ def test_groupby_basic_aggs(aggregation):
    )

    gdf = cudf.DataFrame.from_pandas(pdf)
+    gdf_grouped = gdf.groupby("x")
+    ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("x")

-    ddf = dask_cudf.from_cudf(gdf, npartitions=5)
+    if series:
+        gdf_grouped = gdf_grouped.x
+        ddf_grouped = ddf_grouped.x

-    a = getattr(gdf.groupby("x"), aggregation)()
-    b = getattr(ddf.groupby("x"), aggregation)().compute()
+    a = getattr(gdf_grouped, aggregation)()
+    b = getattr(ddf_grouped, aggregation)().compute()

if aggregation == "count":
dd.assert_eq(a, b, check_dtype=False)
else:
dd.assert_eq(a, b)

a = gdf.groupby("x").agg({"x": aggregation})
b = ddf.groupby("x").agg({"x": aggregation}).compute()
a = gdf_grouped.agg({"x": aggregation})
b = ddf_grouped.agg({"x": aggregation}).compute()

if aggregation == "count":
dd.assert_eq(a, b, check_dtype=False)
@@ -117,31 +122,6 @@ def test_groupby_std(func):
    dd.assert_eq(a, b)


-@pytest.mark.parametrize(
-    "func",
-    [
-        lambda df: df.groupby("x").agg({"y": "collect"}),
-        lambda df: df.groupby("x").y.agg("collect"),
-    ],
-)
-def test_groupby_collect(func):
-    pdf = pd.DataFrame(
-        {
-            "x": np.random.randint(0, 5, size=10000),
-            "y": np.random.normal(size=10000),
-        }
-    )
-
-    gdf = cudf.DataFrame.from_pandas(pdf)
-
-    ddf = dask_cudf.from_cudf(gdf, npartitions=5)
-
-    a = func(gdf).to_pandas()
-    b = func(ddf).compute().to_pandas()
-
-    dd.assert_eq(a, b)


# reason: getattr in cudf
@pytest.mark.parametrize(
    "func",