From 68fa35e5dba58045203f20fe77a306bf59da7480 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 21 Oct 2021 07:43:22 -0700 Subject: [PATCH 1/5] Make sure all supported aggs are handled in _tree_node_agg --- python/dask_cudf/dask_cudf/groupby.py | 73 ++++++++----------- .../dask_cudf/dask_cudf/tests/test_groupby.py | 29 +------- 2 files changed, 33 insertions(+), 69 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 0cf9d835523..6aaa6d7396e 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -19,6 +19,19 @@ import cudf +SUPPORTED_AGGS = { + "count", + "mean", + "std", + "var", + "sum", + "min", + "max", + "collect", + "first", + "last", +} + class CudfDataFrameGroupBy(DataFrameGroupBy): def __init__(self, *args, **kwargs): @@ -47,6 +60,19 @@ def __getitem__(self, key): g._meta = g._meta[key] return g + def collect(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.index, + {c: "collect" for c in self.obj.columns if c not in self.index}, + split_every=split_every, + split_out=split_out, + dropna=self.dropna, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + ) + def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -65,18 +91,6 @@ def aggregate(self, arg, split_every=None, split_out=1): return self.size() arg = _redirect_aggs(arg) - _supported = { - "count", - "mean", - "std", - "var", - "sum", - "min", - "max", - "collect", - "first", - "last", - } if ( isinstance(self.obj, DaskDataFrame) and ( @@ -86,7 +100,7 @@ def aggregate(self, arg, split_every=None, split_out=1): and all(isinstance(x, str) for x in self.index) ) ) - and _is_supported(arg, _supported) + and _is_supported(arg, SUPPORTED_AGGS) ): if isinstance(self._meta.grouping.keys, cudf.MultiIndex): keys = self._meta.grouping.keys.names @@ -134,23 +148,10 @@ def aggregate(self, arg, split_every=None, split_out=1): return self.size() arg = _redirect_aggs(arg) - _supported = { - "count", - "mean", - "std", - "var", - "sum", - "min", - "max", - "collect", - "first", - "last", - } - if ( isinstance(self.obj, DaskDataFrame) and isinstance(self.index, (str, list)) - and _is_supported({self._slice: arg}, _supported) + and _is_supported({self._slice: arg}, SUPPORTED_AGGS) ): return groupby_agg( self.obj, @@ -201,21 +202,9 @@ def groupby_agg( """ # Assert that aggregations are supported aggs = _redirect_aggs(aggs_in) - _supported = { - "count", - "mean", - "std", - "var", - "sum", - "min", - "max", - "collect", - "first", - "last", - } - if not _is_supported(aggs, _supported): + if not _is_supported(aggs, SUPPORTED_AGGS): raise ValueError( - f"Supported aggs include {_supported} for groupby_agg API. " + f"Supported aggs include {SUPPORTED_AGGS} for groupby_agg API. " f"Aggregations must be specified with dict or list syntax." ) @@ -478,7 +467,7 @@ def _tree_node_agg(dfs, gb_cols, split_out, dropna, sort, sep): agg = col.split(sep)[-1] if agg in ("count", "sum"): agg_dict[col] = ["sum"] - elif agg in ("min", "max", "collect"): + elif agg in SUPPORTED_AGGS: agg_dict[col] = [agg] else: raise ValueError(f"Unexpected aggregation: {agg}") diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 6569ffa94c5..da0d669cc5e 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -11,10 +11,10 @@ from cudf.core._compat import PANDAS_GE_120 import dask_cudf -from dask_cudf.groupby import _is_supported +from dask_cudf.groupby import SUPPORTED_AGGS, _is_supported -@pytest.mark.parametrize("aggregation", ["sum", "mean", "count", "min", "max"]) +@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) def test_groupby_basic_aggs(aggregation): pdf = pd.DataFrame( { @@ -117,31 +117,6 @@ def test_groupby_std(func): dd.assert_eq(a, b) -@pytest.mark.parametrize( - "func", - [ - lambda df: df.groupby("x").agg({"y": "collect"}), - lambda df: df.groupby("x").y.agg("collect"), - ], -) -def test_groupby_collect(func): - pdf = pd.DataFrame( - { - "x": np.random.randint(0, 5, size=10000), - "y": np.random.normal(size=10000), - } - ) - - gdf = cudf.DataFrame.from_pandas(pdf) - - ddf = dask_cudf.from_cudf(gdf, npartitions=5) - - a = func(gdf).to_pandas() - b = func(ddf).compute().to_pandas() - - dd.assert_eq(a, b) - - # reason gotattr in cudf @pytest.mark.parametrize( "func", From db2960d838068dbb83b7c8e640025b0fce42e981 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 21 Oct 2021 15:36:16 -0400 Subject: [PATCH 2/5] Make supported aggs a tuple to guarantee ordering --- python/dask_cudf/dask_cudf/groupby.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 6aaa6d7396e..2e65b9d831e 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -19,7 +19,7 @@ import cudf -SUPPORTED_AGGS = { +SUPPORTED_AGGS = ( "count", "mean", "std", @@ -30,7 +30,7 @@ "collect", "first", "last", -} +) class CudfDataFrameGroupBy(DataFrameGroupBy): From 6659e13ca49d2ed84c38732b98bfe6132ca27740 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 2 Nov 2021 14:28:05 -0700 Subject: [PATCH 3/5] Add tests for dask-cudf series groupby --- python/dask_cudf/dask_cudf/groupby.py | 50 ++++++++++++------- .../dask_cudf/dask_cudf/tests/test_groupby.py | 32 +++++++++++- 2 files changed, 64 insertions(+), 18 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index dfd17fbaad7..aaceec099e2 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -73,19 +73,6 @@ def collect(self, split_every=None, split_out=1): as_index=self.as_index, ) - def mean(self, split_every=None, split_out=1): - return groupby_agg( - self.obj, - self.index, - {c: "mean" for c in self.obj.columns if c not in self.index}, - split_every=split_every, - split_out=split_out, - dropna=self.dropna, - sep=self.sep, - sort=self.sort, - as_index=self.as_index, - ) - def aggregate(self, arg, split_every=None, split_out=1): if arg == "size": return self.size() @@ -130,11 +117,37 @@ def __init__(self, *args, **kwargs): self.as_index = kwargs.pop("as_index", True) super().__init__(*args, **kwargs) - def mean(self, split_every=None, split_out=1): + def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, self.index, - {self._slice: "mean"}, + {self._slice: "collect"}, + split_every=split_every, + split_out=split_out, + dropna=self.dropna, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + )[self._slice] + + def std(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.index, + {self._slice: "std"}, + split_every=split_every, + split_out=split_out, + dropna=self.dropna, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + )[self._slice] + + def var(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.index, + {self._slice: "var"}, split_every=split_every, split_out=split_out, dropna=self.dropna, @@ -148,15 +161,18 @@ def aggregate(self, arg, split_every=None, split_out=1): return self.size() arg = _redirect_aggs(arg) + if not isinstance(arg, dict): + arg = {self._slice: arg} + if ( isinstance(self.obj, DaskDataFrame) and isinstance(self.index, (str, list)) - and _is_supported({self._slice: arg}, SUPPORTED_AGGS) + and _is_supported(arg, SUPPORTED_AGGS) ): return groupby_agg( self.obj, self.index, - {self._slice: arg}, + arg, split_every=split_every, split_out=split_out, dropna=self.dropna, diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index da0d669cc5e..8a759b92800 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -15,7 +15,7 @@ @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) -def test_groupby_basic_aggs(aggregation): +def test_groupby_basic_frame(aggregation): pdf = pd.DataFrame( { "x": np.random.randint(0, 5, size=10000), @@ -44,6 +44,36 @@ def test_groupby_basic_aggs(aggregation): dd.assert_eq(a, b) +@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) +def test_groupby_basic_series(aggregation): + pdf = pd.DataFrame( + { + "x": np.random.randint(0, 5, size=10000), + "y": np.random.normal(size=10000), + } + ) + + gdf = cudf.DataFrame.from_pandas(pdf) + + ddf = dask_cudf.from_cudf(gdf, npartitions=5) + + a = getattr(gdf.groupby("x").x, aggregation)() + b = getattr(ddf.groupby("x").x, aggregation)().compute() + + if aggregation == "count": + dd.assert_eq(a, b, check_dtype=False) + else: + dd.assert_eq(a, b) + + a = gdf.groupby("x").x.agg({"x": aggregation}) + b = ddf.groupby("x").x.agg({"x": aggregation}).compute() + + if aggregation == "count": + dd.assert_eq(a, b, check_dtype=False) + else: + dd.assert_eq(a, b) + + @pytest.mark.parametrize( "func", [ From d7fed2e4331cdbe08fbcd12220c349b9496a509d Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 4 Nov 2021 11:44:34 -0700 Subject: [PATCH 4/5] Add back mean methods --- python/dask_cudf/dask_cudf/groupby.py | 30 +++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index aaceec099e2..149d98ebfb9 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -60,6 +60,19 @@ def __getitem__(self, key): g._meta = g._meta[key] return g + def mean(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.index, + {c: "mean" for c in self.obj.columns if c not in self.index}, + split_every=split_every, + split_out=split_out, + dropna=self.dropna, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + ) + def collect(self, split_every=None, split_out=1): return groupby_agg( self.obj, @@ -117,11 +130,11 @@ def __init__(self, *args, **kwargs): self.as_index = kwargs.pop("as_index", True) super().__init__(*args, **kwargs) - def collect(self, split_every=None, split_out=1): + def mean(self, split_every=None, split_out=1): return groupby_agg( self.obj, self.index, - {self._slice: "collect"}, + {self._slice: "mean"}, split_every=split_every, split_out=split_out, dropna=self.dropna, @@ -156,6 +169,19 @@ def var(self, split_every=None, split_out=1): as_index=self.as_index, )[self._slice] + def collect(self, split_every=None, split_out=1): + return groupby_agg( + self.obj, + self.index, + {self._slice: "collect"}, + split_every=split_every, + split_out=split_out, + dropna=self.dropna, + sep=self.sep, + sort=self.sort, + as_index=self.as_index, + )[self._slice] + def aggregate(self, arg, split_every=None, split_out=1): if arg == "size": return self.size() From 73bfc4dcd70da841bf6ad09b31f50595562990b6 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Tue, 9 Nov 2021 09:07:10 -0800 Subject: [PATCH 5/5] Consolidate basic frame/series groupby tests --- .../dask_cudf/dask_cudf/tests/test_groupby.py | 47 +++++-------------- 1 file changed, 11 insertions(+), 36 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 8a759b92800..fce9b773dac 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -15,7 +15,8 @@ @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) -def test_groupby_basic_frame(aggregation): +@pytest.mark.parametrize("series", [False, True]) +def test_groupby_basic(series, aggregation): pdf = pd.DataFrame( { "x": np.random.randint(0, 5, size=10000), @@ -24,49 +25,23 @@ def test_groupby_basic_frame(aggregation): ) gdf = cudf.DataFrame.from_pandas(pdf) + gdf_grouped = gdf.groupby("x") + ddf_grouped = dask_cudf.from_cudf(gdf, npartitions=5).groupby("x") - ddf = dask_cudf.from_cudf(gdf, npartitions=5) - - a = getattr(gdf.groupby("x"), aggregation)() - b = getattr(ddf.groupby("x"), aggregation)().compute() - - if aggregation == "count": - dd.assert_eq(a, b, check_dtype=False) - else: - dd.assert_eq(a, b) - - a = gdf.groupby("x").agg({"x": aggregation}) - b = ddf.groupby("x").agg({"x": aggregation}).compute() - - if aggregation == "count": - dd.assert_eq(a, b, check_dtype=False) - else: - dd.assert_eq(a, b) - - -@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS) -def test_groupby_basic_series(aggregation): - pdf = pd.DataFrame( - { - "x": np.random.randint(0, 5, size=10000), - "y": np.random.normal(size=10000), - } - ) - - gdf = cudf.DataFrame.from_pandas(pdf) - - ddf = dask_cudf.from_cudf(gdf, npartitions=5) + if series: + gdf_grouped = gdf_grouped.x + ddf_grouped = ddf_grouped.x - a = getattr(gdf.groupby("x").x, aggregation)() - b = getattr(ddf.groupby("x").x, aggregation)().compute() + a = getattr(gdf_grouped, aggregation)() + b = getattr(ddf_grouped, aggregation)().compute() if aggregation == "count": dd.assert_eq(a, b, check_dtype=False) else: dd.assert_eq(a, b) - a = gdf.groupby("x").x.agg({"x": aggregation}) - b = ddf.groupby("x").x.agg({"x": aggregation}).compute() + a = gdf_grouped.agg({"x": aggregation}) + b = ddf_grouped.agg({"x": aggregation}).compute() if aggregation == "count": dd.assert_eq(a, b, check_dtype=False)