Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Do not try and preserve order when there is no order to preserve in RepartitionExec #8127

Merged
merged 11 commits into from
Nov 12, 2023
20 changes: 9 additions & 11 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,14 +929,12 @@ fn add_roundrobin_on_top(
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
// - Usage of order preserving variants is not desirable
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
let should_preserve_ordering = input.output_ordering().is_some();

let partitioning = Partitioning::RoundRobinBatch(n_target);
let repartition = RepartitionExec::try_new(input, partitioning)?;
let new_plan = Arc::new(repartition.with_preserve_order(should_preserve_ordering))
as Arc<dyn ExecutionPlan>;
let repartition =
RepartitionExec::try_new(input, partitioning)?.with_preserve_order();

// update distribution onward with new operator
let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Ok(new_plan)
} else {
Expand Down Expand Up @@ -999,7 +997,6 @@ fn add_hash_on_top(
// requirements.
// - Usage of order preserving variants is not desirable (per the flag
// `config.optimizer.bounded_order_preserving_variants`).
let should_preserve_ordering = input.output_ordering().is_some();
let mut new_plan = if repartition_beneficial_stats {
// Since hashing benefits from partitioning, add a round-robin repartition
// before it:
Expand All @@ -1008,9 +1005,10 @@ fn add_hash_on_top(
input
};
let partitioning = Partitioning::Hash(hash_exprs, n_target);
let repartition = RepartitionExec::try_new(new_plan, partitioning)?;
new_plan =
Arc::new(repartition.with_preserve_order(should_preserve_ordering)) as _;
let repartition = RepartitionExec::try_new(new_plan, partitioning)?
// preserve any ordering if possible
.with_preserve_order();
new_plan = Arc::new(repartition) as _;

// update distribution onward with new operator
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
Expand Down Expand Up @@ -1159,11 +1157,11 @@ fn replace_order_preserving_variants_helper(
if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
if repartition.preserve_order() {
return Ok(Arc::new(
// a new RepartitionExec doesn't preserve order
RepartitionExec::try_new(
updated_children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per @ozankabak's suggestion, I removed `with_preserve_order(false)` as well as the boolean argument, which was confusing. Now `with_preserve_order()` is only called when the code is attempting to maintain the order.

)?,
));
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,11 +703,11 @@ fn remove_corresponding_sort_from_sub_plan(
} else if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>()
{
Arc::new(
// By default, RepartitionExec does not preserve order
RepartitionExec::try_new(
children.swap_remove(0),
repartition.partitioning().clone(),
)?
.with_preserve_order(false),
)?,
)
} else {
plan.clone().with_new_children(children)?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ fn get_updated_plan(
// a `SortPreservingRepartitionExec` if appropriate:
if is_repartition(&plan) && !plan.maintains_input_order()[0] && is_spr_better {
let child = plan.children().swap_remove(0);
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?;
plan = Arc::new(repartition.with_preserve_order(true)) as _
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?
.with_preserve_order();
plan = Arc::new(repartition) as _
}
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
// with a `SortPreservingMergeExec` if appropriate:
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionP
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand All @@ -159,7 +159,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::Hash(hash_expr, 2))
.unwrap()
.with_preserve_order(true),
.with_preserve_order(),
)
}

Expand Down
Loading