Skip to content

Commit

Permalink
Feature/sort enforcement refactor (#5228)
Browse files Browse the repository at this point in the history
* Remove multilayer chain Ordering comparison for sort parallelize rule

* Update tree code

* Simplify if condition

* Update test

* Simplify sort insertion utility to avoid clones

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
mustafasrepo and ozankabak authored Feb 13, 2023
1 parent 3da7902 commit 9565887
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 101 deletions.
178 changes: 94 additions & 84 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! by another SortExec. Therefore, this rule removes it from the physical plan.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::add_sort_above_child;
use crate::physical_optimizer::utils::add_sort_above;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
Expand Down Expand Up @@ -69,15 +69,28 @@ impl EnforceSorting {
/// leading to `SortExec`s.
#[derive(Debug, Clone)]
struct ExecTree {
/// The `ExecutionPlan` associated with this node
pub plan: Arc<dyn ExecutionPlan>,
/// Child index of the plan in its parent
pub idx: usize,
/// Children of the plan that would need updating if we remove leaf executors
pub children: Vec<ExecTree>,
/// The `ExecutionPlan` associated with this node
pub plan: Arc<dyn ExecutionPlan>,
}

impl ExecTree {
/// Create new Exec tree
pub fn new(
plan: Arc<dyn ExecutionPlan>,
idx: usize,
children: Vec<ExecTree>,
) -> Self {
ExecTree {
plan,
idx,
children,
}
}

/// This function returns the executors at the leaves of the tree.
fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
if self.children.is_empty() {
Expand Down Expand Up @@ -127,11 +140,7 @@ impl PlanWithCorrespondingSort {
// that maintain this ordering. If we just saw an order-imposing
// operator, we reset the tree and start accumulating.
if is_sort(plan) {
return Some(ExecTree {
idx,
plan: item.plan,
children: vec![],
});
return Some(ExecTree::new(item.plan, idx, vec![]));
} else if is_limit(plan) {
// There is no sort linkage for this path, it starts at a limit.
return None;
Expand Down Expand Up @@ -159,11 +168,7 @@ impl PlanWithCorrespondingSort {
if !children.is_empty() {
// Add parent node to the tree if there is at least one
// child with a subtree:
Some(ExecTree {
idx,
plan: item.plan,
children,
})
Some(ExecTree::new(item.plan, idx, children))
} else {
// There is no sort linkage for this child, do nothing.
None
Expand Down Expand Up @@ -242,11 +247,7 @@ impl PlanWithCorrespondingCoalescePartitions {
// operator, we reset the tree and start accumulating.
let plan = item.plan;
if plan.as_any().is::<CoalescePartitionsExec>() {
Some(ExecTree {
idx,
plan,
children: vec![],
})
Some(ExecTree::new(plan, idx, vec![]))
} else if plan.children().is_empty() {
// Plan has no children, there is nothing to propagate.
None
Expand All @@ -267,11 +268,7 @@ impl PlanWithCorrespondingCoalescePartitions {
if children.is_empty() {
None
} else {
Some(ExecTree {
idx,
plan,
children,
})
Some(ExecTree::new(plan, idx, children))
}
}
})
Expand Down Expand Up @@ -365,31 +362,29 @@ fn parallelize_sorts(
let mut coalesce_onwards = requirements.coalesce_onwards;
// We know that `plan` has children, so `coalesce_onwards` is non-empty.
if coalesce_onwards[0].is_some() {
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
if (is_sort(&plan) || is_sort_preserving_merge(&plan))
// Make sure that the sort is actually a global sort
&& plan.output_partitioning().partition_count() == 1
{
// If there is a connection between a `CoalescePartitionsExec` and a
// `SortExec` that satisfy the requirements (i.e. they don't require a
// single partition), then we can replace the `CoalescePartitionsExec`
// + `SortExec` cascade with a `SortExec` + `SortPreservingMergeExec`
// Global Sort that satisfy the requirements (i.e. intermediate
// executors don't require single partition), then we can
// replace the `CoalescePartitionsExec`+ GlobalSort cascade with
// the `SortExec` + `SortPreservingMergeExec`
// cascade to parallelize sorting.
let mut prev_layer = plan.clone();
update_child_to_change_coalesce(
&mut prev_layer,
&mut coalesce_onwards[0],
Some(sort_exec),
)?;
let spm = SortPreservingMergeExec::new(sort_exec.expr().to_vec(), prev_layer);
update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
let sort_exprs = get_sort_exprs(&plan)?;
add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer);
return Ok(Some(PlanWithCorrespondingCoalescePartitions {
plan: Arc::new(spm),
coalesce_onwards: vec![None],
}));
} else if plan.as_any().is::<CoalescePartitionsExec>() {
// There is an unnecessary `CoalescePartitionsExec` in the plan.
let mut prev_layer = plan.clone();
update_child_to_change_coalesce(
&mut prev_layer,
&mut coalesce_onwards[0],
None,
)?;
update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
let new_plan = plan.with_new_children(vec![prev_layer])?;
return Ok(Some(PlanWithCorrespondingCoalescePartitions {
plan: new_plan,
Expand Down Expand Up @@ -453,12 +448,8 @@ fn ensure_sorting(
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
let sort_expr = required_ordering.to_vec();
*child = add_sort_above_child(child, sort_expr)?;
*sort_onwards = Some(ExecTree {
idx,
plan: child.clone(),
children: vec![],
})
add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
if let Some(tree) = sort_onwards {
// For window expressions, we can remove some sorts when we can
Expand All @@ -474,12 +465,8 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
*child = add_sort_above_child(child, required.to_vec())?;
*sort_onwards = Some(ExecTree {
idx,
plan: child.clone(),
children: vec![],
})
add_sort_above(child, required.to_vec())?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
(None, Some(_)) => {
// We have a `SortExec` whose effect may be neutralized by
Expand Down Expand Up @@ -532,11 +519,11 @@ fn analyze_immediate_sort_removal(
sort_exec.expr().to_vec(),
sort_input,
));
let new_tree = ExecTree {
idx: 0,
plan: new_plan.clone(),
children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
};
let new_tree = ExecTree::new(
new_plan.clone(),
0,
sort_onwards.iter().flat_map(|e| e.clone()).collect(),
);
PlanWithCorrespondingSort {
plan: new_plan,
sort_onwards: vec![Some(new_tree)],
Expand Down Expand Up @@ -662,21 +649,19 @@ fn analyze_window_sort_removal(
}

/// Updates child to remove the unnecessary `CoalescePartitions` below it.
fn update_child_to_change_coalesce(
fn update_child_to_remove_coalesce(
child: &mut Arc<dyn ExecutionPlan>,
coalesce_onwards: &mut Option<ExecTree>,
sort_exec: Option<&SortExec>,
) -> Result<()> {
if let Some(coalesce_onwards) = coalesce_onwards {
*child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?;
*child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards)?;
}
Ok(())
}

/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
fn change_corresponding_coalesce_in_sub_plan(
fn remove_corresponding_coalesce_in_sub_plan(
coalesce_onwards: &mut ExecTree,
sort_exec: Option<&SortExec>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(
if coalesce_onwards
Expand All @@ -685,24 +670,12 @@ fn change_corresponding_coalesce_in_sub_plan(
.is::<CoalescePartitionsExec>()
{
// We can safely use the 0th index since we have a `CoalescePartitionsExec`.
let coalesce_input = coalesce_onwards.plan.children()[0].clone();
if let Some(sort_exec) = sort_exec {
let sort_expr = sort_exec.expr();
if !ordering_satisfy(
coalesce_input.output_ordering(),
Some(sort_expr),
|| coalesce_input.equivalence_properties(),
) {
return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
}
}
coalesce_input
coalesce_onwards.plan.children()[0].clone()
} else {
let plan = coalesce_onwards.plan.clone();
let mut children = plan.children();
for item in &mut coalesce_onwards.children {
children[item.idx] =
change_corresponding_coalesce_in_sub_plan(item, sort_exec)?;
children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item)?;
}
plan.with_new_children(children)?
},
Expand Down Expand Up @@ -783,15 +756,11 @@ fn change_finer_sort_in_sub_plan(
let plan = &sort_onwards.plan;
// A `SortExec` is always at the bottom of the tree.
if is_sort(plan) {
let prev_layer = plan.children()[0].clone();
let mut prev_layer = plan.children()[0].clone();
let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
let updated_plan = add_sort_above_child(&prev_layer, new_sort_expr)?;
*sort_onwards = ExecTree {
idx: sort_onwards.idx,
children: vec![],
plan: updated_plan.clone(),
};
Ok(updated_plan)
add_sort_above(&mut prev_layer, new_sort_expr)?;
*sort_onwards = ExecTree::new(prev_layer.clone(), sort_onwards.idx, vec![]);
Ok(prev_layer)
} else {
let mut children = plan.children();
for item in &mut sort_onwards.children {
Expand Down Expand Up @@ -1722,8 +1691,8 @@ mod tests {
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" FilterExec: NOT non_nullable_col@1",
" SortExec: [nullable_col@0 ASC]",
" SortExec: [nullable_col@0 ASC]",
" FilterExec: NOT non_nullable_col@1",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
Expand Down Expand Up @@ -1780,6 +1749,47 @@ mod tests {
Ok(())
}

/// Checks that the optimizer removes a `CoalescePartitionsExec` that sits
/// below a global sort.
///
/// The input plan built here is, from bottom to top: memory source ->
/// repartition -> `CoalescePartitionsExec` -> repartition -> `SortExec`
/// (created via `new_with_partitioning`) -> `SortPreservingMergeExec` ->
/// `SortExec`. The expected optimized plan no longer contains the
/// `CoalescePartitionsExec` and keeps a single `SortPreservingMergeExec`
/// over a single `SortExec`.
#[tokio::test]
async fn test_coalesce_propagate() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
let repartition = repartition_exec(source);
let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition));
let repartition = repartition_exec(coalesce_partitions);
let sort_exprs = vec![sort_expr("nullable_col", &schema)];
// Add a local sort on top of the repartitioned input.
// NOTE(review): the `true` argument is presumably the
// "preserve partitioning" flag of `SortExec::new_with_partitioning` -- confirm.
let sort = Arc::new(SortExec::new_with_partitioning(
sort_exprs.clone(),
repartition,
true,
None,
)) as _;
// Merge the locally sorted input, then place another sort on top of the merge.
let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
let sort = sort_exec(sort_exprs, spm);

let physical_plan = sort.clone();
// The sort-parallelization rule should end the Coalesce + Sort linkage once
// it reaches a global sort.
// The input plan would not be valid without a `SortExec` below the
// `SortPreservingMergeExec`, which is why the local sort is added above.
let expected_input = vec![
"SortExec: [nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: [nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
" MemoryExec: partitions=0, partition_sizes=[]",
];
// After optimization the `CoalescePartitionsExec` is gone and only one
// `SortPreservingMergeExec` + `SortExec` pair remains on top.
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: [nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
" MemoryExec: partitions=0, partition_sizes=[]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}

/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
Expand Down
34 changes: 17 additions & 17 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;

Expand All @@ -48,22 +49,21 @@ pub fn optimize_children(
}
}

/// Util function to add SortExec above child
/// preserving the original partitioning
pub fn add_sort_above_child(
child: &Arc<dyn ExecutionPlan>,
/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above(
node: &mut Arc<dyn ExecutionPlan>,
sort_expr: Vec<PhysicalSortExpr>,
) -> Result<Arc<dyn ExecutionPlan>> {
let new_child = if child.output_partitioning().partition_count() > 1 {
Arc::new(SortExec::new_with_partitioning(
sort_expr,
child.clone(),
true,
None,
)) as Arc<dyn ExecutionPlan>
} else {
Arc::new(SortExec::try_new(sort_expr, child.clone(), None)?)
as Arc<dyn ExecutionPlan>
};
Ok(new_child)
) -> Result<()> {
// If the ordering requirement is already satisfied, do not add a sort.
if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
node.equivalence_properties()
}) {
*node = Arc::new(if node.output_partitioning().partition_count() > 1 {
SortExec::new_with_partitioning(sort_expr, node.clone(), true, None)
} else {
SortExec::try_new(sort_expr, node.clone(), None)?
}) as _
}
Ok(())
}

0 comments on commit 9565887

Please sign in to comment.