From df93b7feb9cc7ab03580e5a3249ca9c27f59a838 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 16 Apr 2021 08:15:57 -0700 Subject: [PATCH 1/6] Add naive list collect agg to dask-cudf --- python/dask_cudf/dask_cudf/groupby.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 2803212a502..c4b38bd217f 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -61,7 +61,7 @@ def aggregate(self, arg, split_every=None, split_out=1): if arg == "size": return self.size() - _supported = {"count", "mean", "std", "var", "sum", "min", "max"} + _supported = {"count", "mean", "std", "var", "sum", "min", "max", list} if ( isinstance(self.obj, DaskDataFrame) and isinstance(self.index, (str, list)) @@ -145,7 +145,7 @@ def groupby_agg( This aggregation algorithm only supports the following options: - {"count", "mean", "std", "var", "sum", "min", "max"} + {"count", "mean", "std", "var", "sum", "min", "max", list} This "optimized" approach is more performant than the algorithm in `dask.dataframe`, because it allows the cudf backend to @@ -171,7 +171,7 @@ def groupby_agg( # strings (no lists) str_cols_out = True for col in aggs: - if isinstance(aggs[col], str): + if isinstance(aggs[col], str) or callable(aggs[col]): aggs[col] = [aggs[col]] else: str_cols_out = False @@ -179,7 +179,7 @@ def groupby_agg( columns.append(col) # Assert that aggregations are supported - _supported = {"count", "mean", "std", "var", "sum", "min", "max"} + _supported = {"count", "mean", "std", "var", "sum", "min", "max", list} if not _is_supported(aggs, _supported): raise ValueError( f"Supported aggs include {_supported} for groupby_agg API. " @@ -255,7 +255,12 @@ def groupby_agg( # be str, rather than tuples). for col in aggs: _aggs[col] = _aggs[col][0] - _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs) + try: + _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs) + except NotImplementedError: + _meta = ddf._meta_nonempty.groupby(gb_cols, as_index=as_index).agg( + _aggs + ) for s in range(split_out): dsk[(gb_agg_name, s)] = ( _finalize_gb_agg, @@ -381,6 +386,8 @@ def _tree_node_agg(dfs, gb_cols, split_out, dropna, sort, sep): agg_dict[col] = ["sum"] elif agg in ("min", "max"): agg_dict[col] = [agg] + elif agg == "list": + agg_dict[col] = [list] else: raise ValueError(f"Unexpected aggregation: {agg}") From b0218b45e774d4b553538877e068b421e9bf7c24 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 23 Apr 2021 07:24:00 -0700 Subject: [PATCH 2/6] Add collect support for dask-cudf series --- python/dask_cudf/dask_cudf/groupby.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 6fb041dde6a..1445a668a47 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -107,7 +107,7 @@ def aggregate(self, arg, split_every=None, split_out=1): if arg == "size": return self.size() - _supported = {"count", "mean", "std", "var", "sum", "min", "max"} + _supported = {"count", "mean", "std", "var", "sum", "min", "max", list} if ( isinstance(self.obj, DaskDataFrame) and isinstance(self.index, (str, list)) From 6dee893cc1905e1465effb9627748fd7301958e4 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 3 May 2021 10:38:20 -0700 Subject: [PATCH 3/6] Redirect callable list agg to 'collect' --- python/dask_cudf/dask_cudf/groupby.py | 47 +++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index ea93e8c8f00..bd6aeeccaf1 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -62,7 +62,16 @@ def aggregate(self, arg, split_every=None, split_out=1): return self.size() arg = _redirect_aggs(arg) - _supported = {"count", "mean", "std", "var", "sum", "min", "max", list} + _supported = { + "count", + "mean", + "std", + "var", + "sum", + "min", + "max", + "collect", + } if ( isinstance(self.obj, DaskDataFrame) and isinstance(self.index, (str, list)) @@ -109,7 +118,16 @@ def aggregate(self, arg, split_every=None, split_out=1): return self.size() arg = _redirect_aggs(arg) - _supported = {"count", "mean", "std", "var", "sum", "min", "max", list} + _supported = { + "count", + "mean", + "std", + "var", + "sum", + "min", + "max", + "collect", + } if ( isinstance(self.obj, DaskDataFrame) and isinstance(self.index, (str, list)) @@ -147,7 +165,7 @@ def groupby_agg( This aggregation algorithm only supports the following options: - {"count", "mean", "std", "var", "sum", "min", "max", list} + {"count", "mean", "std", "var", "sum", "min", "max", "collect"} This "optimized" approach is more performant than the algorithm in `dask.dataframe`, because it allows the cudf backend to @@ -181,7 +199,16 @@ def groupby_agg( columns.append(col) # Assert that aggregations are supported - _supported = {"count", "mean", "std", "var", "sum", "min", "max", list} + _supported = { + "count", + "mean", + "std", + "var", + "sum", + "min", + "max", + "collect", + } if not _is_supported(aggs, _supported): raise ValueError( f"Supported aggs include {_supported} for groupby_agg API. " @@ -287,7 +314,13 @@ def groupby_agg( def _redirect_aggs(arg): """ Redirect aggregations to their corresponding name in cuDF """ - redirects = {sum: "sum", max: "max", min: "min"} + redirects = { + sum: "sum", + max: "max", + min: "min", + list: "collect", + "list": "collect", + } if isinstance(arg, dict): new_arg = dict() for col in arg: @@ -405,8 +438,8 @@ def _tree_node_agg(dfs, gb_cols, split_out, dropna, sort, sep): agg_dict[col] = ["sum"] elif agg in ("min", "max"): agg_dict[col] = [agg] - elif agg == "list": - agg_dict[col] = [list] + elif agg == "collect": + agg_dict[col] = ["collect"] else: raise ValueError(f"Unexpected aggregation: {agg}") From a9dc883ce4e820a41a983134f079f5ace79fbdd5 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 26 May 2021 11:11:25 -0400 Subject: [PATCH 4/6] Remove meta try/except block --- python/dask_cudf/dask_cudf/groupby.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index bd6aeeccaf1..4c917629390 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -284,12 +284,7 @@ def groupby_agg( # be str, rather than tuples). for col in aggs: _aggs[col] = _aggs[col][0] - try: - _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs) - except NotImplementedError: - _meta = ddf._meta_nonempty.groupby(gb_cols, as_index=as_index).agg( - _aggs - ) + _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs) for s in range(split_out): dsk[(gb_agg_name, s)] = ( _finalize_gb_agg, From 1c12d9c87483e48a5814baeb3a6e2820a82f31f0 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Fri, 2 Jul 2021 16:25:55 -0400 Subject: [PATCH 5/6] Flatten nested groupby collect result --- python/dask_cudf/dask_cudf/groupby.py | 3 ++ .../dask_cudf/dask_cudf/tests/test_groupby.py | 32 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 4c917629390..336fdaf009c 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -513,6 +513,9 @@ def _finalize_gb_agg( gb.drop(columns=[sum_name], inplace=True) if "count" not in agg_list: gb.drop(columns=[count_name], inplace=True) + if "collect" in agg_list: + collect_name = _make_name(col, "collect", sep=sep) + gb[collect_name] = gb[collect_name].list.concat() # Ensure sorted keys if `sort=True` if sort: diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index e3a3045dcc7..135bbe83923 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -125,6 +125,38 @@ def test_groupby_std(func): dd.assert_eq(a, b) +@pytest.mark.parametrize( + "func", + [ + lambda df: df.groupby("x").agg({"y": "collect"}), + pytest.param( + lambda df: df.groupby("x").y.agg("collect"), marks=pytest.mark.skip + ), + ], +) +def test_groupby_collect(func): + pdf = pd.DataFrame( + { + "x": np.random.randint(0, 5, size=10000), + "y": np.random.normal(size=10000), + } + ) + + gdf = cudf.DataFrame.from_pandas(pdf) + + ddf = dask_cudf.from_cudf(gdf, npartitions=5) + + a = func(gdf).to_pandas() + b = func(ddf).compute().to_pandas() + + a.index.name = None + a.name = None + b.index.name = None + b.name = None + + dd.assert_eq(a, b) + + # reason gotattr in cudf @pytest.mark.parametrize( "func", From 405b2c7d61547d9feeeade2f0754fb34bf8b572e Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 6 Jul 2021 10:13:22 -0400 Subject: [PATCH 6/6] Remove index nulling from test --- python/dask_cudf/dask_cudf/tests/test_groupby.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 135bbe83923..356567fdef0 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -149,11 +149,6 @@ def test_groupby_collect(func): a = func(gdf).to_pandas() b = func(ddf).compute().to_pandas() - a.index.name = None - a.name = None - b.index.name = None - b.name = None - dd.assert_eq(a, b)