[DNM] Scatter shuffle proof-of-concept #5473

Closed · wants to merge 17 commits

Conversation

gjoseph92 (Collaborator) commented:

This is a semi-working proof-of-concept of a different approach to scalable(ish) shuffling. The idea is basically this: tracking millions of tasks is too much overhead on the scheduler, but what about millions of keys? Keys (static pieces of data) should be a bit easier to deal with: they don't have dependencies, they don't need to be scheduled, etc. Perhaps we can get the scheduler to track tens of millions of keys with relatively little effort?

The idea is the same as what I proposed in dask/dask#7613 (comment): tasks split up input data and scatter it, returning Futures. Other tasks take the Futures and gather them, fetching the data.

In detail

So within an input task, we (sketched in code after this list):

  • split an input partition into pieces, one per output partition
  • scatter all of those shards (pieces), getting back Futures referencing them
    • there is special undocumented behavior for scatter within a task: it just takes the data, stores it in the worker's memory, and informs the scheduler that the data is there. No communication with other workers happens; it basically lets you hand off multiple pieces of data to the system to be tracked without running a separate task to produce each piece.
  • return those Futures from the task
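
As a rough sketch of what such an input task could look like (hypothetical names: `shuffle_split_and_scatter`, `npartitions_out`; splitting by hashing a column with `pandas.util.hash_pandas_object` is just one possible choice, not necessarily what this PR does):

```python
import pandas as pd
from distributed import get_client

def shuffle_split_and_scatter(df, column, npartitions_out):
    # Split this input partition into one shard per output partition.
    which = pd.util.hash_pandas_object(df[column], index=False) % npartitions_out
    shards = [df[which == i] for i in range(npartitions_out)]
    # Scatter from inside the task: the shards stay in this worker's memory and
    # the scheduler is simply informed that the keys exist -- no inter-worker
    # communication happens here.
    futures = get_client().scatter(shards)  # one Future per shard
    return futures
```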

Then each output task depends on every one of these input tasks (also sketched below):

  • receives Futures for every shard (even the ones it doesn't need)
  • pulls out just the Futures it needs for output partition N
  • gathers those Futures, transferring the data from the source workers to the current worker
  • concatenates all the shards; that's the output partition
  • (optionally) cancels the Futures it just gathered so the workers holding them can release that data
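
And a matching sketch of an output task (again with hypothetical names), which takes the lists of Futures returned by every input task and assembles output partition `i`:

```python
import pandas as pd
from distributed import get_client

def shuffle_gather_and_concat(all_futures, i):
    # `all_futures` holds, for each input task, the list of per-output-partition
    # Futures that task returned. Keep only the shards for output partition i.
    mine = [futures[i] for futures in all_futures]
    client = get_client()
    # Gathering transfers the shard data from the source workers to this worker.
    shards = client.gather(mine)
    result = pd.concat(shards)
    # Optionally cancel the Futures so the source workers can release the shards.
    client.cancel(mine)
    return result
```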

With this approach, we keep the graph size O(N) in the number of partitions. The number of keys is still O(N^2), but we're hoping we can handle keys much more efficiently (initial results suggest this may be true, with some more careful thought).
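
Illustrative arithmetic for that scaling (the factor of two on tasks is a rough assumption; the key count matches the figure given under the downsides below):

```python
N = 100_000          # partitions
tasks = 2 * N        # roughly one split/scatter task plus one gather/concat task per partition
shard_keys = N * N   # every input partition produces one shard key per output partition
print(f"{tasks:,} tasks, {shard_keys:,} shard keys")  # 200,000 tasks, 10,000,000,000 shard keys
```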

There are a few reasons I find this approach appealing compared to #5435:

  1. It's bypassing the system much less. We're just combining two core distributed features: tasks and Futures.
  2. Resilience, retries, and cleanup come for free:
    • Tasks are pure and not just run for side effects; we actually use the data they produce (Futures).
    • If data (Futures) are lost from a worker dying, rerunning just the tasks that created them will correctly re-create them. And if a worker dies midway through a task, that task can just be rerun on a different worker.
    • If tasks are cancelled, the data (Futures) they depend on are released. Releasing those Futures causes the data they reference to be released (possibly on a different worker).
    • There is no out-of-band state; releasing the keys cleans up everything
  3. Spilling to disk happens automatically through the worker's standard zict buffer, since the data is owned by the worker[^1].
  4. Memory is managed; data spilled to disk shows up automatically in the dashboard.

[^1]: Though because disk IO blocks the event loop (#4424), performance is awful with this PR unless you disable spilling. When the loop is blocked, data can't transfer, which stops up the whole operation—the spice must flow!

However, there are also some big downsides:

  1. There's an O(N^2) component. A 100,000-partition DataFrame would require 10 billion keys for all the sub-partitions. I'm not sure we'll ever be able to handle that scale on the scheduler. In practice, I think this approach would have a lower upper bound in scalability than [DNM] Peer-to-peer shuffle design #5435.
  2. Communication and computation/disk IO can't overlap. It's a pull-based shuffle. In [Never Merge] Prototype for scalable dataframe shuffle dask#8223, we have a push-based shuffle: immediately sending parts to the appropriate place, and storing them there (in memory or on disk). Here, we store the parts where they're produced, then pull different subsets of them to the right worker at the end. This actually has a huge resilience advantage (worker failure just requires rerunning the tasks from that one worker; others are unaffected), but it may not be able to achieve the same performance.
  3. There's just a lot less room to optimize and control things than a dedicated implementation.
  4. Futures don't work quite the way we'd need for this to be performant. They communicate with the scheduler (and cause the scheduler to communicate) too often.

So how does this perform? Well, the first try was abysmal. But there was a lot of very low-hanging fruit I've either fixed or hacked past in this PR.

Currently, it's slower than a task-based shuffle for small datasets locally on my machine:

  • dask.datasets.timeseries(start="2000-01-01", end="2001-01-01"): 44s with shuffle="scatter", 22s with shuffle="tasks"

But it can handle bigger datasets that tasks cannot:

  • dask.datasets.timeseries(start="2000-01-01", end="2005-01-01"): ~5min with shuffle="scatter"; shuffle="tasks" failed

Yet weirdly, on a cluster, it performs much worse. I'm not sure why yet; it has something to do with when all the futures get replicated to every worker. Generally what's hurting performance is:

  • Futures informing the scheduler that a client requests keys (every worker tells the scheduler that it requests every sub-partition; this is O(N^2 * W)! But is it even necessary?)
  • Cancelling Futures and handling the transitions caused by that
  • Pickling, unpickling, msgpack

The bottom line is that this doesn't even come close to working as well as dask/dask#8223. And it's almost certainly not the most performant or scalable design for shuffling we could make.

But what I find appealing about this is that, because it's not circumventing dask, the improvements needed to make this work tend to be things we should do anyway, and that benefit the whole system, even for users who aren't shuffling. There's nothing new to add; it's just making current features as robust, performant, and correct as they should be.

Note that the point of this PR is just to record for posterity that somebody has tried this once. I do not plan to continue working on this PR or this approach currently.

Fixes dask#4959

Commit messages

`get_client` was calling the private `Worker._get_client` method when it ran within a task. `_get_client` should really have been called `_make_client`, since it created a new client every time. The simplest correct thing to do instead would have been to use the `Worker.client` property, which caches this instance.

In order to pass the `timeout` parameter through, though, I changed `Worker.get_client` to actually match its docstring and always return the same instance.
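
A minimal sketch of the caching pattern described here (a hypothetical helper, not the actual `Worker` code in distributed):

```python
from distributed import Client

class WorkerClientCache:
    """Cache the Client so repeated get_client() calls return the same instance,
    rather than constructing a new Client each time (which is effectively what
    the old Worker._get_client did)."""

    def __init__(self, scheduler_address):
        self.scheduler_address = scheduler_address
        self._client = None

    def get_client(self, timeout=None):
        if self._client is None:
            self._client = Client(self.scheduler_address, timeout=timeout)
        return self._client
```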
If you accidentally pass Futures created by a different Client into `Client.gather`, you'll get a `CancelledError`. This is confusing and misleading.

An explicit check for this would have made discovering dask#5466 much easier. And since there are probably plenty of other race conditions regarding default clients in multiple threads, hopefully a better error message will save someone else time in the future too.
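
A toy reproduction of the confusing behavior described above (illustrative only, not code from this PR):

```python
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1)
owning_client = Client(cluster)
other_client = Client(cluster)

fut = owning_client.submit(sum, [1, 2, 3])

# Today this surfaces as a CancelledError, which points at cancellation rather
# than at the real problem (the Future belongs to a different Client). An
# explicit ownership check would fail with a clearer message.
other_client.gather(fut)
```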
This is probably a good idea in general (xref dask#4959), but it particularly helps with performance deserializing Futures, which have a fastpath through `Client.current` that bypasses a number of unnecessarily slow things that `get_client` does before it checks `Client.current`.

When a key is scattered from a task and written directly to worker storage, the Client immediately sets the Future's state to `"finished"`. There's no need for the scheduler to also tell the client that that key is finished; it already knows. This saves a bit of scheduler time and a comms roundtrip.

Helps with setting the current client in the worker while deserializing. Implementation referenced from python/cpython#9688.

This was really slow and probably doesn't matter when the future is coming from a worker. But probably not safe to remove in general?

We don't need to report back to the client that its key was cancelled. But this shouldn't be exposed and may be wrong.

Just want to see how it affects performance.

Hoping this speeds up the transfer of Futures; makes no sense in general though.
"op": "update-graph",
"tasks": {},
"keys": [stringify(self.key)],
"client": c.id,
A reviewer (Member) commented on the snippet above:
I think this is about updating `who_wants` on the scheduler side; however, I don't know for sure.
