From 95658877dc8693a82d5046f64a5219595b64f9e3 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <106137913+mustafasrepo@users.noreply.github.com> Date: Mon, 13 Feb 2023 22:02:52 +0300 Subject: [PATCH] Feature/sort enforcement refactor (#5228) * Remove multilayer chain Ordering comparison for sort parallelize rule * Update tree code * Simplify if condition * Update test * Simplify sort insertion utility to avoid clones --------- Co-authored-by: Mehmet Ozan Kabak --- .../physical_optimizer/sort_enforcement.rs | 178 +++++++++--------- .../core/src/physical_optimizer/utils.rs | 34 ++-- 2 files changed, 111 insertions(+), 101 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs b/datafusion/core/src/physical_optimizer/sort_enforcement.rs index a59f35fc1d76..8dfa4199b5a6 100644 --- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs +++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs @@ -35,7 +35,7 @@ //! by another SortExec. Therefore, this rule removes it from the physical plan. use crate::config::ConfigOptions; use crate::error::Result; -use crate::physical_optimizer::utils::add_sort_above_child; +use crate::physical_optimizer::utils::add_sort_above; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; @@ -69,15 +69,28 @@ impl EnforceSorting { /// leading to `SortExec`s. #[derive(Debug, Clone)] struct ExecTree { + /// The `ExecutionPlan` associated with this node + pub plan: Arc, /// Child index of the plan in its parent pub idx: usize, /// Children of the plan that would need updating if we remove leaf executors pub children: Vec, - /// The `ExecutionPlan` associated with this node - pub plan: Arc, } impl ExecTree { + /// Create new Exec tree + pub fn new( + plan: Arc, + idx: usize, + children: Vec, + ) -> Self { + ExecTree { + plan, + idx, + children, + } + } + /// This function returns the executors at the leaves of the tree. fn get_leaves(&self) -> Vec> { if self.children.is_empty() { @@ -127,11 +140,7 @@ impl PlanWithCorrespondingSort { // that maintain this ordering. If we just saw a order imposing // operator, we reset the tree and start accumulating. if is_sort(plan) { - return Some(ExecTree { - idx, - plan: item.plan, - children: vec![], - }); + return Some(ExecTree::new(item.plan, idx, vec![])); } else if is_limit(plan) { // There is no sort linkage for this path, it starts at a limit. return None; @@ -159,11 +168,7 @@ impl PlanWithCorrespondingSort { if !children.is_empty() { // Add parent node to the tree if there is at least one // child with a subtree: - Some(ExecTree { - idx, - plan: item.plan, - children, - }) + Some(ExecTree::new(item.plan, idx, children)) } else { // There is no sort linkage for this child, do nothing. None @@ -242,11 +247,7 @@ impl PlanWithCorrespondingCoalescePartitions { // operator, we reset the tree and start accumulating. let plan = item.plan; if plan.as_any().is::() { - Some(ExecTree { - idx, - plan, - children: vec![], - }) + Some(ExecTree::new(plan, idx, vec![])) } else if plan.children().is_empty() { // Plan has no children, there is nothing to propagate. None @@ -267,11 +268,7 @@ impl PlanWithCorrespondingCoalescePartitions { if children.is_empty() { None } else { - Some(ExecTree { - idx, - plan, - children, - }) + Some(ExecTree::new(plan, idx, children)) } } }) @@ -365,19 +362,21 @@ fn parallelize_sorts( let mut coalesce_onwards = requirements.coalesce_onwards; // We know that `plan` has children, so `coalesce_onwards` is non-empty. if coalesce_onwards[0].is_some() { - if let Some(sort_exec) = plan.as_any().downcast_ref::() { + if (is_sort(&plan) || is_sort_preserving_merge(&plan)) + // Make sure that Sort is actually global sort + && plan.output_partitioning().partition_count() == 1 + { // If there is a connection between a `CoalescePartitionsExec` and a - // `SortExec` that satisfy the requirements (i.e. they don't require a - // single partition), then we can replace the `CoalescePartitionsExec` - // + `SortExec` cascade with a `SortExec` + `SortPreservingMergeExec` + // Global Sort that satisfy the requirements (i.e. intermediate + // executors don't require single partition), then we can + // replace the `CoalescePartitionsExec`+ GlobalSort cascade with + // the `SortExec` + `SortPreservingMergeExec` // cascade to parallelize sorting. let mut prev_layer = plan.clone(); - update_child_to_change_coalesce( - &mut prev_layer, - &mut coalesce_onwards[0], - Some(sort_exec), - )?; - let spm = SortPreservingMergeExec::new(sort_exec.expr().to_vec(), prev_layer); + update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; + let sort_exprs = get_sort_exprs(&plan)?; + add_sort_above(&mut prev_layer, sort_exprs.to_vec())?; + let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer); return Ok(Some(PlanWithCorrespondingCoalescePartitions { plan: Arc::new(spm), coalesce_onwards: vec![None], @@ -385,11 +384,7 @@ fn parallelize_sorts( } else if plan.as_any().is::() { // There is an unnecessary `CoalescePartitionExec` in the plan. let mut prev_layer = plan.clone(); - update_child_to_change_coalesce( - &mut prev_layer, - &mut coalesce_onwards[0], - None, - )?; + update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; let new_plan = plan.with_new_children(vec![prev_layer])?; return Ok(Some(PlanWithCorrespondingCoalescePartitions { plan: new_plan, @@ -453,12 +448,8 @@ fn ensure_sorting( // Make sure we preserve the ordering requirements: update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; let sort_expr = required_ordering.to_vec(); - *child = add_sort_above_child(child, sort_expr)?; - *sort_onwards = Some(ExecTree { - idx, - plan: child.clone(), - children: vec![], - }) + add_sort_above(child, sort_expr)?; + *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); } if let Some(tree) = sort_onwards { // For window expressions, we can remove some sorts when we can @@ -474,12 +465,8 @@ fn ensure_sorting( } (Some(required), None) => { // Ordering requirement is not met, we should add a `SortExec` to the plan. - *child = add_sort_above_child(child, required.to_vec())?; - *sort_onwards = Some(ExecTree { - idx, - plan: child.clone(), - children: vec![], - }) + add_sort_above(child, required.to_vec())?; + *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); } (None, Some(_)) => { // We have a `SortExec` whose effect may be neutralized by @@ -532,11 +519,11 @@ fn analyze_immediate_sort_removal( sort_exec.expr().to_vec(), sort_input, )); - let new_tree = ExecTree { - idx: 0, - plan: new_plan.clone(), - children: sort_onwards.iter().flat_map(|e| e.clone()).collect(), - }; + let new_tree = ExecTree::new( + new_plan.clone(), + 0, + sort_onwards.iter().flat_map(|e| e.clone()).collect(), + ); PlanWithCorrespondingSort { plan: new_plan, sort_onwards: vec![Some(new_tree)], @@ -662,21 +649,19 @@ fn analyze_window_sort_removal( } /// Updates child to remove the unnecessary `CoalescePartitions` below it. -fn update_child_to_change_coalesce( +fn update_child_to_remove_coalesce( child: &mut Arc, coalesce_onwards: &mut Option, - sort_exec: Option<&SortExec>, ) -> Result<()> { if let Some(coalesce_onwards) = coalesce_onwards { - *child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?; + *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards)?; } Ok(()) } /// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`. -fn change_corresponding_coalesce_in_sub_plan( +fn remove_corresponding_coalesce_in_sub_plan( coalesce_onwards: &mut ExecTree, - sort_exec: Option<&SortExec>, ) -> Result> { Ok( if coalesce_onwards @@ -685,24 +670,12 @@ fn change_corresponding_coalesce_in_sub_plan( .is::() { // We can safely use the 0th index since we have a `CoalescePartitionsExec`. - let coalesce_input = coalesce_onwards.plan.children()[0].clone(); - if let Some(sort_exec) = sort_exec { - let sort_expr = sort_exec.expr(); - if !ordering_satisfy( - coalesce_input.output_ordering(), - Some(sort_expr), - || coalesce_input.equivalence_properties(), - ) { - return add_sort_above_child(&coalesce_input, sort_expr.to_vec()); - } - } - coalesce_input + coalesce_onwards.plan.children()[0].clone() } else { let plan = coalesce_onwards.plan.clone(); let mut children = plan.children(); for item in &mut coalesce_onwards.children { - children[item.idx] = - change_corresponding_coalesce_in_sub_plan(item, sort_exec)?; + children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item)?; } plan.with_new_children(children)? }, @@ -783,15 +756,11 @@ fn change_finer_sort_in_sub_plan( let plan = &sort_onwards.plan; // A `SortExec` is always at the bottom of the tree. if is_sort(plan) { - let prev_layer = plan.children()[0].clone(); + let mut prev_layer = plan.children()[0].clone(); let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec(); - let updated_plan = add_sort_above_child(&prev_layer, new_sort_expr)?; - *sort_onwards = ExecTree { - idx: sort_onwards.idx, - children: vec![], - plan: updated_plan.clone(), - }; - Ok(updated_plan) + add_sort_above(&mut prev_layer, new_sort_expr)?; + *sort_onwards = ExecTree::new(prev_layer.clone(), sort_onwards.idx, vec![]); + Ok(prev_layer) } else { let mut children = plan.children(); for item in &mut sort_onwards.children { @@ -1722,8 +1691,8 @@ mod tests { ]; let expected_optimized = vec![ "SortPreservingMergeExec: [nullable_col@0 ASC]", - " FilterExec: NOT non_nullable_col@1", - " SortExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " FilterExec: NOT non_nullable_col@1", " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]", ]; @@ -1780,6 +1749,47 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_coalesce_propagate() -> Result<()> { + let schema = create_test_schema()?; + let source = memory_exec(&schema); + let repartition = repartition_exec(source); + let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition)); + let repartition = repartition_exec(coalesce_partitions); + let sort_exprs = vec![sort_expr("nullable_col", &schema)]; + // Add local sort + let sort = Arc::new(SortExec::new_with_partitioning( + sort_exprs.clone(), + repartition, + true, + None, + )) as _; + let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort); + let sort = sort_exec(sort_exprs, spm); + + let physical_plan = sort.clone(); + // Sort Parallelize rule should end Coalesce + Sort linkage when Sort is Global Sort + // Also input plan is not valid as it is. We need to add SortExec before SortPreservingMergeExec. + let expected_input = vec![ + "SortExec: [nullable_col@0 ASC]", + " SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + " CoalescePartitionsExec", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + let expected_optimized = vec![ + "SortPreservingMergeExec: [nullable_col@0 ASC]", + " SortExec: [nullable_col@0 ASC]", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10", + " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0", + " MemoryExec: partitions=0, partition_sizes=[]", + ]; + assert_optimized!(expected_input, expected_optimized, physical_plan); + Ok(()) + } + /// make PhysicalSortExpr with default options fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr { sort_expr_options(name, schema, SortOptions::default()) diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index 13e04bbc2ae8..b6666fbefae1 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -23,6 +23,7 @@ use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; +use datafusion_physical_expr::utils::ordering_satisfy; use datafusion_physical_expr::PhysicalSortExpr; use std::sync::Arc; @@ -48,22 +49,21 @@ pub fn optimize_children( } } -/// Util function to add SortExec above child -/// preserving the original partitioning -pub fn add_sort_above_child( - child: &Arc, +/// This utility function adds a `SortExec` above an operator according to the +/// given ordering requirements while preserving the original partitioning. +pub fn add_sort_above( + node: &mut Arc, sort_expr: Vec, -) -> Result> { - let new_child = if child.output_partitioning().partition_count() > 1 { - Arc::new(SortExec::new_with_partitioning( - sort_expr, - child.clone(), - true, - None, - )) as Arc - } else { - Arc::new(SortExec::try_new(sort_expr, child.clone(), None)?) - as Arc - }; - Ok(new_child) +) -> Result<()> { + // If the ordering requirement is already satisfied, do not add a sort. + if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || { + node.equivalence_properties() + }) { + *node = Arc::new(if node.output_partitioning().partition_count() > 1 { + SortExec::new_with_partitioning(sort_expr, node.clone(), true, None) + } else { + SortExec::try_new(sort_expr, node.clone(), None)? + }) as _ + } + Ok(()) }