From 0a42315520940946cb2a80483439d4dcd46a7a10 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Tue, 13 Dec 2022 17:15:48 +0300 Subject: [PATCH] simplifications --- .../remove_unnecessary_sorts.rs | 86 ++++++++----------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs b/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs index 9c2e071dd8b8..322677a30428 100644 --- a/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs +++ b/datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs @@ -88,39 +88,7 @@ fn remove_unnecessary_sorts( ordering_satisfy_inner(physical_ordering, required_ordering, || { child.equivalence_properties() }); - if is_ordering_satisfied { - // can do analysis for sort removal - if !sort_onward.is_empty() { - let (_, sort_any) = sort_onward[0].clone(); - let sort_exec = sort_any - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Plan( - "First layer should start from SortExec".to_string(), - ) - })?; - let sort_output_ordering = sort_exec.output_ordering(); - let sort_input_ordering = sort_exec.input().output_ordering(); - // Do naive analysis, where a SortExec is already sorted according to desired Sorting - if ordering_satisfy( - sort_input_ordering, - sort_output_ordering, - || sort_exec.input().equivalence_properties(), - ) { - update_child_to_remove_unnecessary_sort(child, sort_onward)?; - } else if let Some(window_agg_exec) = - requirements.plan.as_any().downcast_ref::() - { - // For window expressions we can remove some Sorts when expression can be calculated in reverse order also. - if let Some(res) = - analyze_window_sort_removal(window_agg_exec, sort_onward)? - { - return Ok(Some(res)); - } - } - } - } else { + if !is_ordering_satisfied { // During sort Removal we have invalidated ordering invariant fix it // This is effectively moving sort above in the physical plan update_child_to_remove_unnecessary_sort(child, sort_onward)?; @@ -128,6 +96,29 @@ fn remove_unnecessary_sorts( *child = add_sort_above_child(child, sort_expr)?; // Since we have added Sort, we add it to the sort_onwards also. sort_onward.push((idx, child.clone())) + } else if is_ordering_satisfied && !sort_onward.is_empty() { + // can do analysis for sort removal + let (_, sort_any) = sort_onward[0].clone(); + let sort_exec = convert_to_sort_exec(&sort_any)?; + let sort_output_ordering = sort_exec.output_ordering(); + let sort_input_ordering = sort_exec.input().output_ordering(); + // Do naive analysis, where a SortExec is already sorted according to desired Sorting + if ordering_satisfy(sort_input_ordering, sort_output_ordering, || { + sort_exec.input().equivalence_properties() + }) { + update_child_to_remove_unnecessary_sort(child, sort_onward)?; + } else if let Some(window_agg_exec) = + requirements.plan.as_any().downcast_ref::() + { + // For window expressions we can remove some Sorts when expression can be calculated in reverse order also. + if let Some(res) = analyze_window_sort_removal( + window_agg_exec, + sort_exec, + sort_onward, + )? { + return Ok(Some(res)); + } + } } } (Some(required), None) => { @@ -245,19 +236,9 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort { /// Analyzes `WindowAggExec` to determine whether Sort can be removed fn analyze_window_sort_removal( window_agg_exec: &WindowAggExec, + sort_exec: &SortExec, sort_onward: &mut Vec<(usize, Arc)>, ) -> Result> { - // If empty immediately return we cannot do analysis - if sort_onward.is_empty() { - return Ok(None); - } - let (_, sort_any) = sort_onward[0].clone(); - let sort_exec = sort_any - .as_any() - .downcast_ref::() - .ok_or_else(|| { - DataFusionError::Plan("First layer should start from SortExec".to_string()) - })?; let required_ordering = sort_exec.output_ordering().ok_or_else(|| { DataFusionError::Plan("SortExec should have output ordering".to_string()) })?; @@ -313,17 +294,24 @@ fn update_child_to_remove_unnecessary_sort( } Ok(()) } -/// Removes the sort from the plan in the `sort_onwards` -fn remove_corresponding_sort_from_sub_plan( - sort_onwards: &mut Vec<(usize, Arc)>, -) -> Result> { - let (sort_child_idx, sort_any) = sort_onwards[0].clone(); + +/// Convert dyn ExecutionPlan to SortExec (Assumes it is SortExec) +fn convert_to_sort_exec(sort_any: &Arc) -> Result<&SortExec> { let sort_exec = sort_any .as_any() .downcast_ref::() .ok_or_else(|| { DataFusionError::Plan("First layer should start from SortExec".to_string()) })?; + Ok(sort_exec) +} + +/// Removes the sort from the plan in the `sort_onwards` +fn remove_corresponding_sort_from_sub_plan( + sort_onwards: &mut Vec<(usize, Arc)>, +) -> Result> { + let (sort_child_idx, sort_any) = sort_onwards[0].clone(); + let sort_exec = convert_to_sort_exec(&sort_any)?; let mut prev_layer = sort_exec.input().clone(); let mut prev_layer_child_idx = sort_child_idx; // We start from 1 hence since first one is sort and we are removing it