Skip to content

Commit

Permalink
Unnecessary SortExec removal rule from Physical Plan (#4691)
Browse files Browse the repository at this point in the history
* Sort Removal rule initial commit

* move ordering satisfy to the util

* update test and change repartition maintain_input_order impl

* simplifications

* partition by refactor (#28)

* partition by refactor

* minor changes

* Unnecessary tuple to Range conversion is removed

* move transpose under common

* Add naive sort removal rule

* Add todo for finer Sort removal handling

* Refactors to improve readability and reduce nesting

* reverse expr returns Option (no need for support check)

* fix tests

* partition by and order by no longer ends up at the same window group

* Refactor to simplify code

* Better comments, change method names

* Resolve errors introduced by syncing

* address reviews

* address reviews

* Rename to less confusing OptimizeSorts

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
mustafasrepo and ozankabak authored Dec 26, 2022
1 parent 34475bb commit 8ec511e
Show file tree
Hide file tree
Showing 31 changed files with 2,017 additions and 378 deletions.
10 changes: 10 additions & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod stats;
mod table_reference;
pub mod test_util;

use arrow::compute::SortOptions;
pub use column::Column;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ExprSchema, ToDFSchema};
pub use error::{field_not_found, DataFusionError, Result, SchemaError};
Expand Down Expand Up @@ -63,3 +64,12 @@ macro_rules! downcast_value {
})?
}};
}

/// Computes the "reverse" of given `SortOptions`.
// TODO: If/when arrow supports `!` for `SortOptions`, we can remove this.
pub fn reverse_sort_options(options: SortOptions) -> SortOptions {
SortOptions {
descending: !options.descending,
nulls_first: !options.nulls_first,
}
}
7 changes: 7 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ use url::Url;
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use uuid::Uuid;

use super::options::{
Expand Down Expand Up @@ -1580,6 +1581,12 @@ impl SessionState {
// To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));

// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));

let mut this = SessionState {
session_id,
optimizer: Optimizer::new(),
Expand Down
76 changes: 9 additions & 67 deletions datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//!
use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
use crate::error::Result;
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
Expand All @@ -29,8 +30,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::sorts::sort::{SortExec, SortOptions};
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
Expand All @@ -42,9 +42,8 @@ use datafusion_physical_expr::equivalence::EquivalenceProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::NoOp;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
normalize_sort_expr_with_equivalence_properties, AggregateExpr, PhysicalExpr,
PhysicalSortExpr,
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties, AggregateExpr,
PhysicalExpr,
};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -919,71 +918,14 @@ fn ensure_distribution_and_ordering(
Ok(child)
} else {
let sort_expr = required.unwrap().to_vec();
Ok(Arc::new(SortExec::new_with_partitioning(
sort_expr, child, true, None,
)) as Arc<dyn ExecutionPlan>)
add_sort_above_child(&child, sort_expr)
}
})
.collect();

with_new_children_if_necessary(plan, new_children?)
}

/// Check the required ordering requirements are satisfied by the provided PhysicalSortExprs.
fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
provided: Option<&[PhysicalSortExpr]>,
required: Option<&[PhysicalSortExpr]>,
equal_properties: F,
) -> bool {
match (provided, required) {
(_, None) => true,
(None, Some(_)) => false,
(Some(provided), Some(required)) => {
if required.len() > provided.len() {
false
} else {
let fast_match = required
.iter()
.zip(provided.iter())
.all(|(order1, order2)| order1.eq(order2));

if !fast_match {
let eq_properties = equal_properties();
let eq_classes = eq_properties.classes();
if !eq_classes.is_empty() {
let normalized_required_exprs = required
.iter()
.map(|e| {
normalize_sort_expr_with_equivalence_properties(
e.clone(),
eq_classes,
)
})
.collect::<Vec<_>>();
let normalized_provided_exprs = provided
.iter()
.map(|e| {
normalize_sort_expr_with_equivalence_properties(
e.clone(),
eq_classes,
)
})
.collect::<Vec<_>>();
normalized_required_exprs
.iter()
.zip(normalized_provided_exprs.iter())
.all(|(order1, order2)| order1.eq(order2))
} else {
fast_match
}
} else {
fast_match
}
}
}
}
}

#[derive(Debug, Clone)]
struct JoinKeyPairs {
left_keys: Vec<Arc<dyn PhysicalExpr>>,
Expand Down Expand Up @@ -1063,10 +1005,10 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_expr::logical_plan::JoinType;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::binary;
use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{expressions, PhysicalExpr};
use datafusion_physical_expr::{
expressions, expressions::binary, expressions::lit, expressions::Column,
PhysicalExpr, PhysicalSortExpr,
};
use std::ops::Deref;

use super::*;
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
pub mod pruning;
pub mod repartition;
Expand Down
Loading

0 comments on commit 8ec511e

Please sign in to comment.