Creating dask geodataframe from_dask_dataframe deadlocks #197

Closed
avriiil opened this issue Jun 13, 2022 · 3 comments · Fixed by #205

Comments


avriiil commented Jun 13, 2022

I'm running into issues when creating a Dask GeoDataFrame from a regular Dask DataFrame. After some debugging, it looks like the finalize task is the culprit.

The local reproducer below shows that the finalize task is created. This is not a problem when the data fits in memory.

However, when working at scale, this finalize call causes workers to run out of memory. These workers are not killed but become unresponsive, causing a virtual 'deadlock' in which the finalize task gets endlessly shipped around between workers.

Local Reproducer (does not hang because data fits in memory)

import dask.dataframe as dd
from distributed import Client

client = Client(n_workers=8)

# load some data
ddf = dd.read_csv(
    "s3://nyc-tlc/csv_backup/yellow_tripdata_2012-01.csv",
)

# subset for faster iteration
ddf = ddf.partitions[0:5]

# convert to dask geodataframe
import dask_geopandas
ddf = dask_geopandas.from_dask_dataframe(
    ddf,
    geometry=dask_geopandas.points_from_xy(ddf, "pickup_longitude", "pickup_latitude"),
)

ddf.head()

Cloud-Based Reproducer (hangs because data does not fit into memory)

import dask.dataframe as dd

ddf = dd.read_parquet(
    "s3://coiled-datasets/dask-book/nyc-tlc/2009-2013/*",
    engine="pyarrow",
    storage_options={"anon": True},
)

import dask_geopandas

ddf = dask_geopandas.from_dask_dataframe(
    ddf,
    geometry=dask_geopandas.points_from_xy(ddf, "pickup_longitude", "pickup_latitude"),
)

ddf.head()


@gjoseph92 commented

@ian-r-rose and I looked at this with @rrpelgrim. The problem is that the single finalize task is taking every single partition as input, in order to concatenate them. This means trying to move 150GB to a single worker. This should definitely not happen when you do something like head (and the output of finalize should probably not be the input to head?).

The reason this causes a deadlock (as opposed to the worker running out of memory and dying) is a bit deployment-specific: dask/distributed#6110 (comment), dask/distributed#6177. On the dask-geopandas side, we should just focus on why the graph for this head operation is creating this finalize task that takes every partition as input.

@jorisvandenbossche (Member) commented

Thanks for the investigation! Visualizing the graph for the local reproducer, I can also see the finalize bottleneck step:
[task graph image: every partition feeds into a single finalize task]

Now, under the hood, the from_dask_dataframe method is a simple call to dask's map_partitions method:

return df.map_partitions(geopandas.GeoDataFrame, geometry=geometry)

Would this be a general issue with that method?

@ian-r-rose (Contributor) commented

I took a closer look at this today, and the problem is indeed in Dask. In particular, we've run into a known issue where map_partitions treats positional arguments differently from keyword arguments. If a dataframe-like argument is passed in as a positional arg, the function is correctly mapped across the partitions. If it's passed as a keyword arg, it is computed and concatenated before being passed in as a single, non-partitioned series.

I consider this to be a bug in Dask (or at least highly surprising and undesirable behavior). Unfortunately, fixing it would be very invasive to the internal Blockwise implementation and would probably take some time. I can, however, open a PR with a reasonable workaround in dask-geopandas. Briefly: we can avoid referring to the geometry column via a GeoSeries and instead refer to it by name.

@rrpelgrim, in the very short term, I have a workaround that should hopefully unblock you until we can work out a better fix:

import dask.dataframe as dd

# load some data
ddf = dd.read_csv(
    "s3://nyc-tlc/csv_backup/yellow_tripdata_2012-01.csv",
)

# subset for faster iteration
ddf = ddf.partitions[0:5]

# convert to dask geodataframe
import dask_geopandas
# Assign the geometry column using vanilla Dask
ddf = ddf.assign(geometry=dask_geopandas.points_from_xy(ddf, "pickup_longitude", "pickup_latitude"))
# Refer to the geometry column by name
ddf = dask_geopandas.from_dask_dataframe(ddf, geometry="geometry")

ddf.head()

cc @rjzamora, who might find this real-world example of the limitation in map_partitions interesting.
