Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MINOR]: Simplify code, change requirement from PhysicalSortExpr to PhysicalSortRequirement #7913

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::{
map_columns_before_projection, ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{
expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
};
use datafusion_physical_expr::{expr_list_eq_strict_order, PhysicalExpr};
use datafusion_physical_plan::unbounded_output;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};

Expand Down Expand Up @@ -1374,10 +1372,7 @@ fn ensure_distribution(
// make sure ordering requirements are still satisfied after.
if ordering_satisfied {
// Make sure to satisfy ordering requirement:
let sort_expr = PhysicalSortRequirement::to_sort_exprs(
required_input_ordering.clone(),
);
add_sort_above(&mut child, sort_expr, None)?;
add_sort_above(&mut child, required_input_ordering, None);
}
}
// Stop tracking distribution changing operators
Expand Down Expand Up @@ -1715,7 +1710,7 @@ mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit, expressions::Column,
LexOrdering, PhysicalExpr, PhysicalSortExpr,
LexOrdering, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
};

/// Models operators like BoundedWindowExec that require an input
Expand Down
16 changes: 8 additions & 8 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,11 @@ fn parallelize_sorts(
let mut prev_layer = plan.clone();
update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
let (sort_exprs, fetch) = get_sort_exprs(&plan)?;
add_sort_above(&mut prev_layer, sort_exprs.to_vec(), fetch)?;
add_sort_above(
&mut prev_layer,
&PhysicalSortRequirement::from_sort_exprs(sort_exprs),
fetch,
);
let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer)
.with_fetch(fetch);
return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions {
Expand Down Expand Up @@ -456,9 +460,7 @@ fn ensure_sorting(
) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
let sort_expr =
PhysicalSortRequirement::to_sort_exprs(required_ordering);
add_sort_above(child, sort_expr, None)?;
add_sort_above(child, &required_ordering, None);
if is_sort(child) {
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
} else {
Expand All @@ -468,8 +470,7 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
let sort_expr = PhysicalSortRequirement::to_sort_exprs(required);
add_sort_above(child, sort_expr, None)?;
add_sort_above(child, &required, None);
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
(None, Some(_)) => {
Expand Down Expand Up @@ -603,9 +604,8 @@ fn analyze_window_sort_removal(
.required_input_ordering()
.swap_remove(0)
.unwrap_or_default();
let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
// Satisfy the ordering requirement so that the window can run:
add_sort_above(&mut window_child, sort_expr, None)?;
add_sort_above(&mut window_child, &reqs, None);

let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
let new_window = if uses_bounded_memory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ impl OrderPreservationContext {
// ordering, (or that can maintain ordering with the replacement of
// its variant)
let plan = item.plan;
let children = plan.children();
let ordering_onwards = item.ordering_onwards;
if plan.children().is_empty() {
if children.is_empty() {
// Plan has no children, there is nothing to propagate.
None
} else if ordering_onwards[0].is_none()
&& ((is_repartition(&plan) && !plan.maintains_input_order()[0])
|| (is_coalesce_partitions(&plan)
&& plan.children()[0].output_ordering().is_some()))
&& children[0].output_ordering().is_some()))
{
Some(ExecTree::new(plan, idx, vec![]))
} else {
Expand Down Expand Up @@ -175,19 +176,18 @@ fn get_updated_plan(
// When a `RepartitionExec` doesn't preserve ordering, replace it with
// a `SortPreservingRepartitionExec` if appropriate:
if is_repartition(&plan) && !plan.maintains_input_order()[0] && is_spr_better {
let child = plan.children()[0].clone();
plan = Arc::new(
RepartitionExec::try_new(child, plan.output_partitioning())?
.with_preserve_order(true),
) as _
let child = plan.children().swap_remove(0);
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?;
plan = Arc::new(repartition.with_preserve_order(true)) as _
}
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
// with a `SortPreservingMergeExec` if appropriate:
let mut children = plan.children();
if is_coalesce_partitions(&plan)
&& plan.children()[0].output_ordering().is_some()
&& children[0].output_ordering().is_some()
&& is_spm_better
{
let child = plan.children()[0].clone();
let child = children.swap_remove(0);
plan = Arc::new(SortPreservingMergeExec::new(
child.output_ordering().unwrap_or(&[]).to_vec(),
child,
Expand Down
31 changes: 14 additions & 17 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,21 @@ pub(crate) fn pushdown_sorts(
|| plan.ordering_equivalence_properties(),
) {
// If the current plan is a SortExec, modify it to satisfy parent requirements:
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let parent_required_expr = parent_required.ok_or_else(err)?;
new_plan = sort_exec.input().clone();
add_sort_above(&mut new_plan, parent_required_expr, sort_exec.fetch())?;
add_sort_above(&mut new_plan, parent_required_expr, sort_exec.fetch());
};
let required_ordering = new_plan
.output_ordering()
.map(PhysicalSortRequirement::from_sort_exprs);
// Since new_plan is a SortExec, we can safely get the 0th index.
let child = &new_plan.children()[0];
let child = new_plan.children().swap_remove(0);
if let Some(adjusted) =
pushdown_requirement_to_children(child, required_ordering.as_deref())?
pushdown_requirement_to_children(&child, required_ordering.as_deref())?
{
// Can push down requirements
Ok(Transformed::Yes(SortPushDown {
plan: child.clone(),
plan: child,
required_ordering: None,
adjusted_request_ordering: adjusted,
}))
Expand All @@ -180,17 +178,15 @@ pub(crate) fn pushdown_sorts(
// Cannot satisfy the parent requirements; check whether the requirements can be pushed down:
if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_required)? {
Ok(Transformed::Yes(SortPushDown {
plan: plan.clone(),
plan: requirements.plan,
required_ordering: None,
adjusted_request_ordering: adjusted,
}))
} else {
// Cannot push down requirements; add a new SortExec:
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let mut new_plan = plan.clone();
add_sort_above(&mut new_plan, parent_required_expr, None)?;
let parent_required_expr = parent_required.ok_or_else(err)?;
let mut new_plan = requirements.plan;
add_sort_above(&mut new_plan, parent_required_expr, None);
Ok(Transformed::Yes(SortPushDown::init(new_plan)))
}
}
Expand All @@ -206,7 +202,7 @@ fn pushdown_requirement_to_children(
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[0].as_deref();
let child_plan = plan.children()[0].clone();
let child_plan = plan.children().swap_remove(0);
match determine_children_requirement(parent_required, request_child, child_plan) {
RequirementsCompatibility::Satisfy => {
Ok(Some(vec![request_child.map(|r| r.to_vec())]))
Expand Down Expand Up @@ -355,16 +351,17 @@ fn try_pushdown_requirements_to_join(
|| smj.ordering_equivalence_properties(),
)
.then(|| {
let required_input_ordering = smj.required_input_ordering();
let mut required_input_ordering = smj.required_input_ordering();
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(&sort_expr));
match push_side {
JoinSide::Left => {
vec![new_req, required_input_ordering[1].clone()]
required_input_ordering[0] = new_req;
}
JoinSide::Right => {
vec![required_input_ordering[0].clone(), new_req]
required_input_ordering[1] = new_req;
}
}
required_input_ordering
}))
}

Expand Down
15 changes: 7 additions & 8 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::error::Result;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::repartition::RepartitionExec;
Expand All @@ -31,8 +30,8 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{displayable, ExecutionPlan};

use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_expr::utils::ordering_satisfy_requirement;
use datafusion_physical_expr::PhysicalSortRequirement;

/// This object implements a tree that we use while keeping track of paths
/// leading to [`SortExec`]s.
Expand Down Expand Up @@ -101,16 +100,17 @@ pub(crate) fn get_children_exectrees(
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above(
node: &mut Arc<dyn ExecutionPlan>,
sort_expr: Vec<PhysicalSortExpr>,
sort_requirement: &[PhysicalSortRequirement],
fetch: Option<usize>,
) -> Result<()> {
) {
// If the ordering requirement is already satisfied, do not add a sort.
if !ordering_satisfy(
if !ordering_satisfy_requirement(
node.output_ordering(),
Some(&sort_expr),
Some(sort_requirement),
|| node.equivalence_properties(),
|| node.ordering_equivalence_properties(),
) {
let sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirement.to_vec());
let new_sort = SortExec::new(sort_expr, node.clone()).with_fetch(fetch);

*node = Arc::new(if node.output_partitioning().partition_count() > 1 {
Expand All @@ -119,7 +119,6 @@ pub fn add_sort_above(
new_sort
}) as _
}
Ok(())
}

/// Checks whether the given operator is a limit;
Expand Down