Make sure all dask-cudf supported aggs are handled in _tree_node_agg #9487

Merged — 6 commits merged into rapidsai:branch-21.12 on Nov 9, 2021

Conversation

charlesbluca (Member):

I noticed that for some of dask-cudf's supported aggregations (specifically `first` and `last`), we end up throwing a `ValueError` in `_tree_node_agg` because we do not have a case for them:

```python
import cudf
import dask_cudf
from dask_sql import Context

df = cudf.DataFrame({"yr": [0, 2, 3] * 500, "inches": list(range(1500))})
ddf = dask_cudf.from_cudf(df, npartitions=5)

ddf.groupby(by="yr").agg({'yr': ['first']}).compute()
```

```pytb
ValueError                                Traceback (most recent call last)
<ipython-input-1-7d2fee978fba> in <module>
      6 ddf = dask_cudf.from_cudf(df, npartitions=5)
      7
----> 8 ddf.groupby(by="yr").agg({'yr': ['first']}).compute()

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    286         dask.base.compute
    287         """
--> 288         (result,) = compute(self, traverse=False, **kwargs)
    289         return result
    290

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    568         postcomputes.append(x.__dask_postcompute__())
    569
--> 570     results = schedule(dsk, keys, **kwargs)
    571     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    572

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    561     """
    562     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 563     return get_async(
    564         synchronous_executor.submit,
    565         synchronous_executor._max_workers,

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    504             while state["waiting"] or state["ready"] or state["running"]:
    505                 fire_tasks(chunksize)
--> 506                 for key, res_info, failed in queue_get(queue).result():
    507                     if failed:
    508                         exc, tb = loads(res_info)

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    435                     raise CancelledError()
    436                 elif self._state == FINISHED:
--> 437                     return self.__get_result()
    438
    439                 self._condition.wait(timeout)

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    387         if self._exception:
    388             try:
--> 389                 raise self._exception
    390             finally:
    391                 # Break a reference cycle with the exception in self._exception

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/local.py in submit(self, fn, *args, **kwargs)
    546         fut = Future()
    547         try:
--> 548             fut.set_result(fn(*args, **kwargs))
    549         except BaseException as e:
    550             fut.set_exception(e)

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/local.py in batch_execute_tasks(it)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238
    239

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/local.py in <listcomp>(.0)
    235     Batch computing of multiple tasks with `execute_task`
    236     """
--> 237     return [execute_task(*a) for a in it]
    238
    239

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    226         failed = False
    227     except BaseException as e:
--> 228         result = pack_exception(e, dumps)
    229         failed = True
    230     return key, result, failed

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    221     try:
    222         task, data = loads(task_info)
--> 223         result = _execute_task(task, data)
    224         id = get_id()
    225         result = dumps((result, id))

~/compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/cudf/python/dask_cudf/dask_cudf/groupby.py in _tree_node_agg(dfs, gb_cols, split_out, dropna, sort, sep)
    482             agg_dict[col] = [agg]
    483         else:
--> 484             raise ValueError(f"Unexpected aggregation: {agg}")
    485
    486     gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg(

ValueError: Unexpected aggregation: first
```

This PR unifies all references to `_supported` under a module-level `SUPPORTED_AGGS`, and makes sure every aggregation in that variable is handled in `_tree_node_agg`. The variable is also imported and used in `test_groupby_basic_aggs`, so that we no longer need to remember to add new aggregations to that test manually.

Additionally, added `CudfDataFrameGroupBy.collect` to get the updated tests passing.
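A minimal, library-free sketch of the invariant this PR enforces — every name in `SUPPORTED_AGGS` must hit a branch in the tree-combine step, so only genuinely unknown aggregations raise. The combine mapping below is illustrative, not dask-cudf's exact `_tree_node_agg` logic:

```python
# Illustrative stand-in for dask-cudf's _tree_node_agg dispatch.
# The agg list mirrors the PR; the combine rules are a sketch.
SUPPORTED_AGGS = ("count", "mean", "std", "var", "sum", "min", "max",
                  "collect", "first", "last")

def tree_combine_agg(agg):
    """Return the reduction used to combine partial per-partition results."""
    if agg in ("count", "sum"):
        return "sum"          # partial counts/sums combine by summing
    elif agg in ("mean", "std", "var"):
        return "sum"          # rebuilt from partial sum/count columns
    elif agg in ("min", "max", "collect", "first", "last"):
        return agg            # these combine with themselves
    else:
        raise ValueError(f"Unexpected aggregation: {agg}")

# The bug fixed here: "first"/"last" previously fell through to the ValueError.
assert all(tree_combine_agg(a) is not None for a in SUPPORTED_AGGS)
```

Because the test suite iterates over the same `SUPPORTED_AGGS` tuple, any aggregation added to it without a matching branch would fail immediately rather than slipping through unnoticed.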

@charlesbluca charlesbluca added bug Something isn't working 3 - Ready for Review Ready for review by team dask-cudf non-breaking Non-breaking change labels Oct 21, 2021
@charlesbluca charlesbluca requested a review from a team as a code owner October 21, 2021 15:11
@github-actions github-actions bot added the Python Affects Python cuDF API. label Oct 21, 2021
codecov bot commented Oct 21, 2021:

Codecov Report

Merging #9487 (09755d6) into branch-21.12 (ab4bfaa) will decrease coverage by 0.10%.
The diff coverage is n/a.

❗ Current head 09755d6 differs from pull request most recent head 73bfc4d. Consider uploading reports for the commit 73bfc4d to get more accurate results
Impacted file tree graph

```
@@               Coverage Diff                @@
##           branch-21.12    #9487      +/-   ##
================================================
- Coverage         10.79%   10.68%   -0.11%
================================================
  Files               116      117       +1
  Lines             18869    19444     +575
================================================
+ Hits               2036     2078      +42
- Misses            16833    17366     +533
```

| Impacted Files | Coverage Δ |
| --- | --- |
| python/dask_cudf/dask_cudf/sorting.py | 92.90% <0.00%> (-1.21%) ⬇️ |
| python/cudf/cudf/io/csv.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/io/orc.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/__init__.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/io/dlpack.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/core/frame.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/core/index.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/io/parquet.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/core/series.py | 0.00% <0.00%> (ø) |
| python/cudf/cudf/core/reshape.py | 0.00% <0.00%> (ø) |
| ... and 45 more | |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 8a5adad...73bfc4d.

galipremsagar (Contributor):

Overall looks good to me, minor comments.

```python
            as_index=self.as_index,
        )[self._slice]

    def var(self, split_every=None, split_out=1):
```
charlesbluca (Member, Author):

I added overrides for `var` and `std` because trying to use upstream Dask's implementations here fails:

```pytb
python/dask_cudf/dask_cudf/tests/test_groupby.py ............F
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> traceback >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

aggregation = 'std'

    @pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
    def test_groupby_basic_series(aggregation):
        pdf = pd.DataFrame(
            {
                "x": np.random.randint(0, 5, size=10000),
                "y": np.random.normal(size=10000),
            }
        )

        gdf = cudf.DataFrame.from_pandas(pdf)

        ddf = dask_cudf.from_cudf(gdf, npartitions=5)

        a = getattr(gdf.groupby("x").x, aggregation)()
>       b = getattr(ddf.groupby("x").x, aggregation)().compute()

python/dask_cudf/dask_cudf/tests/test_groupby.py:61:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../dask/dask/dataframe/groupby.py:1445: in std
    v = self.var(ddof, split_every=split_every, split_out=split_out)
../dask/dask/dataframe/groupby.py:1439: in var
    result = result[self._slice]
../dask/dask/dataframe/core.py:4062: in __getitem__
    meta = self._meta[_extract_meta(key)]
../compose/etc/conda/cuda_11.2/envs/rapids/lib/python3.8/contextlib.py:75: in inner
    return func(*args, **kwds)
python/cudf/cudf/core/dataframe.py:1007: in __getitem__
    return self._get_columns_by_label(arg, downcast=True)
python/cudf/cudf/core/dataframe.py:1863: in _get_columns_by_label
    new_data = super()._get_columns_by_label(labels, downcast)
python/cudf/cudf/core/frame.py:507: in _get_columns_by_label
    return self._data.select_by_label(labels)
python/cudf/cudf/core/column_accessor.py:344: in select_by_label
    return self._select_by_label_grouped(key)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = ColumnAccessor(multiindex=False, level_names=[None])
y: float64, key = 'x'

    def _select_by_label_grouped(self, key: Any) -> ColumnAccessor:
>       result = self._grouped_data[key]
E       KeyError: 'x'

python/cudf/cudf/core/column_accessor.py:406: KeyError
```
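For background on why `var`/`std` need their own tree-aggregation path at all: grouped variance can be combined across partitions from per-partition (count, sum, sum-of-squares) triples. A library-free sketch of that combination (my illustration, not the dask or dask-cudf implementation; `ddof=1` matches the pandas default):

```python
def partial_stats(values):
    """Per-partition triple: (count, sum, sum of squares)."""
    return (len(values), sum(values), sum(v * v for v in values))

def combine(a, b):
    """Tree-combine two partial triples element-wise."""
    return tuple(x + y for x, y in zip(a, b))

def finalize_var(n, s, ss, ddof=1):
    """Recover the sample variance from the combined triple."""
    return (ss - s * s / n) / (n - ddof)

part1, part2 = [1.0, 2.0], [3.0, 4.0]
n, s, ss = combine(partial_stats(part1), partial_stats(part2))
# matches the single-pass sample variance of [1, 2, 3, 4], i.e. 5/3
assert abs(finalize_var(n, s, ss) - 5.0 / 3.0) < 1e-12
```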

Instead of adding these functions individually, would it be possible / make sense to add them programmatically based on `SUPPORTED_AGGS`?
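One possible shape of that idea, as a minimal library-free sketch — `GroupByProxy` and `_make_agg_method` are hypothetical names for illustration, not dask-cudf's API:

```python
# Hypothetical sketch: generate one reduction method per supported
# aggregation at class-definition time, instead of hand-writing each.
SUPPORTED_AGGS = ("sum", "min", "max", "first", "last")

class GroupByProxy:
    """Toy stand-in for a groupby object holding grouped column values."""
    def __init__(self, data):
        self.data = data  # e.g. {"col": [v0, v1, ...]}

    def agg(self, how):
        fns = {"sum": sum, "min": min, "max": max,
               "first": lambda v: v[0], "last": lambda v: v[-1]}
        return {k: fns[how](v) for k, v in self.data.items()}

def _make_agg_method(agg):
    # Factory so each generated method closes over its own agg name.
    def method(self, split_every=None, split_out=1):
        return self.agg(agg)
    method.__name__ = agg
    return method

for _agg in SUPPORTED_AGGS:
    setattr(GroupByProxy, _agg, _make_agg_method(_agg))

gb = GroupByProxy({"a": [1, 2, 3]})
assert gb.first() == {"a": 1} and gb.sum() == {"a": 6}
```

The factory-function indirection matters: defining the inner function inside a loop without it would leave every method bound to the last `agg` value.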

rjzamora (Member) left a comment:

Thanks for this @charlesbluca! Just a few questions - mostly for my own understanding/benefit :)

(3 review threads on python/dask_cudf/dask_cudf/groupby.py, resolved)
```diff
-@pytest.mark.parametrize("aggregation", ["sum", "mean", "count", "min", "max"])
-def test_groupby_basic_aggs(aggregation):
+@pytest.mark.parametrize("aggregation", SUPPORTED_AGGS)
+def test_groupby_basic_frame(aggregation):
```
rjzamora (Member):

I don't think I understand how this is a frame test now? The test looks the same as `test_groupby_basic_series`?

charlesbluca (Member, Author):

The difference is pretty subtle; in `test_groupby_basic_series` we do the aggregation on one of the columns of the frame instead of on the frame itself. Happy to make more explicit series groupby tests if needed.

rjzamora (Member):

Ah - I see now :)

It may make sense to combine the two tests and use `pytest.mark.parametrize` to cover both cases. However, this is not a blocker (I don't feel very strongly at all).

charlesbluca (Member, Author):

Good point - I consolidated the tests into one based on a `series` parameter 🙂
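A library-free sketch of the consolidated pattern: one test body, with a `series` flag toggling between column-wise and frame-wise aggregation (the real test parametrizes this flag alongside `SUPPORTED_AGGS` via `pytest.mark.parametrize`, over cudf/dask_cudf objects; the dict-based stand-ins here are mine):

```python
def groupby_sum(rows, key, value, series):
    """Toy groupby-sum over a list of dict rows, in two result shapes."""
    groups = {}
    for row in rows:
        groups.setdefault(row[key], []).append(row[value])
    if series:
        # aggregate a single column, like ddf.groupby("x").y.sum()
        return {k: sum(v) for k, v in groups.items()}
    # aggregate the frame, like ddf.groupby("x").agg({"y": "sum"})
    return {k: {value: sum(v)} for k, v in groups.items()}

rows = [{"x": 1, "y": 2}, {"x": 1, "y": 3}, {"x": 2, "y": 5}]
for series in (False, True):
    result = groupby_sum(rows, "x", "y", series)
    assert result[1] == (5 if series else {"y": 5})
```

One parametrized body keeps the two code paths (series vs. frame) from drifting apart, which was the motivation for merging the tests.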

rjzamora (Member) left a comment:

I think the (mostly) duplicated test could be simplified a bit, but these changes generally LGTM. Thanks!

charlesbluca (Member, Author):

@gpucibot merge

@rapids-bot rapids-bot bot merged commit a7d520c into rapidsai:branch-21.12 Nov 9, 2021
@charlesbluca charlesbluca deleted the dask-fix-tree-node branch July 19, 2022 14:26
@vyasr vyasr added dask Dask issue and removed dask-cudf labels Feb 23, 2024