
[CHORE] Refactor shuffles to use a unified ShuffleExchange PhysicalPlan variant #3083

Merged
merged 8 commits into from
Oct 28, 2024

Conversation

jaychia
Contributor

@jaychia jaychia commented Oct 19, 2024

Summary

Refactors our PhysicalPlan by adding a new PhysicalPlan::ShuffleExchange variant.

Why?

This PR makes it easier to add new shuffle implementations: new strategies can be added by simply extending the PhysicalPlan::ShuffleExchange enum.

  1. To users, the physical plan is much more legible (even I don't really know what a "Flatten" is...)
  2. New types of shuffles are easier to add, and there is now a natural place for heuristics that decide when to use them (the new ShuffleExchangeBuilder can direct clients toward more complex shuffles by consulting environment variables, available cluster resources, the number of partitions, etc.)

Elaboration

Our previous PhysicalPlan was a bit of a leaky abstraction: it expressed 2 types of shuffles as unnecessarily low-level chains of operations, exposing those internals both to users of the abstraction and to users of Daft:

  1. FanoutHash/Range/Random -> Flatten -> ReduceMerge: this pattern expressed a "NaiveFullyMaterializingMapReduce" shuffle (materialize all Map tasks, perform a "Flatten" that makes no sense to users, then reduce and merge)
  2. Split/Coalesce -> Flatten: this pattern expressed a "SplitOrCoalesceToTargetNum" shuffle

This leaky abstraction makes it difficult to add new types of shuffles (e.g. the push-based shuffle implemented in #2883), since each one requires adding new PhysicalPlan variant(s). Additionally, all of these shuffles share similar characteristics during plan optimization, and the actual implementation of "how" to execute them should depend heavily on factors such as available cluster resources, the expected complexity of the shuffle, and more.
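To make the shape of the refactor concrete, here is a minimal hypothetical sketch (not the actual Daft code; field layout and method names are illustrative) of how a single ShuffleExchange plan node can fold both legacy operator chains into named strategies:

```rust
// Hypothetical sketch: one ShuffleExchange plan node whose strategy names
// the shuffle, instead of leaking low-level operator chains into the plan.
#[derive(Debug)]
enum ShuffleExchangeStrategy {
    // Replaces FanoutHash/Range/Random -> Flatten -> ReduceMerge.
    NaiveFullyMaterializingMapReduce { target_num_partitions: usize },
    // Replaces Split/Coalesce -> Flatten.
    SplitOrCoalesceToTargetNum { target_num_partitions: usize },
}

struct ShuffleExchange {
    strategy: ShuffleExchangeStrategy,
}

impl ShuffleExchange {
    // Both strategies share plan-level properties, such as the output
    // partition count, which optimization rules can query uniformly.
    fn target_num_partitions(&self) -> usize {
        match &self.strategy {
            ShuffleExchangeStrategy::NaiveFullyMaterializingMapReduce {
                target_num_partitions,
            }
            | ShuffleExchangeStrategy::SplitOrCoalesceToTargetNum {
                target_num_partitions,
            } => *target_num_partitions,
        }
    }
}

fn main() {
    let exchange = ShuffleExchange {
        strategy: ShuffleExchangeStrategy::SplitOrCoalesceToTargetNum {
            target_num_partitions: 8,
        },
    };
    assert_eq!(exchange.target_num_partitions(), 8);
}
```

The point of the single variant is that a new shuffle implementation becomes one more arm of the strategy enum rather than a new family of PhysicalPlan variants.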

@github-actions github-actions bot added the chore label Oct 19, 2024
codspeed-hq bot commented Oct 20, 2024

CodSpeed Performance Report

Merging #3083 will not alter performance

Comparing jay/refactor-shuffles (c2838a5) with main (a112523)

Summary

✅ 17 untouched benchmarks

codecov bot commented Oct 20, 2024

Codecov Report

Attention: Patch coverage is 77.09924% with 60 lines in your changes missing coverage. Please review.

Project coverage is 78.77%. Comparing base (a112523) to head (d36dd0c).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-plan/src/physical_ops/shuffle_exchange.rs 52.17% 44 Missing ⚠️
src/daft-plan/src/physical_plan.rs 75.00% 7 Missing ⚠️
src/daft-plan/src/physical_planner/translate.rs 92.98% 4 Missing ⚠️
src/daft-scheduler/src/scheduler.rs 94.82% 3 Missing ⚠️
src/daft-plan/src/display.rs 0.00% 1 Missing ⚠️
src/daft-plan/src/physical_planner/planner.rs 0.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3083      +/-   ##
==========================================
+ Coverage   78.73%   78.77%   +0.03%     
==========================================
  Files         623      619       -4     
  Lines       74710    74621      -89     
==========================================
- Hits        58826    58784      -42     
+ Misses      15884    15837      -47     
Files with missing lines Coverage Δ
daft/execution/rust_physical_plan_shim.py 94.56% <100.00%> (ø)
src/daft-plan/src/partitioning.rs 47.42% <100.00%> (-0.16%) ⬇️
src/daft-plan/src/physical_ops/mod.rs 0.00% <ø> (ø)
...rc/physical_optimization/rules/drop_repartition.rs 100.00% <100.00%> (ø)
...sical_optimization/rules/reorder_partition_keys.rs 79.44% <100.00%> (-0.78%) ⬇️
src/daft-plan/src/display.rs 82.64% <0.00%> (+1.99%) ⬆️
src/daft-plan/src/physical_planner/planner.rs 3.38% <0.00%> (ø)
src/daft-scheduler/src/scheduler.rs 93.13% <94.82%> (-0.03%) ⬇️
src/daft-plan/src/physical_planner/translate.rs 95.84% <92.98%> (+0.58%) ⬆️
src/daft-plan/src/physical_plan.rs 59.30% <75.00%> (+1.51%) ⬆️
... and 1 more

... and 5 files with indirect coverage changes

Self::Aggregate(Aggregate { input, groupby, .. }) => {
    let input_clustering_spec = input.clustering_spec();
    if groupby.is_empty() {
        ClusteringSpec::Unknown(Default::default()).into()
Contributor Author

NOTE: The previous behavior here seems to have been a bug, masked somehow by the old FanoutMap -> ReduceMerge implementation. I implemented what I think is an appropriate fix.
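A toy model of that fix (illustrative only, not Daft's actual ClusteringSpec API) captures the reasoning: an aggregation with no groupby keys produces a global result, so the input's clustering no longer describes the output.

```rust
// Toy model of the clustering-spec fix. Type and function names are
// hypothetical; only the empty-groupby rule mirrors the change above.
#[derive(Debug, PartialEq)]
enum ClusteringSpec {
    HashPartitioned(Vec<String>),
    Unknown,
}

fn aggregate_clustering_spec(_input: &ClusteringSpec, groupby: &[String]) -> ClusteringSpec {
    if groupby.is_empty() {
        // Previously the input spec leaked through here, which was the bug:
        // a global aggregate invalidates the input's partitioning.
        ClusteringSpec::Unknown
    } else {
        // In this toy model, a grouped aggregate is partitioned on its keys.
        ClusteringSpec::HashPartitioned(groupby.to_vec())
    }
}

fn main() {
    let input = ClusteringSpec::HashPartitioned(vec!["a".to_string()]);
    assert_eq!(aggregate_clustering_spec(&input, &[]), ClusteringSpec::Unknown);
}
```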

@jaychia jaychia removed the request for review from kevinzwang October 21, 2024 20:28
@jaychia jaychia force-pushed the jay/refactor-shuffles branch from b73745a to 5ea4356 Compare October 21, 2024 23:58
Contributor

@colin-ho colin-ho left a comment

Love it!

///
/// This provides an abstraction where we can select the most appropriate strategies based on various
/// heuristics such as number of partitions and the currently targeted backend's available resources.
pub struct ShuffleExchangeBuilder {
Contributor


Not sure if Builder is the most appropriate pattern for this, given that:

  1. Each ShuffleExchange configuration is distinct.
  2. There are no incremental steps involved in building a ShuffleExchange.

I think you could instead have distinct creation methods on the struct impl itself:

impl ShuffleExchange {
    pub fn hash_partitioned(
        input: PhysicalPlanRef,
        by: Vec<ExprRef>,
        num_partitions: usize,
    ) -> Self {
        ...
    }

    pub fn range_partitioned(
        input: PhysicalPlanRef,
        by: Vec<ExprRef>,
        descending: Vec<bool>,
        num_partitions: usize,
    ) -> Self {
        ...
    }
...

Contributor Author


Good callout. I changed this to a Factory instead of adding a bunch of ::new methods directly on ShuffleExchange, because I foresee that we will want the factory to be configurable.

E.g. the factory should be parametrized by config variables as well as other inputs such as statistics/estimates of the number of partitions. Then, when you create the shuffle exchange, it can dynamically choose the correct ShuffleExchange implementation based on the data available to it.
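A hypothetical factory along those lines (names, fields, and the decision rule are illustrative, not Daft's actual API) might look like:

```rust
// Hypothetical sketch: a factory that picks a shuffle strategy from
// configuration and the shape of the requested exchange. A real factory
// would also consult cluster resources and partition-count estimates.
#[derive(Debug, PartialEq)]
enum Strategy {
    NaiveFullyMaterializingMapReduce,
    SplitOrCoalesceToTargetNum,
}

struct ShuffleExchangeFactory {
    // e.g. populated from an environment variable or cluster config.
    force_naive: bool,
}

impl ShuffleExchangeFactory {
    fn choose(&self, needs_repartition_by_key: bool) -> Strategy {
        if self.force_naive || needs_repartition_by_key {
            // Repartitioning by hash/range/random keys needs a full
            // map-reduce style shuffle.
            Strategy::NaiveFullyMaterializingMapReduce
        } else {
            // Changing only the partition count can split/coalesce in place.
            Strategy::SplitOrCoalesceToTargetNum
        }
    }
}

fn main() {
    let factory = ShuffleExchangeFactory { force_naive: false };
    assert_eq!(factory.choose(true), Strategy::NaiveFullyMaterializingMapReduce);
    assert_eq!(factory.choose(false), Strategy::SplitOrCoalesceToTargetNum);
}
```

Callers construct exchanges through the factory and never name a concrete strategy, which is what lets the selection logic grow without touching call sites.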

@@ -262,6 +262,8 @@ fn physical_plan_to_partition_tasks(
psets: &HashMap<String, Vec<PyObject>>,
actor_pool_manager: &PyObject,
) -> PyResult<PyObject> {
use daft_plan::physical_ops::{ShuffleExchange, ShuffleExchangeStrategy};
Contributor


Any reason not to lump this with the other imports at the top?

Contributor Author


Only because the stuff in here is gated behind #[cfg(feature = "python")]. Not crucial though.

    target_num_partitions,
} => {
    match target_num_partitions.cmp(&input_num_partitions) {
        std::cmp::Ordering::Equal => Ok(upstream_iter),
Contributor


Should this also be unreachable? I believe these should have been dropped by drop repartition rules.

Contributor Author


Technically yes, but I thought that having a branch here wouldn't hurt just in case something weird happened upstream.
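The defensive branch under discussion can be sketched as follows (a simplified standalone model of the decision, not the actual execution code):

```rust
use std::cmp::Ordering;

// Simplified model of executing a SplitOrCoalesceToTargetNum exchange:
// compare the target partition count against the input's and decide.
fn plan_action(input_num_partitions: usize, target_num_partitions: usize) -> &'static str {
    match target_num_partitions.cmp(&input_num_partitions) {
        // Defensive no-op: the DropRepartition rule should normally have
        // removed this case already, but passing through is harmless.
        Ordering::Equal => "passthrough",
        Ordering::Less => "coalesce",
        Ordering::Greater => "split",
    }
}

fn main() {
    assert_eq!(plan_action(8, 8), "passthrough");
    assert_eq!(plan_action(8, 4), "coalesce");
    assert_eq!(plan_action(4, 8), "split");
}
```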

py,
"daft.execution.rust_physical_plan_shim"
))?
.getattr(pyo3::intern!(py, "split_by_hash"))?
Contributor


Nit: Can we rename it to fanout_by_hash?

@jaychia jaychia force-pushed the jay/refactor-shuffles branch from bd3dd9b to bf7e0e4 Compare October 28, 2024 23:28
@jaychia jaychia enabled auto-merge (squash) October 28, 2024 23:38
@jaychia jaychia merged commit b0a5a40 into main Oct 28, 2024
40 checks passed
@jaychia jaychia deleted the jay/refactor-shuffles branch October 28, 2024 23:56