
dask.dataframe.read_csv('./filepath/*.csv') returning tuple #7777

Closed

evanharwin opened this issue Jun 9, 2021 · 7 comments

Comments

@evanharwin

What happened:
Loading a dataframe seemingly returned a tuple rather than a dask DataFrame, as an exception was thrown:
AttributeError: 'tuple' object has no attribute 'sample'

What you expected to happen:
I expected the code below to return a pandas.DataFrame with the correlations I'm looking for!

Minimal Complete Verifiable Example:

import os

import dask.dataframe as daskdf
from dask.distributed import Client

client = Client(memory_limit='4GB', processes=False)

# Read every CSV in the input directory into a single dask DataFrame
raw_df = daskdf.read_csv(os.path.join(input_file_path, '*.csv'))
df = raw_df.sample(frac=0.01).drop(['gaugeid', 'time', 'input', 'labels'], axis=1)
correlations = df.corr().compute()

Anything else we need to know?:
The example runs fine on my local machine (Windows 10, Dask 2021.1.1, Python 3.8.5); it only fails when run in containerised compute provided by Azure.

The full traceback is here:

Traceback (most recent call last):
  File "correlation_analysis.py", line 43, in <module>
    correlations = df.corr().compute()
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/dask/base.py", line 285, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/dask/base.py", line 567, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/distributed/client.py", line 2673, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/distributed/client.py", line 1982, in gather
    return self.sync(
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/distributed/client.py", line 853, in sync
    return sync(
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/distributed/utils.py", line 354, in sync
    raise exc.with_traceback(tb)
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/distributed/utils.py", line 337, in f
    result[0] = yield future
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/distributed/client.py", line 1847, in _gather
    raise exception.with_traceback(traceback)
  File "/azureml-envs/azureml_datastore/lib/python3.8/site-packages/dask/dataframe/methods.py", line 352, in sample
    return df.sample(random_state=rs, frac=frac, replace=replace) if len(df) > 0 else df
AttributeError: 'tuple' object has no attribute 'sample'

Environment:

  • Dask version: 2021.6.0
  • Python version: 3.8.1
  • Operating System: Linux
  • Install method (conda, pip, source): conda
@jrbourbeau
Member

Thanks for reporting @evanharwin! Are you able to provide a minimal reproducer (see https://blog.dask.org/2018/02/28/minimal-bug-reports)? I'm able to generate an error (discussed below) with a similar code snippet, but not the same error you're seeing.

With the current main branches of dask and distributed, the following snippet

from dask.datasets import timeseries
from distributed import Client

if __name__ == "__main__":

    # This snippet runs successfully with `processes=True` (the default value)
    # but fails when `processes=False`, with tasks reporting
    # TypeError('cannot unpack non-iterable Serialize object')
    client = Client(processes=False)
    df = timeseries()
    result = df.sample(frac=0.01).drop(["x", "y"], axis=1).corr().compute()
    print(f"{result = }")

results in tasks failing with

distributed.worker - WARNING - Compute Failed
Function:  subgraph_callable-ac5c875a-e373-4f75-befa-b213a8ee
args:      (<Serialize: ([Timestamp('2000-01-23 00:00:00', freq='D'), Timestamp('2000-01-24 00:00:00', freq='D')], 1765816192)>)
kwargs:    {}
Exception: TypeError('cannot unpack non-iterable Serialize object')

A couple of interesting things to note:

  • This is most likely related to disabling low-level task fusion by default: if I set dask.config.set({"optimization.fuse.active": True}), the snippet runs successfully (see the sketch after this list)
  • If we use processes=True when setting up the Client (the default value for processes=) then the snippet also runs successfully
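
For reference, a minimal sketch of that configuration workaround (the same snippet as above with low-level fusion re-enabled; this uses only the config key and APIs already mentioned in this thread):

# Re-enabling low-level task fusion works around the failure on an
# in-process Client; this mirrors the snippet above.
import dask
from dask.datasets import timeseries
from distributed import Client

dask.config.set({"optimization.fuse.active": True})

client = Client(processes=False)
df = timeseries()
result = df.sample(frac=0.01).drop(["x", "y"], axis=1).corr().compute()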

cc @rjzamora for visibility

@rjzamora
Member

rjzamora commented Jun 9, 2021

Hmm - this looks like a familiar problem @madsbk and I were running into. I will investigate, but I suspect the culprit is that using processes=False means we do not actually pass through the comm layer, and so Serialize objects are never converted into Serialized objects.
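
To illustrate the suspected mechanism (a hedged sketch, not distributed's actual code path; Serialize and the helpers below live in distributed.protocol):

# With processes=True, task payloads are round-tripped through the comm
# layer roughly like this; with processes=False that round-trip never
# happens, so a Serialize wrapper can reach task code untouched.
from distributed.protocol import Serialize, serialize, deserialize

wrapped = Serialize(("some", "payload"))    # wrapper handed to the comm layer
header, frames = serialize(wrapped.data)    # encode, as transport would
roundtripped = deserialize(header, frames)  # plain object on the far side
assert roundtripped == ("some", "payload")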

@madsbk
Contributor

madsbk commented Jun 10, 2021

FYI: dask/distributed#4897 should fix this issue; however, it might take some time before the PR is merged.

@evanharwin
Author

Thanks all for your input. I worked around my issue by calculating the correlations on individual partitions of my dataset using pandas and dask.bag, then aggregating those further down the line in my algorithm without too much trouble (see the sketch below).

However, I'll keep an eye on that PR and transition to whole-dataset correlations once it is merged.
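
A minimal sketch of that per-partition workaround (using dask.delayed rather than dask.bag for brevity, and assuming the column names from the original snippet; averaging per-partition correlation matrices only approximates the whole-dataset correlation):

import dask
import dask.dataframe as daskdf

def partition_corr(pdf):
    # pdf is a plain pandas DataFrame holding one partition
    return pdf.drop(columns=['gaugeid', 'time', 'input', 'labels']).corr()

raw_df = daskdf.read_csv('./filepath/*.csv')

# One delayed pandas correlation matrix per partition, computed in parallel
delayed_corrs = [dask.delayed(partition_corr)(part) for part in raw_df.to_delayed()]
per_partition = dask.compute(*delayed_corrs)

# Naive aggregation: the element-wise mean of the partition matrices
approx_corr = sum(per_partition) / len(per_partition)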

@umonaca

umonaca commented Aug 30, 2021

I can confirm this bug exists; I solved it by simply removing the processes=False option, so I think the observation above is probably correct.
Dask version: 2021.8.0

@snowoody

I ran into a similar issue with dask.dataframe.read_csv().compute() returning a tuple instead of a pandas DataFrame.

Either setting dask.config.set({"optimization.fuse.active": True}) in the code or passing processes=True when starting the Client solves the problem.
Dask version: 2021.12.0
pandas version: 1.3.5
Python version: 3.7

@rjzamora
Member

I'd like to close this issue in favor of #8581. Although this issue has useful discussion, that bug report is a bit more focused on the underlying serialization issue (and the likely fix in distributed).
