Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/sort enforcement refactor #5228

Merged
merged 5 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 94 additions & 84 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
//! by another SortExec. Therefore, this rule removes it from the physical plan.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::add_sort_above_child;
use crate::physical_optimizer::utils::add_sort_above;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
Expand Down Expand Up @@ -65,15 +65,28 @@ impl EnforceSorting {
/// leading to `SortExec`s.
#[derive(Debug, Clone)]
struct ExecTree {
/// The `ExecutionPlan` associated with this node
pub plan: Arc<dyn ExecutionPlan>,
/// Child index of the plan in its parent
pub idx: usize,
/// Children of the plan that would need updating if we remove leaf executors
pub children: Vec<ExecTree>,
/// The `ExecutionPlan` associated with this node
pub plan: Arc<dyn ExecutionPlan>,
}

impl ExecTree {
/// Create new Exec tree
pub fn new(
plan: Arc<dyn ExecutionPlan>,
idx: usize,
children: Vec<ExecTree>,
) -> Self {
ExecTree {
plan,
idx,
children,
}
}

/// This function returns the executors at the leaves of the tree.
fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
if self.children.is_empty() {
Expand Down Expand Up @@ -123,11 +136,7 @@ impl PlanWithCorrespondingSort {
// that maintain this ordering. If we just saw an order-imposing
// operator, we reset the tree and start accumulating.
if is_sort(plan) {
return Some(ExecTree {
idx,
plan: item.plan,
children: vec![],
});
return Some(ExecTree::new(item.plan, idx, vec![]));
} else if is_limit(plan) {
// There is no sort linkage for this path, it starts at a limit.
return None;
Expand Down Expand Up @@ -155,11 +164,7 @@ impl PlanWithCorrespondingSort {
if !children.is_empty() {
// Add parent node to the tree if there is at least one
// child with a subtree:
Some(ExecTree {
idx,
plan: item.plan,
children,
})
Some(ExecTree::new(item.plan, idx, children))
} else {
// There is no sort linkage for this child, do nothing.
None
Expand Down Expand Up @@ -238,11 +243,7 @@ impl PlanWithCorrespondingCoalescePartitions {
// operator, we reset the tree and start accumulating.
let plan = item.plan;
if plan.as_any().is::<CoalescePartitionsExec>() {
Some(ExecTree {
idx,
plan,
children: vec![],
})
Some(ExecTree::new(plan, idx, vec![]))
} else if plan.children().is_empty() {
// Plan has no children, there is nothing to propagate.
None
Expand All @@ -263,11 +264,7 @@ impl PlanWithCorrespondingCoalescePartitions {
if children.is_empty() {
None
} else {
Some(ExecTree {
idx,
plan,
children,
})
Some(ExecTree::new(plan, idx, children))
}
}
})
Expand Down Expand Up @@ -361,31 +358,29 @@ fn parallelize_sorts(
let mut coalesce_onwards = requirements.coalesce_onwards;
// We know that `plan` has children, so `coalesce_onwards` is non-empty.
if coalesce_onwards[0].is_some() {
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
if (is_sort(&plan) || is_sort_preserving_merge(&plan))
// Make sure that Sort is actually global sort
&& plan.output_partitioning().partition_count() == 1
{
// If there is a connection between a `CoalescePartitionsExec` and a
// `SortExec` that satisfy the requirements (i.e. they don't require a
// single partition), then we can replace the `CoalescePartitionsExec`
// + `SortExec` cascade with a `SortExec` + `SortPreservingMergeExec`
// Global Sort that satisfies the requirements (i.e. intermediate
// executors don't require a single partition), then we can
// replace the `CoalescePartitionsExec` + Global Sort cascade with
// the `SortExec` + `SortPreservingMergeExec`
// cascade to parallelize sorting.
let mut prev_layer = plan.clone();
update_child_to_change_coalesce(
&mut prev_layer,
&mut coalesce_onwards[0],
Some(sort_exec),
)?;
let spm = SortPreservingMergeExec::new(sort_exec.expr().to_vec(), prev_layer);
update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
let sort_exprs = get_sort_exprs(&plan)?;
add_sort_above(&mut prev_layer, sort_exprs.to_vec())?;
let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer);
return Ok(Some(PlanWithCorrespondingCoalescePartitions {
plan: Arc::new(spm),
coalesce_onwards: vec![None],
}));
} else if plan.as_any().is::<CoalescePartitionsExec>() {
// There is an unnecessary `CoalescePartitionExec` in the plan.
let mut prev_layer = plan.clone();
update_child_to_change_coalesce(
&mut prev_layer,
&mut coalesce_onwards[0],
None,
)?;
update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?;
let new_plan = plan.with_new_children(vec![prev_layer])?;
return Ok(Some(PlanWithCorrespondingCoalescePartitions {
plan: new_plan,
Expand Down Expand Up @@ -449,12 +444,8 @@ fn ensure_sorting(
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
let sort_expr = required_ordering.to_vec();
*child = add_sort_above_child(child, sort_expr)?;
*sort_onwards = Some(ExecTree {
idx,
plan: child.clone(),
children: vec![],
})
add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
if let Some(tree) = sort_onwards {
// For window expressions, we can remove some sorts when we can
Expand All @@ -470,12 +461,8 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
*child = add_sort_above_child(child, required.to_vec())?;
*sort_onwards = Some(ExecTree {
idx,
plan: child.clone(),
children: vec![],
})
add_sort_above(child, required.to_vec())?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
(None, Some(_)) => {
// We have a `SortExec` whose effect may be neutralized by
Expand Down Expand Up @@ -528,11 +515,11 @@ fn analyze_immediate_sort_removal(
sort_exec.expr().to_vec(),
sort_input,
));
let new_tree = ExecTree {
idx: 0,
plan: new_plan.clone(),
children: sort_onwards.iter().flat_map(|e| e.clone()).collect(),
};
let new_tree = ExecTree::new(
new_plan.clone(),
0,
sort_onwards.iter().flat_map(|e| e.clone()).collect(),
);
PlanWithCorrespondingSort {
plan: new_plan,
sort_onwards: vec![Some(new_tree)],
Expand Down Expand Up @@ -658,21 +645,19 @@ fn analyze_window_sort_removal(
}

/// Updates child to remove the unnecessary `CoalescePartitions` below it.
fn update_child_to_change_coalesce(
fn update_child_to_remove_coalesce(
child: &mut Arc<dyn ExecutionPlan>,
coalesce_onwards: &mut Option<ExecTree>,
sort_exec: Option<&SortExec>,
) -> Result<()> {
if let Some(coalesce_onwards) = coalesce_onwards {
*child = change_corresponding_coalesce_in_sub_plan(coalesce_onwards, sort_exec)?;
*child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards)?;
}
Ok(())
}

/// Removes the `CoalescePartitions` from the plan in `coalesce_onwards`.
fn change_corresponding_coalesce_in_sub_plan(
fn remove_corresponding_coalesce_in_sub_plan(
coalesce_onwards: &mut ExecTree,
sort_exec: Option<&SortExec>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(
if coalesce_onwards
Expand All @@ -681,24 +666,12 @@ fn change_corresponding_coalesce_in_sub_plan(
.is::<CoalescePartitionsExec>()
{
// We can safely use the 0th index since we have a `CoalescePartitionsExec`.
let coalesce_input = coalesce_onwards.plan.children()[0].clone();
if let Some(sort_exec) = sort_exec {
let sort_expr = sort_exec.expr();
if !ordering_satisfy(
coalesce_input.output_ordering(),
Some(sort_expr),
|| coalesce_input.equivalence_properties(),
) {
return add_sort_above_child(&coalesce_input, sort_expr.to_vec());
}
}
coalesce_input
coalesce_onwards.plan.children()[0].clone()
} else {
let plan = coalesce_onwards.plan.clone();
let mut children = plan.children();
for item in &mut coalesce_onwards.children {
children[item.idx] =
change_corresponding_coalesce_in_sub_plan(item, sort_exec)?;
children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item)?;
}
plan.with_new_children(children)?
},
Expand Down Expand Up @@ -779,15 +752,11 @@ fn change_finer_sort_in_sub_plan(
let plan = &sort_onwards.plan;
// A `SortExec` is always at the bottom of the tree.
if is_sort(plan) {
let prev_layer = plan.children()[0].clone();
let mut prev_layer = plan.children()[0].clone();
let new_sort_expr = get_sort_exprs(plan)?[0..n_sort_expr].to_vec();
let updated_plan = add_sort_above_child(&prev_layer, new_sort_expr)?;
*sort_onwards = ExecTree {
idx: sort_onwards.idx,
children: vec![],
plan: updated_plan.clone(),
};
Ok(updated_plan)
add_sort_above(&mut prev_layer, new_sort_expr)?;
*sort_onwards = ExecTree::new(prev_layer.clone(), sort_onwards.idx, vec![]);
Ok(prev_layer)
} else {
let mut children = plan.children();
for item in &mut sort_onwards.children {
Expand Down Expand Up @@ -1718,8 +1687,8 @@ mod tests {
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" FilterExec: NOT non_nullable_col@1",
" SortExec: [nullable_col@0 ASC]",
" SortExec: [nullable_col@0 ASC]",
" FilterExec: NOT non_nullable_col@1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plan change looks better to me as well (do filtering before sort)

" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
Expand Down Expand Up @@ -1776,6 +1745,47 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_coalesce_propagate() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
let repartition = repartition_exec(source);
let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(repartition));
let repartition = repartition_exec(coalesce_partitions);
let sort_exprs = vec![sort_expr("nullable_col", &schema)];
// Add local sort
let sort = Arc::new(SortExec::new_with_partitioning(
sort_exprs.clone(),
repartition,
true,
None,
)) as _;
let spm = sort_preserving_merge_exec(sort_exprs.clone(), sort);
let sort = sort_exec(sort_exprs, spm);

let physical_plan = sort.clone();
// The sort-parallelization rule should break the Coalesce + Sort linkage when the Sort is a global sort.
// Also, the input plan is not valid as-is: we need to add a SortExec before the SortPreservingMergeExec.
let expected_input = vec![
"SortExec: [nullable_col@0 ASC]",
" SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: [nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
" CoalescePartitionsExec",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
" MemoryExec: partitions=0, partition_sizes=[]",
];
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" SortExec: [nullable_col@0 ASC]",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
" RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=0",
" MemoryExec: partitions=0, partition_sizes=[]",
Comment on lines +1781 to +1783
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This plan definitely looks better than the input.

];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}

/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
Expand Down
34 changes: 17 additions & 17 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;

Expand All @@ -48,22 +49,21 @@ pub fn optimize_children(
}
}

/// Util function to add SortExec above child
/// preserving the original partitioning
pub fn add_sort_above_child(
child: &Arc<dyn ExecutionPlan>,
/// This utility function adds a `SortExec` above an operator according to the
/// given ordering requirements while preserving the original partitioning.
pub fn add_sort_above(
node: &mut Arc<dyn ExecutionPlan>,
sort_expr: Vec<PhysicalSortExpr>,
) -> Result<Arc<dyn ExecutionPlan>> {
let new_child = if child.output_partitioning().partition_count() > 1 {
Arc::new(SortExec::new_with_partitioning(
sort_expr,
child.clone(),
true,
None,
)) as Arc<dyn ExecutionPlan>
} else {
Arc::new(SortExec::try_new(sort_expr, child.clone(), None)?)
as Arc<dyn ExecutionPlan>
};
Ok(new_child)
) -> Result<()> {
// If the ordering requirement is already satisfied, do not add a sort.
if !ordering_satisfy(node.output_ordering(), Some(&sort_expr), || {
node.equivalence_properties()
}) {
*node = Arc::new(if node.output_partitioning().partition_count() > 1 {
SortExec::new_with_partitioning(sort_expr, node.clone(), true, None)
} else {
SortExec::try_new(sort_expr, node.clone(), None)?
}) as _
}
Ok(())
}