Skip to content

Commit

Permalink
simplifications
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Dec 13, 2022
1 parent dfb6683 commit 0a42315
Showing 1 changed file with 37 additions and 49 deletions.
86 changes: 37 additions & 49 deletions datafusion/core/src/physical_optimizer/remove_unnecessary_sorts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,46 +88,37 @@ fn remove_unnecessary_sorts(
ordering_satisfy_inner(physical_ordering, required_ordering, || {
child.equivalence_properties()
});
if is_ordering_satisfied {
// can do analysis for sort removal
if !sort_onward.is_empty() {
let (_, sort_any) = sort_onward[0].clone();
let sort_exec = sort_any
.as_any()
.downcast_ref::<SortExec>()
.ok_or_else(|| {
DataFusionError::Plan(
"First layer should start from SortExec".to_string(),
)
})?;
let sort_output_ordering = sort_exec.output_ordering();
let sort_input_ordering = sort_exec.input().output_ordering();
// Do naive analysis, where a SortExec is already sorted according to desired Sorting
if ordering_satisfy(
sort_input_ordering,
sort_output_ordering,
|| sort_exec.input().equivalence_properties(),
) {
update_child_to_remove_unnecessary_sort(child, sort_onward)?;
} else if let Some(window_agg_exec) =
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
{
// For window expressions we can remove some Sorts when expression can be calculated in reverse order also.
if let Some(res) =
analyze_window_sort_removal(window_agg_exec, sort_onward)?
{
return Ok(Some(res));
}
}
}
} else {
if !is_ordering_satisfied {
// During sort Removal we have invalidated ordering invariant fix it
// This is effectively moving sort above in the physical plan
update_child_to_remove_unnecessary_sort(child, sort_onward)?;
let sort_expr = required_ordering.to_vec();
*child = add_sort_above_child(child, sort_expr)?;
// Since we have added Sort, we add it to the sort_onwards also.
sort_onward.push((idx, child.clone()))
} else if is_ordering_satisfied && !sort_onward.is_empty() {
// can do analysis for sort removal
let (_, sort_any) = sort_onward[0].clone();
let sort_exec = convert_to_sort_exec(&sort_any)?;
let sort_output_ordering = sort_exec.output_ordering();
let sort_input_ordering = sort_exec.input().output_ordering();
// Do naive analysis, where a SortExec is already sorted according to desired Sorting
if ordering_satisfy(sort_input_ordering, sort_output_ordering, || {
sort_exec.input().equivalence_properties()
}) {
update_child_to_remove_unnecessary_sort(child, sort_onward)?;
} else if let Some(window_agg_exec) =
requirements.plan.as_any().downcast_ref::<WindowAggExec>()
{
// For window expressions we can remove some Sorts when expression can be calculated in reverse order also.
if let Some(res) = analyze_window_sort_removal(
window_agg_exec,
sort_exec,
sort_onward,
)? {
return Ok(Some(res));
}
}
}
}
(Some(required), None) => {
Expand Down Expand Up @@ -245,19 +236,9 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
/// Analyzes `WindowAggExec` to determine whether Sort can be removed
fn analyze_window_sort_removal(
window_agg_exec: &WindowAggExec,
sort_exec: &SortExec,
sort_onward: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Option<PlanWithCorrespondingSort>> {
// If empty immediately return we cannot do analysis
if sort_onward.is_empty() {
return Ok(None);
}
let (_, sort_any) = sort_onward[0].clone();
let sort_exec = sort_any
.as_any()
.downcast_ref::<SortExec>()
.ok_or_else(|| {
DataFusionError::Plan("First layer should start from SortExec".to_string())
})?;
let required_ordering = sort_exec.output_ordering().ok_or_else(|| {
DataFusionError::Plan("SortExec should have output ordering".to_string())
})?;
Expand Down Expand Up @@ -313,17 +294,24 @@ fn update_child_to_remove_unnecessary_sort(
}
Ok(())
}
/// Removes the sort from the plan in the `sort_onwards`
fn remove_corresponding_sort_from_sub_plan(
sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (sort_child_idx, sort_any) = sort_onwards[0].clone();

/// Convert dyn ExecutionPlan to SortExec (Assumes it is SortExec)
fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&SortExec> {
let sort_exec = sort_any
.as_any()
.downcast_ref::<SortExec>()
.ok_or_else(|| {
DataFusionError::Plan("First layer should start from SortExec".to_string())
})?;
Ok(sort_exec)
}

/// Removes the sort from the plan in the `sort_onwards`
fn remove_corresponding_sort_from_sub_plan(
sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (sort_child_idx, sort_any) = sort_onwards[0].clone();
let sort_exec = convert_to_sort_exec(&sort_any)?;
let mut prev_layer = sort_exec.input().clone();
let mut prev_layer_child_idx = sort_child_idx;
// We start from 1 hence since first one is sort and we are removing it
Expand Down

0 comments on commit 0a42315

Please sign in to comment.