[QST]: p2p shuffle on large datasets #7380
Comments
there is a "hidden dashboard" Otherwise, a peak at the very end could mean this is the |
The key/partition distribution can be calculated with

```python
ddf = ddf.shuffle('key', shuffle="p2p")
len_per_partition = ddf.map_partitions(len).compute()
```

which just counts the length of every partition. If you can run this without hitting an OOM, it's likely parquet that is causing you trouble. Lastly, in which "stage" are the workers dropping? Still during transfer (keys: […])? |
There is one key that is much more prevalent than others, so yes, this could be a cause. I am seeing if the shuffle + map_partitions completes. |
Actually, it looks like I don't get as far as the shuffle proper at all; a worker gets killed when the […]. If I run with […]. With […]. |
Eventually that approach died in the […]. Log
|
The actual exception above is a bit hidden
|
My guess is that this is due to a single large partition which dask is trying to materialise on a single worker, blowing out the memory budget. Since I know this, I can manually split that partition and, rather than calling […] |
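(One way to avoid materialising a single oversized input partition, and what the later script in this thread does, is to split the parquet files at the row-group level when reading; the path below is a placeholder, not taken from the thread:)

```python
import dask.dataframe as dd

# Reading at row-group granularity keeps each input partition small, so no
# single worker has to hold an entire (possibly huge) parquet file in memory.
ddf = dd.read_parquet(".../input/", split_row_groups=True)
```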
Maybe that's indeed due to a memory error, but we would expect a different exception. The exception you got is something that should not be possible with #7326. Can you tell us which commit you were using? |
Here's an updated log, same kind of failure, but slightly different traceback. Log
|
The dashboard errors are more or less expected. We haven't fixed the dashboard for the case where a worker is dying. A worker is definitely dying during the shuffle. We haven't implemented restart logic for it yet. See #7353 |
The entire dataset doesn't need to fit into memory. A single partition has to fit, of course (plus a little room in case pandas/arrow decides to copy stuff). If you are failing already at a task that is before the actual shuffle, you might have a partition that is too large. You can also run a sanity check to ensure there is nothing super weird going on, e.g.
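(The exact snippet originally posted here is not preserved in this copy; presumably it was something along these lines, with a placeholder path:)

```python
import dask.dataframe as dd

# Read the full dataset once and count the rows -- no shuffle involved.
ddf = dd.read_parquet(".../input/")
print(ddf.map_partitions(len).compute().sum())
```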
This loads the entire data once and returns the row count. If this fails already, your machine is too small, or rather your workers are too small.
... what I'm trying to say. Maybe there is one bad file that expands to 20-30 GB. IDK. |
I assume this is not a public dataset, is it? |
Unfortunately not, AFAIK. |
I know the input files all expand to sane sizes (I have tested this; I've updated the initial comment, it's always in the 2-4 GiB range), so parquet-wise the inputs are sane. My reading of the errors, and perhaps you disagree, is that one (or maybe a few) output partitions are large, to the point that a single output partition doesn't fit in a single worker's memory. I will try with fewer workers to see if upping the individual memory limit helps. |
Possibly. One important thing to note is that at the moment the entire dataset has to fit onto disk, but not in memory. Maybe your disk is too small? I am surprised you do not see a better exception; we might have been a bit too zealous with stripping "irrelevant" traces in #7326. |
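(If disk space is the constraint, one option, sketched here rather than taken from the thread, is to point the workers' scratch directory, which is where the p2p shuffle spills its data, at a larger volume; the path is a placeholder:)

```python
import dask
from distributed import Client, LocalCluster

# Workers place their scratch/spill files (including p2p shuffle data) under
# dask's temporary directory; point it at a volume with enough free space
# before creating the cluster.
dask.config.set({"temporary-directory": "/mnt/big-scratch"})

client = Client(LocalCluster(n_workers=20, memory_limit="100GiB"))
```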
It sure looks like it. The way that I read these logs, a worker gets restarted because it is running out of memory, which causes the scheduler extension to fail the shuffle. This in turn triggers a cascade of errors in the […] |
I tried overcommitting memory in the workers and eventually we nearly completed, but I think one worker (which needed to read 49GiB from disk) was removed because it didn't heartbeat for 5 mins:
Remind me how I raise that limit; I think I should then manage to get this to complete. |
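(A sketch of one way to raise the limit referred to above, assuming the 5-minute removal comes from the scheduler's worker TTL; the chosen value is arbitrary:)

```python
import dask

# If a worker is removed after 5 minutes without a heartbeat, that is the
# scheduler's worker TTL; raising it gives a worker doing a long blocking
# disk read more slack before it is declared dead.
dask.config.set({"distributed.scheduler.worker-ttl": "30 minutes"})
```

Setting the equivalent environment variable (`DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="30 minutes"`) before starting the scheduler should also work.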
I'm curious: would you be comfortable creating a mimic of your dataset if this functionality existed? dask/dask#9766 |
In principle I have no objection to doing this, but would need to read the EULA on the data carefully to know exactly what that would need to cover. In this particular instance, I have a hunch about what is causing things to fall over, and so I think I can by hand construct a synthetic dataset that will provoke the issue. |
To follow up here, I was able to get the script below to run to completion. This was on a machine with 40 physical cores and 1 TB of RAM. I needed to set: […]
(Probably they didn't need to be that high, but belt and braces.) I also needed to overcommit the memory limit for each worker to 100 GiB. The reason for this, and for the previous failures, is that this dataset has a very skewed distribution for the shuffle key. In particular, there is a single key value that corresponds to around 5% of the total rows; this leads to one worker peaking at 80 GiB of memory usage when performing the […]. The dataset has 2879987999 total rows, and the largest output partition has 132603535 rows.

In this particular instance, I know that downstream I don't need to do a merge of the dataset on this key (it's just a pre-sorting step), and so, with the prior of the skewed key distribution, I could write code to manually construct a better partitioning key (a sketch of what that might look like follows the script below). I wonder to what extent that might be automated. One could imagine extending the interface to allow the user to provide a prior on the key distribution that lets the shuffling mechanism make sensible decisions.

In any case, having figured out the issues, I can, if it is interesting, construct a synthetic dataset that would allow you to test things too (I think one can also replicate the problem at a smaller scale by doing the same thing with tighter worker limits).

```python
from pathlib import Path

import dask
import dask.dataframe as dd
import distributed
from distributed import Client, LocalCluster

if __name__ == "__main__":
    print("Dask version:", dask.__version__)
    print("Distributed version:", distributed.__version__)

    cluster = LocalCluster(n_workers=20, memory_limit="100GiB")
    client = Client(cluster)

    inputdir = Path(".../input/")
    outputdir = Path(".../shuffled/")

    ddf = dd.read_parquet(inputdir, split_row_groups=True)
    ddf = ddf.shuffle('shuffle_key', shuffle="p2p")

    final_partition_sizes = ddf.map_partitions(len).compute()
    print(f"Num out partitions = {len(final_partition_sizes)}")
    print(final_partition_sizes.max(), final_partition_sizes.min())
    print(final_partition_sizes)
```

Complete log (not fully error/warning-free)
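A minimal sketch of what manually constructing a better partitioning key could look like, assuming the goal is just to spread the dominant key value over several partitions (this is not the author's actual code; the column and value names are placeholders):

```python
import numpy as np
import pandas as pd

def salt_hot_key(df: pd.DataFrame, key: str, hot_value, n_splits: int = 16) -> pd.DataFrame:
    """Append a random suffix to the dominant key value so its rows spread
    over several output partitions instead of landing on a single worker."""
    new_key = df[key].astype(str)
    is_hot = df[key] == hot_value
    salt = np.random.randint(0, n_splits, size=int(is_hot.sum())).astype(str)
    new_key[is_hot] = new_key[is_hot] + "#" + salt
    return df.assign(partition_key=new_key)

# Hypothetical usage with the dataframe from the script above:
# ddf = ddf.map_partitions(salt_hot_key, "shuffle_key", "<the dominant key value>")
# ddf = ddf.shuffle("partition_key", shuffle="p2p")
```

Rows sharing the original hot key end up in different output partitions, which is only acceptable here because, as noted above, the shuffle is a pre-sorting step and no downstream merge on the key is needed.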
|
I'm attempting to use the p2p shuffle implementation (using the branch proposed for merge in #7326) to shuffle a ~1TB dataset.
The data exists on disk as ~300 parquet files (each of which expands to around 2 GiB in size [edited], with 23 columns), and I'm trying to shuffle into around 300 output partitions, writing the result to parquet. The key column is a string (although I can convert it to int or datetime if that would help); the other columns are a mix of string, int, and float.
This is on a machine with 1 TB of RAM and 40 cores. I run like so:
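(The snippet originally posted here is not preserved in this copy; based on the description and the script later in the thread, it was presumably along these lines, with placeholder paths:)

```python
import dask.dataframe as dd
from distributed import Client, LocalCluster

if __name__ == "__main__":
    client = Client(LocalCluster(n_workers=40))
    ddf = dd.read_parquet(".../input/")              # ~300 files, ~1 TB total
    ddf = ddf.shuffle("shuffle_key", shuffle="p2p")  # string key column
    ddf.to_parquet(".../shuffled/")                  # ~300 output partitions
```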
This progresses quite well for a while, with peak memory usage hitting ~600 GB. At some point, though, some workers reach 95% of their memory limits and are then killed by the nanny.
Am I configuring things wrong? Do I need to switch on anything else? Or should I not be expecting this to work right now?