Skip to content

Commit

Permalink
Fix bad agg clustering spec bug
Browse files: browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Oct 20, 2024
1 parent 7e91e93 commit b73745a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 22 deletions.
31 changes: 25 additions & 6 deletions src/daft-plan/src/physical_plan.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cmp::max, ops::Add, sync::Arc};
use std::{cmp::max, collections::HashSet, ops::Add, sync::Arc};

use common_display::ascii::AsciiTreeDisplay;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -135,12 +135,31 @@ impl PhysicalPlan {
))
.into(),
Self::ShuffleExchange(shuffle_exchange) => shuffle_exchange.clustering_spec(),
Self::Aggregate(Aggregate { input, groupby, .. }) => {
let input_clustering_spec = input.clustering_spec();
if groupby.is_empty() {
ClusteringSpec::Unknown(Default::default()).into()
Self::Aggregate(Aggregate {
input,
aggregations,
..
}) => {
// PhysicalPlan aggregates are local aggregations
//
// If the local aggregation modifies the partition columns (unlikely), the clustering spec is invalidated
//
// If the groupby keys are the partition columns (very likely, since we often partition by hash on the groupby keys), the
// clustering spec is still valid
let input_partition_by = input.clustering_spec().partition_by();
let input_partition_col_names: HashSet<&str> =
input_partition_by.iter().map(|e| e.name()).collect();
if aggregations
.iter()
.map(|e| e.name())
.any(|name| input_partition_col_names.contains(name))
{
ClusteringSpec::Unknown(UnknownClusteringConfig::new(
input.clustering_spec().num_partitions(),
))
.into()

Check warning on line 160 in src/daft-plan/src/physical_plan.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-plan/src/physical_plan.rs#L157-L160

Added lines #L157 - L160 were not covered by tests
} else {
input_clustering_spec
input.clustering_spec()
}
}
Self::Pivot(Pivot { input, .. }) => input.clustering_spec(),
Expand Down
46 changes: 30 additions & 16 deletions src/daft-scheduler/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,22 +514,36 @@ fn physical_plan_to_partition_tasks(
ShuffleExchangeStrategy::SplitOrCoalesceToTargetNum {
target_num_partitions,
} => {
if target_num_partitions >= &input_num_partitions {
let split = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "split"))?
.call1((upstream_iter, input_num_partitions, *target_num_partitions))?;
let flattened = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "flatten_plan"))?
.call1((split,))?;
Ok(flattened.into())
} else {
let coalesced = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "coalesce"))?
.call1((upstream_iter, input_num_partitions, *target_num_partitions))?;
Ok(coalesced.into())
match target_num_partitions.cmp(&input_num_partitions) {
std::cmp::Ordering::Equal => Ok(upstream_iter),

Check warning on line 518 in src/daft-scheduler/src/scheduler.rs

View check run for this annotation

Codecov / codecov/patch

src/daft-scheduler/src/scheduler.rs#L518

Added line #L518 was not covered by tests
std::cmp::Ordering::Greater => {
// Split if more outputs than inputs
let split = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "split"))?
.call1((
upstream_iter,
input_num_partitions,
*target_num_partitions,
))?;
let flattened = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "flatten_plan"))?
.call1((split,))?;
Ok(flattened.into())
}
std::cmp::Ordering::Less => {
// Coalesce if fewer outputs than inputs
let coalesced = py
.import_bound(pyo3::intern!(py, "daft.execution.physical_plan"))?
.getattr(pyo3::intern!(py, "coalesce"))?
.call1((
upstream_iter,
input_num_partitions,
*target_num_partitions,
))?;
Ok(coalesced.into())
}
}
}
}
Expand Down

0 comments on commit b73745a

Please sign in to comment.