[BUG] Groupby collect_set / unique fails with Dask #7840

Closed
beckernick opened this issue Apr 2, 2021 · 2 comments · Fixed by #8735
Labels: bug (Something isn't working), dask (Dask issue), Python (Affects Python cuDF API)


@beckernick
Member

cuDF recently implemented groupby collect_set and exposed it as `groupby.unique`. I'd like to be able to use this with Dask, as I can on the CPU. We may want to address this in Dask itself or special-case it in Dask-cuDF. It looks like we fail during a dtype check on a `ListDtype`. This complements #7812.

```python
import cudf
import dask_cudf
import pandas as pd
import dask.dataframe as dd


df = pd.DataFrame({
    "a": [0, 0, 0, 1, 1, 1],
    "b": [10, 10, 10, 7, 8, 9],
})
ddf = dd.from_pandas(df, 2)

gdf = cudf.from_pandas(df)
gddf = dask_cudf.from_cudf(gdf, 2)


print(ddf.groupby("a").b.unique().compute())  # works as expected
print(gddf.groupby("a").b.unique().compute())  # raises TypeError (traceback below)
```
a
0         [10]
1    [7, 8, 9]
Name: b, dtype: object
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-21-f8b1b8dfa717> in <module>
     16 
     17 print(ddf.groupby("a").b.unique().compute()) # works as expected
---> 18 print(gddf.groupby("a").b.unique().compute())

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    281         dask.base.compute
    282         """
--> 283         (result,) = compute(self, traverse=False, **kwargs)
    284         return result
    285 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    563         postcomputes.append(x.__dask_postcompute__())
    564 
--> 565     results = schedule(dsk, keys, **kwargs)
    566     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    567 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
    526     """
    527     kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 528     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    529 
    530 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    470             # Seed initial tasks into the thread pool
    471             while state["ready"] and len(state["running"]) < num_workers:
--> 472                 fire_task()
    473 
    474             # Main loop, wait on tasks to finish, insert new ones

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/local.py in fire_task()
    465                         pack_exception,
    466                     ),
--> 467                     callback=queue.put,
    468                 )
    469 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
    515 def apply_sync(func, args=(), kwds={}, callback=None):
    516     """ A naive synchronous version of apply_async """
--> 517     res = func(*args, **kwds)
    518     if callback is not None:
    519         callback(res)

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    225         failed = False
    226     except BaseException as e:
--> 227         result = pack_exception(e, dumps)
    228         failed = True
    229     return key, result, failed

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/utils.py in apply(func, args, kwargs)
     33 def apply(func, args, kwargs=None):
     34     if kwargs:
---> 35         return func(*args, **kwargs)
     36     else:
     37         return func(*args)

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/dataframe/groupby.py in _apply_chunk(df, dropna, observed, *index, **kwargs)
    301         if isinstance(columns, (tuple, list, set, pd.Index)):
    302             columns = list(columns)
--> 303         return func(g[columns], **kwargs)
    304 
    305 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/dask/utils.py in __call__(self, obj, *args, **kwargs)
    899 
    900     def __call__(self, obj, *args, **kwargs):
--> 901         return getattr(obj, self.method)(*args, **kwargs)
    902 
    903     def __reduce__(self):

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in _agg_func_name_with_args(self, func_name, *args, **kwargs)
    279 
    280         func.__name__ = func_name
--> 281         return self.agg(func)
    282 
    283     def _normalize_aggs(self, aggs):

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/groupby/groupby.py in agg(self, func)
    786         if len(result._data):
    787             if result.shape[1] == 1 and not pd.api.types.is_list_like(func):
--> 788                 return result.iloc[:, 0]
    789 
    790         # drop the first level if we have a multiindex

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/indexing.py in __getitem__(self, arg)
    214             if not isinstance(arg, tuple):
    215                 arg = (arg, slice(None))
--> 216             return self._getitem_tuple_arg(arg)
    217 
    218     def __setitem__(self, key, value):

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/contextlib.py in inner(*args, **kwds)
     72         def inner(*args, **kwds):
     73             with self._recreate_cm():
---> 74                 return func(*args, **kwds)
     75         return inner
     76 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/indexing.py in _getitem_tuple_arg(self, arg)
    468         # Iloc Step 4:
    469         # Downcast
--> 470         if self._can_downcast_to_series(df, arg):
    471             return self._downcast_to_series(df, arg)
    472 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/indexing.py in _can_downcast_to_series(self, df, arg)
    250             dtypes = df.dtypes.values.tolist()
    251             all_numeric = all(
--> 252                 [pd.api.types.is_numeric_dtype(t) for t in dtypes]
    253             )
    254             if all_numeric:

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/cudf/core/indexing.py in <listcomp>(.0)
    250             dtypes = df.dtypes.values.tolist()
    251             all_numeric = all(
--> 252                 [pd.api.types.is_numeric_dtype(t) for t in dtypes]
    253             )
    254             if all_numeric:

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/pandas/core/dtypes/common.py in is_numeric_dtype(arr_or_dtype)
   1270     """
   1271     return _is_dtype_type(
-> 1272         arr_or_dtype, classes_and_not_datetimelike(np.number, np.bool_)
   1273     )
   1274 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/pandas/core/dtypes/common.py in _is_dtype_type(arr_or_dtype, condition)
   1649         return False
   1650 
-> 1651     return condition(tipo)
   1652 
   1653 

/raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331/lib/python3.7/site-packages/pandas/core/dtypes/common.py in <lambda>(tipo)
    194     """
    195     return lambda tipo: (
--> 196         issubclass(tipo, klasses)
    197         and not issubclass(tipo, (np.datetime64, np.timedelta64))
    198     )

TypeError: issubclass() arg 1 must be a class
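
The final frames show where this breaks: pandas' `is_numeric_dtype` ends in `issubclass(tipo, klasses)`, and `issubclass()` raises `TypeError` whenever its first argument is not a class, which is evidently the case for the value pandas derives from the cuDF list dtype here. The stdlib-only sketch below reproduces that failure mode using a hypothetical stand-in object (`ListLikeDtype`) instead of the real `cudf.ListDtype`, alongside a hypothetical guarded check (`guarded_condition`). It only illustrates the error; it is not the fix that was eventually merged.

```python
from typing import Any, Tuple, Type


def unguarded_condition(tipo: Any, klasses: Tuple[Type, ...]) -> bool:
    # Same shape as the failing pandas lambda: issubclass() requires
    # its first argument to be a class and raises TypeError otherwise.
    return issubclass(tipo, klasses)


def guarded_condition(tipo: Any, klasses: Tuple[Type, ...]) -> bool:
    # Hypothetical defensive variant: anything that is not a class is
    # reported as "not a match" instead of raising.
    return isinstance(tipo, type) and issubclass(tipo, klasses)


class ListLikeDtype:
    """Hypothetical stand-in for a non-NumPy dtype-like object."""


numeric = (int, float)
dtype_instance = ListLikeDtype()  # an *instance*, not a class

try:
    unguarded_condition(dtype_instance, numeric)
except TypeError as exc:
    error_message = str(exc)

print(error_message)                               # issubclass() arg 1 must be a class
print(guarded_condition(dtype_instance, numeric))  # False
print(guarded_condition(int, numeric))             # True
```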
!conda list | grep "rapids\|dask\|numpy\|cupy\|arrow\|pandas"
# packages in environment at /raid/nicholasb/miniconda3/envs/rapids-gpubdb-20210331:
arrow-cpp                 1.0.1           py37h2318771_14_cuda    conda-forge
arrow-cpp-proc            3.0.0                      cuda    conda-forge
cudf                      0.19.0a210331   cuda_10.2_py37_gc99fcef41b_313    rapidsai-nightly
cuml                      0.19.0a210331   cuda10.2_py37_g83168076c_138    rapidsai-nightly
cupy                      8.6.0            py37h7fc54ca_0    conda-forge
dask                      2021.3.1           pyhd8ed1ab_0    conda-forge
dask-core                 2021.3.1           pyhd8ed1ab_0    conda-forge
dask-cuda                 0.19.0a210331           py37_45    rapidsai-nightly
dask-cudf                 0.19.0a210331   py37_gc99fcef41b_313    rapidsai-nightly
libcudf                   0.19.0a210331   cuda10.2_gbe2f0c000f_314    rapidsai-nightly
libcuml                   0.19.0a210331   cuda10.2_g83168076c_138    rapidsai-nightly
libcumlprims              0.19.0a210316   cuda10.2_ge7e82a0_12    rapidsai-nightly
librmm                    0.19.0a210331   cuda10.2_g9d1ba02_50    rapidsai-nightly
numpy                     1.19.5           py37haa41c4c_1    conda-forge
pandas                    1.2.3            py37hdc94413_0    conda-forge
pyarrow                   1.0.1           py37hbeecfa9_14_cuda    conda-forge
rmm                       0.19.0a210331   cuda_10.2_py37_g9d1ba02_50    rapidsai-nightly
ucx                       1.9.0+gcd9efd3       cuda10.2_0    rapidsai-nightly
ucx-proc                  1.0.0                       gpu    rapidsai-nightly
ucx-py                    0.19.0a210331   py37_gcd9efd3_46    rapidsai-nightly
beckernick added the "bug" and "Needs Triage" labels on Apr 2, 2021
beckernick changed the title from "[BUG]" to "[BUG] Groupby collect_set / unique fails with Dask" on Apr 2, 2021
beckernick added the "Python" and "dask" labels and removed the "Needs Triage" label on Apr 2, 2021
galipremsagar self-assigned this on Apr 2, 2021
@galipremsagar
Contributor

So we would need some code changes (both in dask-cudf and in upstream Dask) to generalize the handling of the unique groupby aggregation, but these are currently blocked by the following libcudf bug: #7611
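
To illustrate why the unique aggregation needs dedicated handling on the Dask side: a distributed groupby-unique is typically split into a per-partition step and a combine step (the traceback above shows Dask's per-partition `_apply_chunk` call), and on the GPU the per-partition result holds a list-typed column. The pure-Python sketch below shows that two-step pattern on the reproducer's data, using a hypothetical split into two partitions and hypothetical helper names (`chunk_unique`, `combine_unique`); it is not Dask's or dask-cudf's actual implementation.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple

Row = Tuple[Hashable, Hashable]  # (group key, value)


def chunk_unique(partition: Iterable[Row]) -> Dict[Hashable, List[Hashable]]:
    """Per-partition step: distinct values per key, in order of first appearance."""
    seen: Dict[Hashable, Dict[Hashable, None]] = defaultdict(dict)
    for key, value in partition:
        seen[key][value] = None  # dict keys act as an insertion-ordered set
    return {key: list(values) for key, values in seen.items()}


def combine_unique(
    partials: Iterable[Dict[Hashable, List[Hashable]]]
) -> Dict[Hashable, List[Hashable]]:
    """Combine step: union the per-partition lists for each key,
    skipping values an earlier partition already contributed."""
    merged: Dict[Hashable, Dict[Hashable, None]] = defaultdict(dict)
    for partial in partials:
        for key, values in partial.items():
            for value in values:
                merged[key][value] = None
    # Sort by group key, like a default (sort=True) groupby result index.
    return {key: list(values) for key, values in sorted(merged.items())}


# Same data as the reproducer, split into two hypothetical partitions so
# that group 1 spans both of them.
partitions = [
    [(0, 10), (0, 10), (0, 10), (1, 7)],
    [(1, 8), (1, 9)],
]
result = combine_unique(chunk_unique(p) for p in partitions)
print(result)  # {0: [10], 1: [7, 8, 9]}
```

The combined result matches the CPU output in the report (`0 -> [10]`, `1 -> [7, 8, 9]`); the point is that the intermediate per-partition values are themselves lists, so every step has to handle list-typed data.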

galipremsagar added the "0 - Blocked" label on Apr 6, 2021
@galipremsagar
Contributor

cc: @mythrocks, would we be able to get #7611 addressed as part of 0.20?

kkraus14 removed the "0 - Blocked" label on May 24, 2021
rapids-bot bot pushed a commit that referenced this issue Jul 20, 2021
Fixes: #7840
Dependent on: dask/dask#7892

This PR introduces the ability to construct a `list` Series by passing a sequence of array-like objects to `cudf.Series`.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Charles Blackmon-Luca (https://github.com/charlesbluca)
  - Ashwin Srinath (https://github.com/shwina)

URL: #8735
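
For context on what this PR enables: a `list` column stores each row as a slice of one flattened child column, addressed by an offsets array (the Apache Arrow variable-size list layout, which libcudf list columns also follow). The stdlib-only sketch below shows how a sequence of mixed array-like rows, shaped like the per-group unique results in the report, maps onto that layout; `to_offsets_and_values` is a hypothetical helper, not cuDF's internal code.

```python
from typing import Iterable, List, Sequence, Tuple


def to_offsets_and_values(rows: Iterable[Sequence[int]]) -> Tuple[List[int], List[int]]:
    """Flatten array-like rows into an Arrow-style list layout:
    row i spans values[offsets[i]:offsets[i + 1]]."""
    offsets = [0]
    values: List[int] = []
    for row in rows:
        values.extend(row)           # any iterable row works: list, tuple, range, ...
        offsets.append(len(values))  # end of this row is the start of the next
    return offsets, values


# Rows given as different array-like types; the last one is empty.
offsets, values = to_offsets_and_values([[10], (7, 8, 9), range(0)])
print(offsets)  # [0, 1, 4, 4]
print(values)   # [10, 7, 8, 9]

# Reading each row back out of the flat layout.
rows_back = [values[offsets[i]:offsets[i + 1]] for i in range(len(offsets) - 1)]
print(rows_back)  # [[10], [7, 8, 9], []]
```

An empty row costs nothing in the child column; it is represented only by two equal consecutive offsets.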