diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs new file mode 100644 index 000000000000..fd8e19031f26 --- /dev/null +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -0,0 +1,488 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::expressions::Column; +use crate::{ + physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexOrderingRef, + LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, + PhysicalSortRequirement, +}; + +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::JoinType; + +use crate::equivalence::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; +use crate::physical_expr::deduplicate_physical_exprs; + +/// An `EquivalenceClass` is a set of [`Arc`]s that are known +/// to have the same value for all tuples in a relation. These are generated by +/// equality predicates (e.g. `a = b`), typically equi-join conditions and +/// equality conditions in filters. +/// +/// Two `EquivalenceClass`es are equal if they contains the same expressions in +/// without any ordering. 
+#[derive(Debug, Clone)] +pub struct EquivalenceClass { + /// The expressions in this equivalence class. The order doesn't + /// matter for equivalence purposes + /// + /// TODO: use a HashSet for this instead of a Vec + exprs: Vec>, +} + +impl PartialEq for EquivalenceClass { + /// Returns true if other is equal in the sense + /// of bags (multi-sets), disregarding their orderings. + fn eq(&self, other: &Self) -> bool { + physical_exprs_bag_equal(&self.exprs, &other.exprs) + } +} + +impl EquivalenceClass { + /// Create a new empty equivalence class + pub fn new_empty() -> Self { + Self { exprs: vec![] } + } + + // Create a new equivalence class from a pre-existing `Vec` + pub fn new(mut exprs: Vec>) -> Self { + deduplicate_physical_exprs(&mut exprs); + Self { exprs } + } + + /// Return the inner vector of expressions + pub fn into_vec(self) -> Vec> { + self.exprs + } + + /// Return the "canonical" expression for this class (the first element) + /// if any + fn canonical_expr(&self) -> Option> { + self.exprs.first().cloned() + } + + /// Insert the expression into this class, meaning it is known to be equal to + /// all other expressions in this class + pub fn push(&mut self, expr: Arc) { + if !self.contains(&expr) { + self.exprs.push(expr); + } + } + + /// Inserts all the expressions from other into this class + pub fn extend(&mut self, other: Self) { + for expr in other.exprs { + // use push so entries are deduplicated + self.push(expr); + } + } + + /// Returns true if this equivalence class contains the expression + pub fn contains(&self, expr: &Arc) -> bool { + physical_exprs_contains(&self.exprs, expr) + } + + /// Returns true if this equivalence class has any entries in common with `other` + pub fn contains_any(&self, other: &Self) -> bool { + self.exprs.iter().any(|e| other.contains(e)) + } + + /// return the number of items in this class + pub fn len(&self) -> usize { + self.exprs.len() + } + + /// return true if this class is empty + pub fn is_empty(&self) 
-> bool { + self.exprs.is_empty() + } + + /// Iterate over all elements in this class, in some arbitrary order + pub fn iter(&self) -> impl Iterator> { + self.exprs.iter() + } + + /// Return a new equivalence class that have the specified offset added to + /// each expression (used when schemas are appended such as in joins) + pub fn with_offset(&self, offset: usize) -> Self { + let new_exprs = self + .exprs + .iter() + .cloned() + .map(|e| add_offset_to_expr(e, offset)) + .collect(); + Self::new(new_exprs) + } +} + +/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each +/// class represents a distinct equivalence class in a relation. +#[derive(Debug, Clone)] +pub struct EquivalenceGroup { + classes: Vec, +} + +impl EquivalenceGroup { + /// Creates an empty equivalence group. + pub fn empty() -> Self { + Self { classes: vec![] } + } + + /// Creates an equivalence group from the given equivalence classes. + pub fn new(classes: Vec) -> Self { + let mut result = EquivalenceGroup { classes }; + result.remove_redundant_entries(); + result + } + + /// Returns how many equivalence classes there are in this group. + pub fn len(&self) -> usize { + self.classes.len() + } + + /// Checks whether this equivalence group is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return the inner vector of equivalence classes + pub fn into_vec(self) -> Vec { + self.classes + } + + /// Returns an iterator over the equivalence classes in this group. + pub fn iter(&self) -> impl Iterator { + self.classes.iter() + } + + /// Adds the equality `left` = `right` to this equivalence group. + /// New equality conditions often arise after steps like `Filter(a = b)`, + /// `Alias(a, a as b)` etc. 
+ pub fn add_equal_conditions( + &mut self, + left: &Arc, + right: &Arc, + ) { + let mut first_class = None; + let mut second_class = None; + for (idx, cls) in self.classes.iter().enumerate() { + if cls.contains(left) { + first_class = Some(idx); + } + if cls.contains(right) { + second_class = Some(idx); + } + } + match (first_class, second_class) { + (Some(mut first_idx), Some(mut second_idx)) => { + // If the given left and right sides belong to different classes, + // we should unify/bridge these classes. + if first_idx != second_idx { + // By convention make sure second_idx is larger than first_idx. + if first_idx > second_idx { + (first_idx, second_idx) = (second_idx, first_idx); + } + // Remove second_idx from self.classes then merge its values with class at first_idx. + // Convention above makes sure that first_idx is still valid after second_idx removal. + let other_class = self.classes.swap_remove(second_idx); + self.classes[first_idx].extend(other_class); + } + } + (Some(group_idx), None) => { + // Right side is new, extend left side's class: + self.classes[group_idx].push(right.clone()); + } + (None, Some(group_idx)) => { + // Left side is new, extend right side's class: + self.classes[group_idx].push(left.clone()); + } + (None, None) => { + // None of the expressions is among existing classes. + // Create a new equivalence class and extend the group. + self.classes + .push(EquivalenceClass::new(vec![left.clone(), right.clone()])); + } + } + } + + /// Removes redundant entries from this group. + pub fn remove_redundant_entries(&mut self) { + // Remove duplicate entries from each equivalence class: + self.classes.retain_mut(|cls| { + // Keep groups that have at least two entries as singleton class is + // meaningless (i.e. it contains no non-trivial information): + cls.len() > 1 + }); + // Unify/bridge groups that have common expressions: + self.bridge_classes() + } + + /// This utility function unifies/bridges classes that have common expressions. 
+ /// For example, assume that we have [`EquivalenceClass`]es `[a, b]` and `[b, c]`. + /// Since both classes contain `b`, columns `a`, `b` and `c` are actually all + /// equal and belong to one class. This utility converts merges such classes. + pub fn bridge_classes(&mut self) { + let mut idx = 0; + while idx < self.classes.len() { + let mut next_idx = idx + 1; + let start_size = self.classes[idx].len(); + while next_idx < self.classes.len() { + if self.classes[idx].contains_any(&self.classes[next_idx]) { + let extension = self.classes.swap_remove(next_idx); + self.classes[idx].extend(extension); + } else { + next_idx += 1; + } + } + if self.classes[idx].len() > start_size { + continue; + } + idx += 1; + } + } + + /// Extends this equivalence group with the `other` equivalence group. + pub fn extend(&mut self, other: Self) { + self.classes.extend(other.classes); + self.remove_redundant_entries(); + } + + /// Normalizes the given physical expression according to this group. + /// The expression is replaced with the first expression in the equivalence + /// class it matches with (if any). + pub fn normalize_expr(&self, expr: Arc) -> Arc { + expr.clone() + .transform(&|expr| { + for cls in self.iter() { + if cls.contains(&expr) { + return Ok(Transformed::Yes(cls.canonical_expr().unwrap())); + } + } + Ok(Transformed::No(expr)) + }) + .unwrap_or(expr) + } + + /// Normalizes the given sort expression according to this group. + /// The underlying physical expression is replaced with the first expression + /// in the equivalence class it matches with (if any). If the underlying + /// expression does not belong to any equivalence class in this group, returns + /// the sort expression as is. + pub fn normalize_sort_expr( + &self, + mut sort_expr: PhysicalSortExpr, + ) -> PhysicalSortExpr { + sort_expr.expr = self.normalize_expr(sort_expr.expr); + sort_expr + } + + /// Normalizes the given sort requirement according to this group. 
+ /// The underlying physical expression is replaced with the first expression + /// in the equivalence class it matches with (if any). If the underlying + /// expression does not belong to any equivalence class in this group, returns + /// the given sort requirement as is. + pub fn normalize_sort_requirement( + &self, + mut sort_requirement: PhysicalSortRequirement, + ) -> PhysicalSortRequirement { + sort_requirement.expr = self.normalize_expr(sort_requirement.expr); + sort_requirement + } + + /// This function applies the `normalize_expr` function for all expressions + /// in `exprs` and returns the corresponding normalized physical expressions. + pub fn normalize_exprs( + &self, + exprs: impl IntoIterator>, + ) -> Vec> { + exprs + .into_iter() + .map(|expr| self.normalize_expr(expr)) + .collect() + } + + /// This function applies the `normalize_sort_expr` function for all sort + /// expressions in `sort_exprs` and returns the corresponding normalized + /// sort expressions. + pub fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering { + // Convert sort expressions to sort requirements: + let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); + // Normalize the requirements: + let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs); + // Convert sort requirements back to sort expressions: + PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs) + } + + /// This function applies the `normalize_sort_requirement` function for all + /// requirements in `sort_reqs` and returns the corresponding normalized + /// sort requirements. + pub fn normalize_sort_requirements( + &self, + sort_reqs: LexRequirementRef, + ) -> LexRequirement { + collapse_lex_req( + sort_reqs + .iter() + .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) + .collect(), + ) + } + + /// Projects `expr` according to the given projection mapping. + /// If the resulting expression is invalid after projection, returns `None`. 
+ pub fn project_expr( + &self, + mapping: &ProjectionMapping, + expr: &Arc, + ) -> Option> { + let children = expr.children(); + if children.is_empty() { + for (source, target) in mapping.iter() { + // If we match the source, or an equivalent expression to source, + // then we can project. For example, if we have the mapping + // (a as a1, a + c) and the equivalence class (a, b), expression + // b also projects to a1. + if source.eq(expr) + || self + .get_equivalence_class(source) + .map_or(false, |group| group.contains(expr)) + { + return Some(target.clone()); + } + } + } + // Project a non-leaf expression by projecting its children. + else if let Some(children) = children + .into_iter() + .map(|child| self.project_expr(mapping, &child)) + .collect::>>() + { + return Some(expr.clone().with_new_children(children).unwrap()); + } + // Arriving here implies the expression was invalid after projection. + None + } + + /// Projects `ordering` according to the given projection mapping. + /// If the resulting ordering is invalid after projection, returns `None`. + pub fn project_ordering( + &self, + mapping: &ProjectionMapping, + ordering: LexOrderingRef, + ) -> Option { + // If any sort expression is invalid after projection, rest of the + // ordering shouldn't be projected either. For example, if input ordering + // is [a ASC, b ASC, c ASC], and column b is not valid after projection, + // the result should be [a ASC], not [a ASC, c ASC], even if column c is + // valid after projection. + let result = ordering + .iter() + .map_while(|sort_expr| { + self.project_expr(mapping, &sort_expr.expr) + .map(|expr| PhysicalSortExpr { + expr, + options: sort_expr.options, + }) + }) + .collect::>(); + (!result.is_empty()).then_some(result) + } + + /// Projects this equivalence group according to the given projection mapping. 
+ pub fn project(&self, mapping: &ProjectionMapping) -> Self { + let projected_classes = self.iter().filter_map(|cls| { + let new_class = cls + .iter() + .filter_map(|expr| self.project_expr(mapping, expr)) + .collect::>(); + (new_class.len() > 1).then_some(EquivalenceClass::new(new_class)) + }); + // TODO: Convert the algorithm below to a version that uses `HashMap`. + // once `Arc` can be stored in `HashMap`. + // See issue: https://github.com/apache/arrow-datafusion/issues/8027 + let mut new_classes = vec![]; + for (source, target) in mapping.iter() { + if new_classes.is_empty() { + new_classes.push((source, vec![target.clone()])); + } + if let Some((_, values)) = + new_classes.iter_mut().find(|(key, _)| key.eq(source)) + { + if !physical_exprs_contains(values, target) { + values.push(target.clone()); + } + } + } + // Only add equivalence classes with at least two members as singleton + // equivalence classes are meaningless. + let new_classes = new_classes + .into_iter() + .filter_map(|(_, values)| (values.len() > 1).then_some(values)) + .map(EquivalenceClass::new); + + let classes = projected_classes.chain(new_classes).collect(); + Self::new(classes) + } + + /// Returns the equivalence class that contains `expr`. + /// If none of the equivalence classes contains `expr`, returns `None`. + fn get_equivalence_class( + &self, + expr: &Arc, + ) -> Option<&EquivalenceClass> { + self.iter().find(|cls| cls.contains(expr)) + } + + /// Combine equivalence groups of the given join children. 
+ pub fn join( + &self, + right_equivalences: &Self, + join_type: &JoinType, + left_size: usize, + on: &[(Column, Column)], + ) -> Self { + match join_type { + JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { + let mut result = Self::new( + self.iter() + .cloned() + .chain( + right_equivalences + .iter() + .map(|cls| cls.with_offset(left_size)), + ) + .collect(), + ); + // In we have an inner join, expressions in the "on" condition + // are equal in the resulting table. + if join_type == &JoinType::Inner { + for (lhs, rhs) in on.iter() { + let index = rhs.index() + left_size; + let new_lhs = Arc::new(lhs.clone()) as _; + let new_rhs = Arc::new(Column::new(rhs.name(), index)) as _; + result.add_equal_conditions(&new_lhs, &new_rhs); + } + } + result + } + JoinType::LeftSemi | JoinType::LeftAnti => self.clone(), + JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(), + } + } +} diff --git a/datafusion/physical-expr/src/equivalence.rs b/datafusion/physical-expr/src/equivalence/mod.rs similarity index 55% rename from datafusion/physical-expr/src/equivalence.rs rename to datafusion/physical-expr/src/equivalence/mod.rs index f3bfe4961622..fc07eb1faebc 100644 --- a/datafusion/physical-expr/src/equivalence.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -15,539 +15,24 @@ // specific language governing permissions and limitations // under the License. 
-use std::collections::HashSet; -use std::hash::Hash; use std::sync::Arc; use crate::expressions::Column; -use crate::sort_properties::{ExprOrdering, SortProperties}; -use crate::{ - physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexOrderingRef, - LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalSortExpr, - PhysicalSortRequirement, -}; +use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement}; use arrow::datatypes::SchemaRef; -use arrow_schema::SortOptions; use datafusion_common::tree_node::{Transformed, TreeNode}; -use datafusion_common::{JoinSide, JoinType, Result}; +use datafusion_common::{JoinSide, JoinType}; -use crate::physical_expr::deduplicate_physical_exprs; -use indexmap::map::Entry; -use indexmap::IndexMap; +mod class; +mod ordering; +mod projection; +mod properties; -/// An `EquivalenceClass` is a set of [`Arc`]s that are known -/// to have the same value for all tuples in a relation. These are generated by -/// equality predicates (e.g. `a = b`), typically equi-join conditions and -/// equality conditions in filters. -/// -/// Two `EquivalenceClass`es are equal if they contains the same expressions in -/// without any ordering. -#[derive(Debug, Clone)] -pub struct EquivalenceClass { - /// The expressions in this equivalence class. The order doesn't - /// matter for equivalence purposes - /// - /// TODO: use a HashSet for this instead of a Vec - exprs: Vec>, -} - -impl PartialEq for EquivalenceClass { - /// Returns true if other is equal in the sense - /// of bags (multi-sets), disregarding their orderings. 
- fn eq(&self, other: &Self) -> bool { - physical_exprs_bag_equal(&self.exprs, &other.exprs) - } -} - -impl EquivalenceClass { - /// Create a new empty equivalence class - pub fn new_empty() -> Self { - Self { exprs: vec![] } - } - - // Create a new equivalence class from a pre-existing `Vec` - pub fn new(mut exprs: Vec>) -> Self { - deduplicate_physical_exprs(&mut exprs); - Self { exprs } - } - - /// Return the inner vector of expressions - pub fn into_vec(self) -> Vec> { - self.exprs - } - - /// Return the "canonical" expression for this class (the first element) - /// if any - fn canonical_expr(&self) -> Option> { - self.exprs.first().cloned() - } - - /// Insert the expression into this class, meaning it is known to be equal to - /// all other expressions in this class - pub fn push(&mut self, expr: Arc) { - if !self.contains(&expr) { - self.exprs.push(expr); - } - } - - /// Inserts all the expressions from other into this class - pub fn extend(&mut self, other: Self) { - for expr in other.exprs { - // use push so entries are deduplicated - self.push(expr); - } - } - - /// Returns true if this equivalence class contains t expression - pub fn contains(&self, expr: &Arc) -> bool { - physical_exprs_contains(&self.exprs, expr) - } - - /// Returns true if this equivalence class has any entries in common with `other` - pub fn contains_any(&self, other: &Self) -> bool { - self.exprs.iter().any(|e| other.contains(e)) - } - - /// return the number of items in this class - pub fn len(&self) -> usize { - self.exprs.len() - } - - /// return true if this class is empty - pub fn is_empty(&self) -> bool { - self.exprs.is_empty() - } - - /// Iterate over all elements in this class, in some arbitrary order - pub fn iter(&self) -> impl Iterator> { - self.exprs.iter() - } - - /// Return a new equivalence class that have the specified offset added to - /// each expression (used when schemas are appended such as in joins) - pub fn with_offset(&self, offset: usize) -> Self { - let 
new_exprs = self - .exprs - .iter() - .cloned() - .map(|e| add_offset_to_expr(e, offset)) - .collect(); - Self::new(new_exprs) - } -} - -/// Stores the mapping between source expressions and target expressions for a -/// projection. -#[derive(Debug, Clone)] -pub struct ProjectionMapping { - /// `(source expression)` --> `(target expression)` - /// Indices in the vector corresponds to the indices after projection. - inner: Vec<(Arc, Arc)>, -} - -impl ProjectionMapping { - /// Constructs the mapping between a projection's input and output - /// expressions. - /// - /// For example, given the input projection expressions (`a+b`, `c+d`) - /// and an output schema with two columns `"c+d"` and `"a+b"` - /// the projection mapping would be - /// ```text - /// [0]: (c+d, col("c+d")) - /// [1]: (a+b, col("a+b")) - /// ``` - /// where `col("c+d")` means the column named "c+d". - pub fn try_new( - expr: &[(Arc, String)], - input_schema: &SchemaRef, - ) -> Result { - // Construct a map from the input expressions to the output expression of the projection: - let mut inner = vec![]; - for (expr_idx, (expression, name)) in expr.iter().enumerate() { - let target_expr = Arc::new(Column::new(name, expr_idx)) as _; - - let source_expr = expression.clone().transform_down(&|e| match e - .as_any() - .downcast_ref::( - ) { - Some(col) => { - // Sometimes, expression and its name in the input_schema doesn't match. - // This can cause problems. Hence in here we make sure that expression name - // matches with the name in the inout_schema. - // Conceptually, source_expr and expression should be same. 
- let idx = col.index(); - let matching_input_field = input_schema.field(idx); - let matching_input_column = - Column::new(matching_input_field.name(), idx); - Ok(Transformed::Yes(Arc::new(matching_input_column))) - } - None => Ok(Transformed::No(e)), - })?; - - inner.push((source_expr, target_expr)); - } - Ok(Self { inner }) - } - - /// Iterate over pairs of (source, target) expressions - pub fn iter( - &self, - ) -> impl Iterator, Arc)> + '_ { - self.inner.iter() - } -} - -/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each -/// class represents a distinct equivalence class in a relation. -#[derive(Debug, Clone)] -pub struct EquivalenceGroup { - classes: Vec, -} - -impl EquivalenceGroup { - /// Creates an empty equivalence group. - fn empty() -> Self { - Self { classes: vec![] } - } - - /// Creates an equivalence group from the given equivalence classes. - fn new(classes: Vec) -> Self { - let mut result = EquivalenceGroup { classes }; - result.remove_redundant_entries(); - result - } - - /// Returns how many equivalence classes there are in this group. - fn len(&self) -> usize { - self.classes.len() - } - - /// Checks whether this equivalence group is empty. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Returns an iterator over the equivalence classes in this group. - pub fn iter(&self) -> impl Iterator { - self.classes.iter() - } - - /// Adds the equality `left` = `right` to this equivalence group. - /// New equality conditions often arise after steps like `Filter(a = b)`, - /// `Alias(a, a as b)` etc. 
- fn add_equal_conditions( - &mut self, - left: &Arc, - right: &Arc, - ) { - let mut first_class = None; - let mut second_class = None; - for (idx, cls) in self.classes.iter().enumerate() { - if cls.contains(left) { - first_class = Some(idx); - } - if cls.contains(right) { - second_class = Some(idx); - } - } - match (first_class, second_class) { - (Some(mut first_idx), Some(mut second_idx)) => { - // If the given left and right sides belong to different classes, - // we should unify/bridge these classes. - if first_idx != second_idx { - // By convention make sure second_idx is larger than first_idx. - if first_idx > second_idx { - (first_idx, second_idx) = (second_idx, first_idx); - } - // Remove second_idx from self.classes then merge its values with class at first_idx. - // Convention above makes sure that first_idx is still valid after second_idx removal. - let other_class = self.classes.swap_remove(second_idx); - self.classes[first_idx].extend(other_class); - } - } - (Some(group_idx), None) => { - // Right side is new, extend left side's class: - self.classes[group_idx].push(right.clone()); - } - (None, Some(group_idx)) => { - // Left side is new, extend right side's class: - self.classes[group_idx].push(left.clone()); - } - (None, None) => { - // None of the expressions is among existing classes. - // Create a new equivalence class and extend the group. - self.classes - .push(EquivalenceClass::new(vec![left.clone(), right.clone()])); - } - } - } - - /// Removes redundant entries from this group. - fn remove_redundant_entries(&mut self) { - // Remove duplicate entries from each equivalence class: - self.classes.retain_mut(|cls| { - // Keep groups that have at least two entries as singleton class is - // meaningless (i.e. it contains no non-trivial information): - cls.len() > 1 - }); - // Unify/bridge groups that have common expressions: - self.bridge_classes() - } - - /// This utility function unifies/bridges classes that have common expressions. 
- /// For example, assume that we have [`EquivalenceClass`]es `[a, b]` and `[b, c]`. - /// Since both classes contain `b`, columns `a`, `b` and `c` are actually all - /// equal and belong to one class. This utility converts merges such classes. - fn bridge_classes(&mut self) { - let mut idx = 0; - while idx < self.classes.len() { - let mut next_idx = idx + 1; - let start_size = self.classes[idx].len(); - while next_idx < self.classes.len() { - if self.classes[idx].contains_any(&self.classes[next_idx]) { - let extension = self.classes.swap_remove(next_idx); - self.classes[idx].extend(extension); - } else { - next_idx += 1; - } - } - if self.classes[idx].len() > start_size { - continue; - } - idx += 1; - } - } - - /// Extends this equivalence group with the `other` equivalence group. - fn extend(&mut self, other: Self) { - self.classes.extend(other.classes); - self.remove_redundant_entries(); - } - - /// Normalizes the given physical expression according to this group. - /// The expression is replaced with the first expression in the equivalence - /// class it matches with (if any). - pub fn normalize_expr(&self, expr: Arc) -> Arc { - expr.clone() - .transform(&|expr| { - for cls in self.iter() { - if cls.contains(&expr) { - return Ok(Transformed::Yes(cls.canonical_expr().unwrap())); - } - } - Ok(Transformed::No(expr)) - }) - .unwrap_or(expr) - } - - /// Normalizes the given sort expression according to this group. - /// The underlying physical expression is replaced with the first expression - /// in the equivalence class it matches with (if any). If the underlying - /// expression does not belong to any equivalence class in this group, returns - /// the sort expression as is. - pub fn normalize_sort_expr( - &self, - mut sort_expr: PhysicalSortExpr, - ) -> PhysicalSortExpr { - sort_expr.expr = self.normalize_expr(sort_expr.expr); - sort_expr - } - - /// Normalizes the given sort requirement according to this group. 
- /// The underlying physical expression is replaced with the first expression - /// in the equivalence class it matches with (if any). If the underlying - /// expression does not belong to any equivalence class in this group, returns - /// the given sort requirement as is. - pub fn normalize_sort_requirement( - &self, - mut sort_requirement: PhysicalSortRequirement, - ) -> PhysicalSortRequirement { - sort_requirement.expr = self.normalize_expr(sort_requirement.expr); - sort_requirement - } - - /// This function applies the `normalize_expr` function for all expressions - /// in `exprs` and returns the corresponding normalized physical expressions. - pub fn normalize_exprs( - &self, - exprs: impl IntoIterator>, - ) -> Vec> { - exprs - .into_iter() - .map(|expr| self.normalize_expr(expr)) - .collect() - } - - /// This function applies the `normalize_sort_expr` function for all sort - /// expressions in `sort_exprs` and returns the corresponding normalized - /// sort expressions. - pub fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering { - // Convert sort expressions to sort requirements: - let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); - // Normalize the requirements: - let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs); - // Convert sort requirements back to sort expressions: - PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs) - } - - /// This function applies the `normalize_sort_requirement` function for all - /// requirements in `sort_reqs` and returns the corresponding normalized - /// sort requirements. - pub fn normalize_sort_requirements( - &self, - sort_reqs: LexRequirementRef, - ) -> LexRequirement { - collapse_lex_req( - sort_reqs - .iter() - .map(|sort_req| self.normalize_sort_requirement(sort_req.clone())) - .collect(), - ) - } - - /// Projects `expr` according to the given projection mapping. - /// If the resulting expression is invalid after projection, returns `None`. 
- fn project_expr( - &self, - mapping: &ProjectionMapping, - expr: &Arc, - ) -> Option> { - let children = expr.children(); - if children.is_empty() { - for (source, target) in mapping.iter() { - // If we match the source, or an equivalent expression to source, - // then we can project. For example, if we have the mapping - // (a as a1, a + c) and the equivalence class (a, b), expression - // b also projects to a1. - if source.eq(expr) - || self - .get_equivalence_class(source) - .map_or(false, |group| group.contains(expr)) - { - return Some(target.clone()); - } - } - } - // Project a non-leaf expression by projecting its children. - else if let Some(children) = children - .into_iter() - .map(|child| self.project_expr(mapping, &child)) - .collect::>>() - { - return Some(expr.clone().with_new_children(children).unwrap()); - } - // Arriving here implies the expression was invalid after projection. - None - } - - /// Projects `ordering` according to the given projection mapping. - /// If the resulting ordering is invalid after projection, returns `None`. - fn project_ordering( - &self, - mapping: &ProjectionMapping, - ordering: LexOrderingRef, - ) -> Option { - // If any sort expression is invalid after projection, rest of the - // ordering shouldn't be projected either. For example, if input ordering - // is [a ASC, b ASC, c ASC], and column b is not valid after projection, - // the result should be [a ASC], not [a ASC, c ASC], even if column c is - // valid after projection. - let result = ordering - .iter() - .map_while(|sort_expr| { - self.project_expr(mapping, &sort_expr.expr) - .map(|expr| PhysicalSortExpr { - expr, - options: sort_expr.options, - }) - }) - .collect::>(); - (!result.is_empty()).then_some(result) - } - - /// Projects this equivalence group according to the given projection mapping. 
- pub fn project(&self, mapping: &ProjectionMapping) -> Self { - let projected_classes = self.iter().filter_map(|cls| { - let new_class = cls - .iter() - .filter_map(|expr| self.project_expr(mapping, expr)) - .collect::>(); - (new_class.len() > 1).then_some(EquivalenceClass::new(new_class)) - }); - // TODO: Convert the algorithm below to a version that uses `HashMap`. - // once `Arc` can be stored in `HashMap`. - // See issue: https://github.com/apache/arrow-datafusion/issues/8027 - let mut new_classes = vec![]; - for (source, target) in mapping.iter() { - if new_classes.is_empty() { - new_classes.push((source, vec![target.clone()])); - } - if let Some((_, values)) = - new_classes.iter_mut().find(|(key, _)| key.eq(source)) - { - if !physical_exprs_contains(values, target) { - values.push(target.clone()); - } - } - } - // Only add equivalence classes with at least two members as singleton - // equivalence classes are meaningless. - let new_classes = new_classes - .into_iter() - .filter_map(|(_, values)| (values.len() > 1).then_some(values)) - .map(EquivalenceClass::new); - - let classes = projected_classes.chain(new_classes).collect(); - Self::new(classes) - } - - /// Returns the equivalence class that contains `expr`. - /// If none of the equivalence classes contains `expr`, returns `None`. - fn get_equivalence_class( - &self, - expr: &Arc, - ) -> Option<&EquivalenceClass> { - self.iter().find(|cls| cls.contains(expr)) - } - - /// Combine equivalence groups of the given join children. 
- pub fn join( - &self, - right_equivalences: &Self, - join_type: &JoinType, - left_size: usize, - on: &[(Column, Column)], - ) -> Self { - match join_type { - JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { - let mut result = Self::new( - self.iter() - .cloned() - .chain( - right_equivalences - .iter() - .map(|cls| cls.with_offset(left_size)), - ) - .collect(), - ); - // In we have an inner join, expressions in the "on" condition - // are equal in the resulting table. - if join_type == &JoinType::Inner { - for (lhs, rhs) in on.iter() { - let index = rhs.index() + left_size; - let new_lhs = Arc::new(lhs.clone()) as _; - let new_rhs = Arc::new(Column::new(rhs.name(), index)) as _; - result.add_equal_conditions(&new_lhs, &new_rhs); - } - } - result - } - JoinType::LeftSemi | JoinType::LeftAnti => self.clone(), - JoinType::RightSemi | JoinType::RightAnti => right_equivalences.clone(), - } - } -} +pub use class::{EquivalenceClass, EquivalenceGroup}; +pub use ordering::OrderingEquivalenceClass; +pub use projection::ProjectionMapping; +pub use properties::EquivalenceProperties; /// This function constructs a duplicate-free `LexOrderingReq` by filtering out /// duplicate entries that have same physical expression inside. For example, @@ -562,167 +47,6 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement { output } -/// This function constructs a duplicate-free `LexOrdering` by filtering out -/// duplicate entries that have same physical expression inside. For example, -/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. -pub fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { - let mut output = Vec::::new(); - for item in input { - if !output.iter().any(|req| req.expr.eq(&item.expr)) { - output.push(item); - } - } - output -} - -/// An `OrderingEquivalenceClass` object keeps track of different alternative -/// orderings than can describe a schema. 
For example, consider the following table: -/// -/// ```text -/// |a|b|c|d| -/// |1|4|3|1| -/// |2|3|3|2| -/// |3|1|2|2| -/// |3|2|1|3| -/// ``` -/// -/// Here, both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the table -/// ordering. In this case, we say that these orderings are equivalent. -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub struct OrderingEquivalenceClass { - orderings: Vec, -} - -impl OrderingEquivalenceClass { - /// Creates new empty ordering equivalence class. - fn empty() -> Self { - Self { orderings: vec![] } - } - - /// Clears (empties) this ordering equivalence class. - pub fn clear(&mut self) { - self.orderings.clear(); - } - - /// Creates new ordering equivalence class from the given orderings. - pub fn new(orderings: Vec) -> Self { - let mut result = Self { orderings }; - result.remove_redundant_entries(); - result - } - - /// Checks whether `ordering` is a member of this equivalence class. - pub fn contains(&self, ordering: &LexOrdering) -> bool { - self.orderings.contains(ordering) - } - - /// Adds `ordering` to this equivalence class. - #[allow(dead_code)] - fn push(&mut self, ordering: LexOrdering) { - self.orderings.push(ordering); - // Make sure that there are no redundant orderings: - self.remove_redundant_entries(); - } - - /// Checks whether this ordering equivalence class is empty. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Returns an iterator over the equivalent orderings in this class. - pub fn iter(&self) -> impl Iterator { - self.orderings.iter() - } - - /// Returns how many equivalent orderings there are in this class. - pub fn len(&self) -> usize { - self.orderings.len() - } - - /// Extend this ordering equivalence class with the `other` class. - pub fn extend(&mut self, other: Self) { - self.orderings.extend(other.orderings); - // Make sure that there are no redundant orderings: - self.remove_redundant_entries(); - } - - /// Adds new orderings into this ordering equivalence class. 
- pub fn add_new_orderings( - &mut self, - orderings: impl IntoIterator, - ) { - self.orderings.extend(orderings); - // Make sure that there are no redundant orderings: - self.remove_redundant_entries(); - } - - /// Removes redundant orderings from this equivalence class. - /// For instance, If we already have the ordering [a ASC, b ASC, c DESC], - /// then there is no need to keep ordering [a ASC, b ASC] in the state. - fn remove_redundant_entries(&mut self) { - let mut idx = 0; - while idx < self.orderings.len() { - let mut removal = false; - for (ordering_idx, ordering) in self.orderings[0..idx].iter().enumerate() { - if let Some(right_finer) = finer_side(ordering, &self.orderings[idx]) { - if right_finer { - self.orderings.swap(ordering_idx, idx); - } - removal = true; - break; - } - } - if removal { - self.orderings.swap_remove(idx); - } else { - idx += 1; - } - } - } - - /// Returns the concatenation of all the orderings. This enables merge - /// operations to preserve all equivalent orderings simultaneously. - pub fn output_ordering(&self) -> Option { - let output_ordering = - self.orderings.iter().flatten().cloned().collect::>(); - let output_ordering = collapse_lex_ordering(output_ordering); - (!output_ordering.is_empty()).then_some(output_ordering) - } - - // Append orderings in `other` to all existing orderings in this equivalence - // class. - pub fn join_suffix(mut self, other: &Self) -> Self { - for ordering in other.iter() { - for idx in 0..self.orderings.len() { - self.orderings[idx].extend(ordering.iter().cloned()); - } - } - self - } - - /// Adds `offset` value to the index of each expression inside this - /// ordering equivalence class. - pub fn add_offset(&mut self, offset: usize) { - for ordering in self.orderings.iter_mut() { - for sort_expr in ordering { - sort_expr.expr = add_offset_to_expr(sort_expr.expr.clone(), offset); - } - } - } - - /// Gets sort options associated with this expression if it is a leading - /// ordering expression. 
Otherwise, returns `None`. - fn get_options(&self, expr: &Arc) -> Option { - for ordering in self.iter() { - let leading_ordering = &ordering[0]; - if leading_ordering.expr.eq(expr) { - return Some(leading_ordering.options); - } - } - None - } -} - /// Adds the `offset` value to `Column` indices inside `expr`. This function is /// generally used during the update of the right table schema in join operations. pub fn add_offset_to_expr( @@ -741,484 +65,6 @@ pub fn add_offset_to_expr( // an `Ok` value. } -/// Returns `true` if the ordering `rhs` is strictly finer than the ordering `rhs`, -/// `false` if the ordering `lhs` is at least as fine as the ordering `lhs`, and -/// `None` otherwise (i.e. when given orderings are incomparable). -fn finer_side(lhs: LexOrderingRef, rhs: LexOrderingRef) -> Option { - let all_equal = lhs.iter().zip(rhs.iter()).all(|(lhs, rhs)| lhs.eq(rhs)); - all_equal.then_some(lhs.len() < rhs.len()) -} - -/// A `EquivalenceProperties` object stores useful information related to a schema. -/// Currently, it keeps track of: -/// - Equivalent expressions, e.g expressions that have same value. -/// - Valid sort expressions (orderings) for the schema. -/// - Constants expressions (e.g expressions that are known to have constant values). -/// -/// Consider table below: -/// -/// ```text -/// ┌-------┐ -/// | a | b | -/// |---|---| -/// | 1 | 9 | -/// | 2 | 8 | -/// | 3 | 7 | -/// | 5 | 5 | -/// └---┴---┘ -/// ``` -/// -/// where both `a ASC` and `b DESC` can describe the table ordering. With -/// `EquivalenceProperties`, we can keep track of these different valid sort -/// expressions and treat `a ASC` and `b DESC` on an equal footing. -/// -/// Similarly, consider the table below: -/// -/// ```text -/// ┌-------┐ -/// | a | b | -/// |---|---| -/// | 1 | 1 | -/// | 2 | 2 | -/// | 3 | 3 | -/// | 5 | 5 | -/// └---┴---┘ -/// ``` -/// -/// where columns `a` and `b` always have the same value. We keep track of such -/// equivalences inside this object. 
With this information, we can optimize -/// things like partitioning. For example, if the partition requirement is -/// `Hash(a)` and output partitioning is `Hash(b)`, then we can deduce that -/// the existing partitioning satisfies the requirement. -#[derive(Debug, Clone)] -pub struct EquivalenceProperties { - /// Collection of equivalence classes that store expressions with the same - /// value. - eq_group: EquivalenceGroup, - /// Equivalent sort expressions for this table. - oeq_class: OrderingEquivalenceClass, - /// Expressions whose values are constant throughout the table. - /// TODO: We do not need to track constants separately, they can be tracked - /// inside `eq_groups` as `Literal` expressions. - constants: Vec>, - /// Schema associated with this object. - schema: SchemaRef, -} - -impl EquivalenceProperties { - /// Creates an empty `EquivalenceProperties` object. - pub fn new(schema: SchemaRef) -> Self { - Self { - eq_group: EquivalenceGroup::empty(), - oeq_class: OrderingEquivalenceClass::empty(), - constants: vec![], - schema, - } - } - - /// Creates a new `EquivalenceProperties` object with the given orderings. - pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self { - Self { - eq_group: EquivalenceGroup::empty(), - oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()), - constants: vec![], - schema, - } - } - - /// Returns the associated schema. - pub fn schema(&self) -> &SchemaRef { - &self.schema - } - - /// Returns a reference to the ordering equivalence class within. - pub fn oeq_class(&self) -> &OrderingEquivalenceClass { - &self.oeq_class - } - - /// Returns a reference to the equivalence group within. - pub fn eq_group(&self) -> &EquivalenceGroup { - &self.eq_group - } - - /// Returns a reference to the constant expressions - pub fn constants(&self) -> &[Arc] { - &self.constants - } - - /// Returns the normalized version of the ordering equivalence class within. 
- /// Normalization removes constants and duplicates as well as standardizing - /// expressions according to the equivalence group within. - pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass { - OrderingEquivalenceClass::new( - self.oeq_class - .iter() - .map(|ordering| self.normalize_sort_exprs(ordering)) - .collect(), - ) - } - - /// Extends this `EquivalenceProperties` with the `other` object. - pub fn extend(mut self, other: Self) -> Self { - self.eq_group.extend(other.eq_group); - self.oeq_class.extend(other.oeq_class); - self.add_constants(other.constants) - } - - /// Clears (empties) the ordering equivalence class within this object. - /// Call this method when existing orderings are invalidated. - pub fn clear_orderings(&mut self) { - self.oeq_class.clear(); - } - - /// Extends this `EquivalenceProperties` by adding the orderings inside the - /// ordering equivalence class `other`. - pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) { - self.oeq_class.extend(other); - } - - /// Adds new orderings into the existing ordering equivalence class. - pub fn add_new_orderings( - &mut self, - orderings: impl IntoIterator, - ) { - self.oeq_class.add_new_orderings(orderings); - } - - /// Incorporates the given equivalence group to into the existing - /// equivalence group within. - pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) { - self.eq_group.extend(other_eq_group); - } - - /// Adds a new equality condition into the existing equivalence group. - /// If the given equality defines a new equivalence class, adds this new - /// equivalence class to the equivalence group. - pub fn add_equal_conditions( - &mut self, - left: &Arc, - right: &Arc, - ) { - self.eq_group.add_equal_conditions(left, right); - } - - /// Track/register physical expressions with constant values. 
- pub fn add_constants( - mut self, - constants: impl IntoIterator>, - ) -> Self { - for expr in self.eq_group.normalize_exprs(constants) { - if !physical_exprs_contains(&self.constants, &expr) { - self.constants.push(expr); - } - } - self - } - - /// Updates the ordering equivalence group within assuming that the table - /// is re-sorted according to the argument `sort_exprs`. Note that constants - /// and equivalence classes are unchanged as they are unaffected by a re-sort. - pub fn with_reorder(mut self, sort_exprs: Vec) -> Self { - // TODO: In some cases, existing ordering equivalences may still be valid add this analysis. - self.oeq_class = OrderingEquivalenceClass::new(vec![sort_exprs]); - self - } - - /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the - /// equivalence group and the ordering equivalence class within. - /// - /// Assume that `self.eq_group` states column `a` and `b` are aliases. - /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC` - /// are equivalent (in the sense that both describe the ordering of the table). - /// If the `sort_exprs` argument were `vec![b ASC, c ASC, a ASC]`, then this - /// function would return `vec![a ASC, c ASC]`. Internally, it would first - /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result - /// after deduplication. - fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering { - // Convert sort expressions to sort requirements: - let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); - // Normalize the requirements: - let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs); - // Convert sort requirements back to sort expressions: - PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs) - } - - /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the - /// equivalence group and the ordering equivalence class within. 
It works by: - /// - Removing expressions that have a constant value from the given requirement. - /// - Replacing sections that belong to some equivalence class in the equivalence - /// group with the first entry in the matching equivalence class. - /// - /// Assume that `self.eq_group` states column `a` and `b` are aliases. - /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC` - /// are equivalent (in the sense that both describe the ordering of the table). - /// If the `sort_reqs` argument were `vec![b ASC, c ASC, a ASC]`, then this - /// function would return `vec![a ASC, c ASC]`. Internally, it would first - /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result - /// after deduplication. - fn normalize_sort_requirements( - &self, - sort_reqs: LexRequirementRef, - ) -> LexRequirement { - let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs); - let constants_normalized = self.eq_group.normalize_exprs(self.constants.clone()); - // Prune redundant sections in the requirement: - collapse_lex_req( - normalized_sort_reqs - .iter() - .filter(|&order| { - !physical_exprs_contains(&constants_normalized, &order.expr) - }) - .cloned() - .collect(), - ) - } - - /// Checks whether the given ordering is satisfied by any of the existing - /// orderings. - pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool { - // Convert the given sort expressions to sort requirements: - let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter()); - self.ordering_satisfy_requirement(&sort_requirements) - } - - /// Checks whether the given sort requirements are satisfied by any of the - /// existing orderings. - pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> bool { - // First, standardize the given requirement: - let normalized_reqs = self.normalize_sort_requirements(reqs); - if normalized_reqs.is_empty() { - // Requirements are tautologically satisfied if empty. 
- return true; - } - let mut indices = HashSet::new(); - for ordering in self.normalized_oeq_class().iter() { - let match_indices = ordering - .iter() - .map(|sort_expr| { - normalized_reqs - .iter() - .position(|sort_req| sort_expr.satisfy(sort_req, &self.schema)) - }) - .collect::>(); - // Find the largest contiguous increasing sequence starting from the first index: - if let Some(&Some(first)) = match_indices.first() { - indices.insert(first); - let mut iter = match_indices.windows(2); - while let Some([Some(current), Some(next)]) = iter.next() { - if next > current { - indices.insert(*next); - } else { - break; - } - } - } - } - indices.len() == normalized_reqs.len() - } - - /// Checks whether the `given`` sort requirements are equal or more specific - /// than the `reference` sort requirements. - pub fn requirements_compatible( - &self, - given: LexRequirementRef, - reference: LexRequirementRef, - ) -> bool { - let normalized_given = self.normalize_sort_requirements(given); - let normalized_reference = self.normalize_sort_requirements(reference); - - (normalized_reference.len() <= normalized_given.len()) - && normalized_reference - .into_iter() - .zip(normalized_given) - .all(|(reference, given)| given.compatible(&reference)) - } - - /// Returns the finer ordering among the orderings `lhs` and `rhs`, breaking - /// any ties by choosing `lhs`. - /// - /// The finer ordering is the ordering that satisfies both of the orderings. - /// If the orderings are incomparable, returns `None`. - /// - /// For example, the finer ordering among `[a ASC]` and `[a ASC, b ASC]` is - /// the latter. 
- pub fn get_finer_ordering( - &self, - lhs: LexOrderingRef, - rhs: LexOrderingRef, - ) -> Option { - // Convert the given sort expressions to sort requirements: - let lhs = PhysicalSortRequirement::from_sort_exprs(lhs); - let rhs = PhysicalSortRequirement::from_sort_exprs(rhs); - let finer = self.get_finer_requirement(&lhs, &rhs); - // Convert the chosen sort requirements back to sort expressions: - finer.map(PhysicalSortRequirement::to_sort_exprs) - } - - /// Returns the finer ordering among the requirements `lhs` and `rhs`, - /// breaking any ties by choosing `lhs`. - /// - /// The finer requirements are the ones that satisfy both of the given - /// requirements. If the requirements are incomparable, returns `None`. - /// - /// For example, the finer requirements among `[a ASC]` and `[a ASC, b ASC]` - /// is the latter. - pub fn get_finer_requirement( - &self, - req1: LexRequirementRef, - req2: LexRequirementRef, - ) -> Option { - let mut lhs = self.normalize_sort_requirements(req1); - let mut rhs = self.normalize_sort_requirements(req2); - lhs.iter_mut() - .zip(rhs.iter_mut()) - .all(|(lhs, rhs)| { - lhs.expr.eq(&rhs.expr) - && match (lhs.options, rhs.options) { - (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt, - (Some(options), None) => { - rhs.options = Some(options); - true - } - (None, Some(options)) => { - lhs.options = Some(options); - true - } - (None, None) => true, - } - }) - .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs }) - } - - /// Calculates the "meet" of the given orderings (`lhs` and `rhs`). - /// The meet of a set of orderings is the finest ordering that is satisfied - /// by all the orderings in that set. For details, see: - /// - /// - /// - /// If there is no ordering that satisfies both `lhs` and `rhs`, returns - /// `None`. As an example, the meet of orderings `[a ASC]` and `[a ASC, b ASC]` - /// is `[a ASC]`. 
- pub fn get_meet_ordering( - &self, - lhs: LexOrderingRef, - rhs: LexOrderingRef, - ) -> Option { - let lhs = self.normalize_sort_exprs(lhs); - let rhs = self.normalize_sort_exprs(rhs); - let mut meet = vec![]; - for (lhs, rhs) in lhs.into_iter().zip(rhs.into_iter()) { - if lhs.eq(&rhs) { - meet.push(lhs); - } else { - break; - } - } - (!meet.is_empty()).then_some(meet) - } - - /// Projects argument `expr` according to `projection_mapping`, taking - /// equivalences into account. - /// - /// For example, assume that columns `a` and `c` are always equal, and that - /// `projection_mapping` encodes following mapping: - /// - /// ```text - /// a -> a1 - /// b -> b1 - /// ``` - /// - /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to - /// `Some(a1 + b1)` and `d` to `None`, meaning that it cannot be projected. - pub fn project_expr( - &self, - expr: &Arc, - projection_mapping: &ProjectionMapping, - ) -> Option> { - self.eq_group.project_expr(projection_mapping, expr) - } - - /// Projects the equivalences within according to `projection_mapping` - /// and `output_schema`. - pub fn project( - &self, - projection_mapping: &ProjectionMapping, - output_schema: SchemaRef, - ) -> Self { - let mut projected_orderings = self - .oeq_class - .iter() - .filter_map(|order| self.eq_group.project_ordering(projection_mapping, order)) - .collect::>(); - for (source, target) in projection_mapping.iter() { - let expr_ordering = ExprOrdering::new(source.clone()) - .transform_up(&|expr| update_ordering(expr, self)) - .unwrap(); - if let SortProperties::Ordered(options) = expr_ordering.state { - // Push new ordering to the state. 
- projected_orderings.push(vec![PhysicalSortExpr { - expr: target.clone(), - options, - }]); - } - } - Self { - eq_group: self.eq_group.project(projection_mapping), - oeq_class: OrderingEquivalenceClass::new(projected_orderings), - constants: vec![], - schema: output_schema, - } - } - - /// Returns the longest (potentially partial) permutation satisfying the - /// existing ordering. For example, if we have the equivalent orderings - /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`, - /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`. - /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied - /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0` - /// inside the argument `exprs` (respectively). For the mathematical - /// definition of "partial permutation", see: - /// - /// - pub fn find_longest_permutation( - &self, - exprs: &[Arc], - ) -> (LexOrdering, Vec) { - let normalized_exprs = self.eq_group.normalize_exprs(exprs.to_vec()); - // Use a map to associate expression indices with sort options: - let mut ordered_exprs = IndexMap::::new(); - for ordering in self.normalized_oeq_class().iter() { - for sort_expr in ordering { - if let Some(idx) = normalized_exprs - .iter() - .position(|expr| sort_expr.expr.eq(expr)) - { - if let Entry::Vacant(e) = ordered_exprs.entry(idx) { - e.insert(sort_expr.options); - } - } else { - // We only consider expressions that correspond to a prefix - // of one of the equivalent orderings we have. - break; - } - } - } - // Construct the lexicographical ordering according to the permutation: - ordered_exprs - .into_iter() - .map(|(idx, options)| { - ( - PhysicalSortExpr { - expr: exprs[idx].clone(), - options, - }, - idx, - ) - }) - .unzip() - } -} - /// Calculate ordering equivalence properties for the given join operation. 
pub fn join_equivalence_properties( left: EquivalenceProperties, @@ -1229,7 +75,7 @@ pub fn join_equivalence_properties( probe_side: Option, on: &[(Column, Column)], ) -> EquivalenceProperties { - let left_size = left.schema.fields.len(); + let left_size = left.schema().fields.len(); let mut result = EquivalenceProperties::new(join_schema); result.add_equivalence_group(left.eq_group().join( right.eq_group(), @@ -1238,8 +84,8 @@ pub fn join_equivalence_properties( on, )); - let left_oeq_class = left.oeq_class; - let mut right_oeq_class = right.oeq_class; + let left_oeq_class = left.into_oeq_class(); + let mut right_oeq_class = right.into_oeq_class(); match maintains_input_order { [true, false] => { // In this special case, right side ordering can be prefixed with @@ -1315,58 +161,20 @@ fn updated_right_ordering_equivalence_class( } } -/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node. -/// The node can either be a leaf node, or an intermediate node: -/// - If it is a leaf node, we directly find the order of the node by looking -/// at the given sort expression and equivalence properties if it is a `Column` -/// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark -/// it as singleton so that it can cooperate with all ordered columns. -/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr` -/// and operator has its own rules on how to propagate the children orderings. -/// However, before we engage in recursion, we check whether this intermediate -/// node directly matches with the sort expression. If there is a match, the -/// sort expression emerges at that node immediately, discarding the recursive -/// result coming from its children. 
-fn update_ordering( - mut node: ExprOrdering, - eq_properties: &EquivalenceProperties, -) -> Result> { - if !node.expr.children().is_empty() { - // We have an intermediate (non-leaf) node, account for its children: - node.state = node.expr.get_ordering(&node.children_states); - Ok(Transformed::Yes(node)) - } else if node.expr.as_any().is::() { - // We have a Column, which is one of the two possible leaf node types: - let eq_group = &eq_properties.eq_group; - let normalized_expr = eq_group.normalize_expr(node.expr.clone()); - let oeq_class = &eq_properties.oeq_class; - if let Some(options) = oeq_class.get_options(&normalized_expr) { - node.state = SortProperties::Ordered(options); - Ok(Transformed::Yes(node)) - } else { - Ok(Transformed::No(node)) - } - } else { - // We have a Literal, which is the other possible leaf node type: - node.state = node.expr.get_ordering(&[]); - Ok(Transformed::Yes(node)) - } -} - #[cfg(test)] mod tests { use std::ops::Not; use std::sync::Arc; use super::*; - use crate::expressions::{col, lit, BinaryExpr, Column, Literal}; + use crate::expressions::{col, lit, Column, Literal}; + use crate::PhysicalSortExpr; use arrow::compute::{lexsort_to_indices, SortColumn}; use arrow::datatypes::{DataType, Field, Schema}; use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array}; use arrow_schema::{Fields, SortOptions}; use datafusion_common::{Result, ScalarValue}; - use datafusion_expr::Operator; use itertools::{izip, Itertools}; use rand::rngs::StdRng; @@ -1550,7 +358,7 @@ mod tests { // This new entry is redundant, size shouldn't increase eq_properties.add_equal_conditions(&col_b_expr, &col_a_expr); assert_eq!(eq_properties.eq_group().len(), 1); - let eq_groups = &eq_properties.eq_group().classes[0]; + let eq_groups = eq_properties.eq_group().iter().next().unwrap(); assert_eq!(eq_groups.len(), 2); assert!(eq_groups.contains(&col_a_expr)); assert!(eq_groups.contains(&col_b_expr)); @@ -1559,7 +367,7 @@ mod tests { // however there shouldn't 
be any new equivalence class eq_properties.add_equal_conditions(&col_b_expr, &col_c_expr); assert_eq!(eq_properties.eq_group().len(), 1); - let eq_groups = &eq_properties.eq_group().classes[0]; + let eq_groups = eq_properties.eq_group().iter().next().unwrap(); assert_eq!(eq_groups.len(), 3); assert!(eq_groups.contains(&col_a_expr)); assert!(eq_groups.contains(&col_b_expr)); @@ -1573,13 +381,13 @@ mod tests { // Hence equivalent class count should decrease from 2 to 1. eq_properties.add_equal_conditions(&col_x_expr, &col_a_expr); assert_eq!(eq_properties.eq_group().len(), 1); - let eq_groups = &eq_properties.eq_group().classes[0]; - assert_eq!(eq_groups.len(), 5); - assert!(eq_groups.contains(&col_a_expr)); - assert!(eq_groups.contains(&col_b_expr)); - assert!(eq_groups.contains(&col_c_expr)); - assert!(eq_groups.contains(&col_x_expr)); - assert!(eq_groups.contains(&col_y_expr)); + let eq_class = &eq_properties.eq_group().iter().next().unwrap(); + assert_eq!(eq_class.len(), 5); + assert!(eq_class.contains(&col_a_expr)); + assert!(eq_class.contains(&col_b_expr)); + assert!(eq_class.contains(&col_c_expr)); + assert!(eq_class.contains(&col_x_expr)); + assert!(eq_class.contains(&col_y_expr)); Ok(()) } @@ -1607,19 +415,21 @@ mod tests { let col_a2 = &col("a2", &out_schema)?; let col_a3 = &col("a3", &out_schema)?; let col_a4 = &col("a4", &out_schema)?; - let projection_mapping = ProjectionMapping { - inner: vec![ - (col_a.clone(), col_a1.clone()), - (col_a.clone(), col_a2.clone()), - (col_a.clone(), col_a3.clone()), - (col_a.clone(), col_a4.clone()), + let projection_mapping = ProjectionMapping::try_new( + &[ + (col_a.clone(), "a1".into()), + (col_a.clone(), "a2".into()), + (col_a.clone(), "a3".into()), + (col_a.clone(), "a4".into()), ], - }; + &input_schema, + ) + .unwrap(); let out_properties = input_properties.project(&projection_mapping, out_schema); // At the output a1=a2=a3=a4 assert_eq!(out_properties.eq_group().len(), 1); - let eq_class = 
&out_properties.eq_group().classes[0]; + let eq_class = &out_properties.eq_group().iter().next().unwrap(); assert_eq!(eq_class.len(), 4); assert!(eq_class.contains(col_a1)); assert!(eq_class.contains(col_a2)); @@ -1973,14 +783,14 @@ mod tests { .collect::>(); let mut eq_groups = EquivalenceGroup::new(entries.clone()); eq_groups.bridge_classes(); - let eq_groups = eq_groups.classes; + let eq_classes = eq_groups.into_vec(); let err_msg = format!( "error in test entries: {:?}, expected: {:?}, actual:{:?}", - entries, expected, eq_groups + entries, expected, eq_classes ); - assert_eq!(eq_groups.len(), expected.len(), "{}", err_msg); - for idx in 0..eq_groups.len() { - assert_eq!(&eq_groups[idx], &expected[idx], "{}", err_msg); + assert_eq!(eq_classes.len(), expected.len(), "{}", err_msg); + for idx in 0..eq_classes.len() { + assert_eq!(&eq_classes[idx], &expected[idx], "{}", err_msg); } } Ok(()) @@ -2003,7 +813,7 @@ mod tests { let mut eq_groups = EquivalenceGroup::new(entries); eq_groups.remove_redundant_entries(); - let eq_groups = eq_groups.classes; + let eq_groups = eq_groups.into_vec(); assert_eq!(eq_groups.len(), expected.len()); assert_eq!(eq_groups.len(), 2); @@ -2116,7 +926,7 @@ mod tests { let orderings = convert_to_orderings(&orderings); let expected = convert_to_orderings(&expected); let actual = OrderingEquivalenceClass::new(orderings.clone()); - let actual = actual.orderings; + let actual = actual.into_vec(); let err_msg = format!( "orderings: {:?}, expected: {:?}, actual :{:?}", orderings, expected, actual @@ -2299,7 +1109,7 @@ mod tests { }; // Fill constant columns - for constant in &eq_properties.constants { + for constant in eq_properties.constants() { let col = constant.as_any().downcast_ref::().unwrap(); let (idx, _field) = schema.column_with_name(col.name()).unwrap(); let arr = @@ -2332,7 +1142,7 @@ mod tests { } // Fill columns based on equivalence groups - for eq_group in eq_properties.eq_group.iter() { + for eq_group in 
eq_properties.eq_group().iter() { let representative_array = get_representative_arr(eq_group, &schema_vec, schema.clone()) .unwrap_or_else(|| generate_random_array(n_elem, n_distinct)); @@ -2669,76 +1479,6 @@ mod tests { Ok(()) } - #[test] - fn test_update_ordering() -> Result<()> { - let schema = Schema::new(vec![ - Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Int32, true), - Field::new("c", DataType::Int32, true), - Field::new("d", DataType::Int32, true), - ]); - - let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone())); - let col_a = &col("a", &schema)?; - let col_b = &col("b", &schema)?; - let col_c = &col("c", &schema)?; - let col_d = &col("d", &schema)?; - let option_asc = SortOptions { - descending: false, - nulls_first: false, - }; - // b=a (e.g they are aliases) - eq_properties.add_equal_conditions(col_b, col_a); - // [b ASC], [d ASC] - eq_properties.add_new_orderings(vec![ - vec![PhysicalSortExpr { - expr: col_b.clone(), - options: option_asc, - }], - vec![PhysicalSortExpr { - expr: col_d.clone(), - options: option_asc, - }], - ]); - - let test_cases = vec![ - // d + b - ( - Arc::new(BinaryExpr::new( - col_d.clone(), - Operator::Plus, - col_b.clone(), - )) as Arc, - SortProperties::Ordered(option_asc), - ), - // b - (col_b.clone(), SortProperties::Ordered(option_asc)), - // a - (col_a.clone(), SortProperties::Ordered(option_asc)), - // a + c - ( - Arc::new(BinaryExpr::new( - col_a.clone(), - Operator::Plus, - col_c.clone(), - )), - SortProperties::Unordered, - ), - ]; - for (expr, expected) in test_cases { - let expr_ordering = ExprOrdering::new(expr.clone()); - let expr_ordering = expr_ordering - .transform_up(&|expr| update_ordering(expr, &eq_properties))?; - let err_msg = format!( - "expr:{:?}, expected: {:?}, actual: {:?}", - expr, expected, expr_ordering.state - ); - assert_eq!(expr_ordering.state, expected, "{}", err_msg); - } - - Ok(()) - } - #[test] fn test_contains_any() { let lit_true = 
Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))) @@ -2927,24 +1667,21 @@ mod tests { Field::new("b", DataType::Int32, true), Field::new("c", DataType::Int32, true), ]); - let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone())); + let schema = Arc::new(schema.clone()); + let mut eq_properties = EquivalenceProperties::new(schema.clone()); let ordering = vec![PhysicalSortExpr { expr: Arc::new(Column::new("b", 1)), options: SortOptions::default(), }]; eq_properties.add_new_orderings([ordering]); - let projection_mapping = ProjectionMapping { - inner: vec![ - ( - Arc::new(Column::new("b", 1)) as _, - Arc::new(Column::new("b_new", 0)) as _, - ), - ( - Arc::new(Column::new("a", 0)) as _, - Arc::new(Column::new("a_new", 1)) as _, - ), + let projection_mapping = ProjectionMapping::try_new( + &[ + (Arc::new(Column::new("b", 1)) as _, "b_new".into()), + (Arc::new(Column::new("a", 0)) as _, "a_new".into()), ], - }; + &schema, + ) + .unwrap(); let projection_schema = Arc::new(Schema::new(vec![ Field::new("b_new", DataType::Int32, true), Field::new("a_new", DataType::Int32, true), @@ -2968,19 +1705,16 @@ mod tests { Field::new("b", DataType::Int32, true), Field::new("c", DataType::Int32, true), ]); - let eq_properties = EquivalenceProperties::new(Arc::new(schema)); - let projection_mapping = ProjectionMapping { - inner: vec![ - ( - Arc::new(Column::new("c", 2)) as _, - Arc::new(Column::new("c_new", 0)) as _, - ), - ( - Arc::new(Column::new("b", 1)) as _, - Arc::new(Column::new("b_new", 1)) as _, - ), + let schema = Arc::new(schema); + let eq_properties = EquivalenceProperties::new(schema.clone()); + let projection_mapping = ProjectionMapping::try_new( + &[ + (Arc::new(Column::new("c", 2)) as _, "c_new".into()), + (Arc::new(Column::new("b", 1)) as _, "b_new".into()), ], - }; + &schema, + ) + .unwrap(); let projection_schema = Arc::new(Schema::new(vec![ Field::new("c_new", DataType::Int32, true), Field::new("b_new", DataType::Int32, true), diff --git 
a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs new file mode 100644 index 000000000000..72637f3636d6 --- /dev/null +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use arrow_schema::SortOptions; +use std::hash::Hash; +use std::sync::Arc; + +use crate::equivalence::add_offset_to_expr; +use crate::{LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalSortExpr}; + +/// An `OrderingEquivalenceClass` object keeps track of different alternative +/// orderings than can describe a schema. For example, consider the following table: +/// +/// ```text +/// |a|b|c|d| +/// |1|4|3|1| +/// |2|3|3|2| +/// |3|1|2|2| +/// |3|2|1|3| +/// ``` +/// +/// Here, both `vec![a ASC, b ASC]` and `vec![c DESC, d ASC]` describe the table +/// ordering. In this case, we say that these orderings are equivalent. +#[derive(Debug, Clone, Eq, PartialEq, Hash)] +pub struct OrderingEquivalenceClass { + orderings: Vec, +} + +impl OrderingEquivalenceClass { + /// Creates new empty ordering equivalence class. 
+ pub fn empty() -> Self { + Self { orderings: vec![] } + } + + /// Clears (empties) this ordering equivalence class. + pub fn clear(&mut self) { + self.orderings.clear(); + } + + /// Creates new ordering equivalence class from the given orderings. + pub fn new(orderings: Vec) -> Self { + let mut result = Self { orderings }; + result.remove_redundant_entries(); + result + } + + /// Checks whether `ordering` is a member of this equivalence class. + pub fn contains(&self, ordering: &LexOrdering) -> bool { + self.orderings.contains(ordering) + } + + /// Returns the underlying vector of orderings. + pub fn into_vec(self) -> Vec { + self.orderings + } + + /// Adds `ordering` to this equivalence class. + #[cfg(test)] + pub fn push(&mut self, ordering: LexOrdering) { + self.orderings.push(ordering); + // Make sure that there are no redundant orderings: + self.remove_redundant_entries(); + } + + /// Checks whether this ordering equivalence class is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns an iterator over the equivalent orderings in this class. + pub fn iter(&self) -> impl Iterator { + self.orderings.iter() + } + + /// Returns how many equivalent orderings there are in this class. + pub fn len(&self) -> usize { + self.orderings.len() + } + + /// Extend this ordering equivalence class with the `other` class. + pub fn extend(&mut self, other: Self) { + self.orderings.extend(other.orderings); + // Make sure that there are no redundant orderings: + self.remove_redundant_entries(); + } + + /// Adds new orderings into this ordering equivalence class. + pub fn add_new_orderings( + &mut self, + orderings: impl IntoIterator, + ) { + self.orderings.extend(orderings); + // Make sure that there are no redundant orderings: + self.remove_redundant_entries(); + } + + /// Removes redundant orderings from this equivalence class. 
+ /// For instance, If we already have the ordering [a ASC, b ASC, c DESC], + /// then there is no need to keep ordering [a ASC, b ASC] in the state. + fn remove_redundant_entries(&mut self) { + let mut idx = 0; + while idx < self.orderings.len() { + let mut removal = false; + for (ordering_idx, ordering) in self.orderings[0..idx].iter().enumerate() { + if let Some(right_finer) = finer_side(ordering, &self.orderings[idx]) { + if right_finer { + self.orderings.swap(ordering_idx, idx); + } + removal = true; + break; + } + } + if removal { + self.orderings.swap_remove(idx); + } else { + idx += 1; + } + } + } + + /// Returns the concatenation of all the orderings. This enables merge + /// operations to preserve all equivalent orderings simultaneously. + pub fn output_ordering(&self) -> Option { + let output_ordering = + self.orderings.iter().flatten().cloned().collect::>(); + let output_ordering = collapse_lex_ordering(output_ordering); + (!output_ordering.is_empty()).then_some(output_ordering) + } + + // Append orderings in `other` to all existing orderings in this equivalence + // class. + pub fn join_suffix(mut self, other: &Self) -> Self { + for ordering in other.iter() { + for idx in 0..self.orderings.len() { + self.orderings[idx].extend(ordering.iter().cloned()); + } + } + self + } + + /// Adds `offset` value to the index of each expression inside this + /// ordering equivalence class. + pub fn add_offset(&mut self, offset: usize) { + for ordering in self.orderings.iter_mut() { + for sort_expr in ordering { + sort_expr.expr = add_offset_to_expr(sort_expr.expr.clone(), offset); + } + } + } + + /// Gets sort options associated with this expression if it is a leading + /// ordering expression. Otherwise, returns `None`. 
+ pub fn get_options(&self, expr: &Arc) -> Option { + for ordering in self.iter() { + let leading_ordering = &ordering[0]; + if leading_ordering.expr.eq(expr) { + return Some(leading_ordering.options); + } + } + None + } +} + +/// Returns `true` if the ordering `rhs` is strictly finer than the ordering `lhs`, +/// `false` if the ordering `lhs` is at least as fine as the ordering `rhs`, and +/// `None` otherwise (i.e. when given orderings are incomparable). +fn finer_side(lhs: LexOrderingRef, rhs: LexOrderingRef) -> Option { + let all_equal = lhs.iter().zip(rhs.iter()).all(|(lhs, rhs)| lhs.eq(rhs)); + all_equal.then_some(lhs.len() < rhs.len()) +} + +/// This function constructs a duplicate-free `LexOrdering` by filtering out +/// duplicate entries that have same physical expression inside. For example, +/// `vec![a ASC, a DESC]` collapses to `vec![a ASC]`. +fn collapse_lex_ordering(input: LexOrdering) -> LexOrdering { + let mut output = Vec::::new(); + for item in input { + if !output.iter().any(|req| req.expr.eq(&item.expr)) { + output.push(item); + } + } + output +} diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs new file mode 100644 index 000000000000..1aad9f816267 --- /dev/null +++ b/datafusion/physical-expr/src/equivalence/projection.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use crate::expressions::Column; +use crate::PhysicalExpr; + +use arrow::datatypes::SchemaRef; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::Result; + +/// Stores the mapping between source expressions and target expressions for a +/// projection. +#[derive(Debug, Clone)] +pub struct ProjectionMapping { + /// `(source expression)` --> `(target expression)` + /// Indices in the vector corresponds to the indices after projection. + inner: Vec<(Arc, Arc)>, +} + +impl ProjectionMapping { + /// Constructs the mapping between a projection's input and output + /// expressions. + /// + /// For example, given the input projection expressions (`a+b`, `c+d`) + /// and an output schema with two columns `"c+d"` and `"a+b"` + /// the projection mapping would be + /// ```text + /// [0]: (c+d, col("c+d")) + /// [1]: (a+b, col("a+b")) + /// ``` + /// where `col("c+d")` means the column named "c+d". + pub fn try_new( + expr: &[(Arc, String)], + input_schema: &SchemaRef, + ) -> Result { + // Construct a map from the input expressions to the output expression of the projection: + let mut inner = vec![]; + for (expr_idx, (expression, name)) in expr.iter().enumerate() { + let target_expr = Arc::new(Column::new(name, expr_idx)) as _; + + let source_expr = expression.clone().transform_down(&|e| match e + .as_any() + .downcast_ref::( + ) { + Some(col) => { + // Sometimes, expression and its name in the input_schema doesn't match. + // This can cause problems. 
Hence in here we make sure that expression name + // matches with the name in the input_schema. + // Conceptually, source_expr and expression should be same. + let idx = col.index(); + let matching_input_field = input_schema.field(idx); + let matching_input_column = + Column::new(matching_input_field.name(), idx); + Ok(Transformed::Yes(Arc::new(matching_input_column))) + } + None => Ok(Transformed::No(e)), + })?; + + inner.push((source_expr, target_expr)); + } + Ok(Self { inner }) + } + + /// Iterate over pairs of (source, target) expressions + pub fn iter( + &self, + ) -> impl Iterator, Arc)> + '_ { + self.inner.iter() + } +} diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs new file mode 100644 index 000000000000..59eab6abe3db --- /dev/null +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -0,0 +1,625 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use arrow_schema::{SchemaRef, SortOptions}; +use indexmap::map::Entry; +use indexmap::IndexMap; +use std::collections::HashSet; +use std::sync::Arc; + +use crate::equivalence::{ + collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, ProjectionMapping, +}; +use crate::expressions::Column; +use crate::sort_properties::{ExprOrdering, SortProperties}; +use crate::{ + physical_exprs_contains, LexOrdering, LexOrderingRef, LexRequirement, + LexRequirementRef, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement, +}; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::Result; + +/// A `EquivalenceProperties` object stores useful information related to a schema. +/// Currently, it keeps track of: +/// - Equivalent expressions, e.g expressions that have same value. +/// - Valid sort expressions (orderings) for the schema. +/// - Constants expressions (e.g expressions that are known to have constant values). +/// +/// Consider table below: +/// +/// ```text +/// ┌-------┐ +/// | a | b | +/// |---|---| +/// | 1 | 9 | +/// | 2 | 8 | +/// | 3 | 7 | +/// | 5 | 5 | +/// └---┴---┘ +/// ``` +/// +/// where both `a ASC` and `b DESC` can describe the table ordering. With +/// `EquivalenceProperties`, we can keep track of these different valid sort +/// expressions and treat `a ASC` and `b DESC` on an equal footing. +/// +/// Similarly, consider the table below: +/// +/// ```text +/// ┌-------┐ +/// | a | b | +/// |---|---| +/// | 1 | 1 | +/// | 2 | 2 | +/// | 3 | 3 | +/// | 5 | 5 | +/// └---┴---┘ +/// ``` +/// +/// where columns `a` and `b` always have the same value. We keep track of such +/// equivalences inside this object. With this information, we can optimize +/// things like partitioning. For example, if the partition requirement is +/// `Hash(a)` and output partitioning is `Hash(b)`, then we can deduce that +/// the existing partitioning satisfies the requirement. 
+#[derive(Debug, Clone)] +pub struct EquivalenceProperties { + /// Collection of equivalence classes that store expressions with the same + /// value. + eq_group: EquivalenceGroup, + /// Equivalent sort expressions for this table. + pub(crate) oeq_class: OrderingEquivalenceClass, + /// Expressions whose values are constant throughout the table. + /// TODO: We do not need to track constants separately, they can be tracked + /// inside `eq_groups` as `Literal` expressions. + constants: Vec>, + /// Schema associated with this object. + schema: SchemaRef, +} + +impl EquivalenceProperties { + /// Creates an empty `EquivalenceProperties` object. + pub fn new(schema: SchemaRef) -> Self { + Self { + eq_group: EquivalenceGroup::empty(), + oeq_class: OrderingEquivalenceClass::empty(), + constants: vec![], + schema, + } + } + + /// Creates a new `EquivalenceProperties` object with the given orderings. + pub fn new_with_orderings(schema: SchemaRef, orderings: &[LexOrdering]) -> Self { + Self { + eq_group: EquivalenceGroup::empty(), + oeq_class: OrderingEquivalenceClass::new(orderings.to_vec()), + constants: vec![], + schema, + } + } + + /// Returns the associated schema. + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// Returns a reference to the ordering equivalence class within. + pub fn oeq_class(&self) -> &OrderingEquivalenceClass { + &self.oeq_class + } + + /// consume self, and returns the ordering equivalence class within. + pub fn into_oeq_class(self) -> OrderingEquivalenceClass { + self.oeq_class + } + + /// Returns a reference to the equivalence group within. + pub fn eq_group(&self) -> &EquivalenceGroup { + &self.eq_group + } + + /// Returns a reference to the constant expressions + pub fn constants(&self) -> &[Arc] { + &self.constants + } + + /// Returns the normalized version of the ordering equivalence class within. 
+ /// Normalization removes constants and duplicates as well as standardizing + /// expressions according to the equivalence group within. + pub fn normalized_oeq_class(&self) -> OrderingEquivalenceClass { + OrderingEquivalenceClass::new( + self.oeq_class + .iter() + .map(|ordering| self.normalize_sort_exprs(ordering)) + .collect(), + ) + } + + /// Extends this `EquivalenceProperties` with the `other` object. + pub fn extend(mut self, other: Self) -> Self { + self.eq_group.extend(other.eq_group); + self.oeq_class.extend(other.oeq_class); + self.add_constants(other.constants) + } + + /// Clears (empties) the ordering equivalence class within this object. + /// Call this method when existing orderings are invalidated. + pub fn clear_orderings(&mut self) { + self.oeq_class.clear(); + } + + /// Extends this `EquivalenceProperties` by adding the orderings inside the + /// ordering equivalence class `other`. + pub fn add_ordering_equivalence_class(&mut self, other: OrderingEquivalenceClass) { + self.oeq_class.extend(other); + } + + /// Adds new orderings into the existing ordering equivalence class. + pub fn add_new_orderings( + &mut self, + orderings: impl IntoIterator, + ) { + self.oeq_class.add_new_orderings(orderings); + } + + /// Incorporates the given equivalence group to into the existing + /// equivalence group within. + pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) { + self.eq_group.extend(other_eq_group); + } + + /// Adds a new equality condition into the existing equivalence group. + /// If the given equality defines a new equivalence class, adds this new + /// equivalence class to the equivalence group. + pub fn add_equal_conditions( + &mut self, + left: &Arc, + right: &Arc, + ) { + self.eq_group.add_equal_conditions(left, right); + } + + /// Track/register physical expressions with constant values. 
+ pub fn add_constants( + mut self, + constants: impl IntoIterator>, + ) -> Self { + for expr in self.eq_group.normalize_exprs(constants) { + if !physical_exprs_contains(&self.constants, &expr) { + self.constants.push(expr); + } + } + self + } + + /// Updates the ordering equivalence group within assuming that the table + /// is re-sorted according to the argument `sort_exprs`. Note that constants + /// and equivalence classes are unchanged as they are unaffected by a re-sort. + pub fn with_reorder(mut self, sort_exprs: Vec) -> Self { + // TODO: In some cases, existing ordering equivalences may still be valid add this analysis. + self.oeq_class = OrderingEquivalenceClass::new(vec![sort_exprs]); + self + } + + /// Normalizes the given sort expressions (i.e. `sort_exprs`) using the + /// equivalence group and the ordering equivalence class within. + /// + /// Assume that `self.eq_group` states column `a` and `b` are aliases. + /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC` + /// are equivalent (in the sense that both describe the ordering of the table). + /// If the `sort_exprs` argument were `vec![b ASC, c ASC, a ASC]`, then this + /// function would return `vec![a ASC, c ASC]`. Internally, it would first + /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result + /// after deduplication. + fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering { + // Convert sort expressions to sort requirements: + let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()); + // Normalize the requirements: + let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs); + // Convert sort requirements back to sort expressions: + PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs) + } + + /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the + /// equivalence group and the ordering equivalence class within. 
It works by: + /// - Removing expressions that have a constant value from the given requirement. + /// - Replacing sections that belong to some equivalence class in the equivalence + /// group with the first entry in the matching equivalence class. + /// + /// Assume that `self.eq_group` states column `a` and `b` are aliases. + /// Also assume that `self.oeq_class` states orderings `d ASC` and `a ASC, c ASC` + /// are equivalent (in the sense that both describe the ordering of the table). + /// If the `sort_reqs` argument were `vec![b ASC, c ASC, a ASC]`, then this + /// function would return `vec![a ASC, c ASC]`. Internally, it would first + /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final result + /// after deduplication. + pub fn normalize_sort_requirements( + &self, + sort_reqs: LexRequirementRef, + ) -> LexRequirement { + let normalized_sort_reqs = self.eq_group.normalize_sort_requirements(sort_reqs); + let constants_normalized = self.eq_group.normalize_exprs(self.constants.clone()); + // Prune redundant sections in the requirement: + collapse_lex_req( + normalized_sort_reqs + .iter() + .filter(|&order| { + !physical_exprs_contains(&constants_normalized, &order.expr) + }) + .cloned() + .collect(), + ) + } + + /// Checks whether the given ordering is satisfied by any of the existing + /// orderings. + pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool { + // Convert the given sort expressions to sort requirements: + let sort_requirements = PhysicalSortRequirement::from_sort_exprs(given.iter()); + self.ordering_satisfy_requirement(&sort_requirements) + } + + /// Checks whether the given sort requirements are satisfied by any of the + /// existing orderings. 
+ pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> bool { + // First, standardize the given requirement: + let normalized_reqs = self.normalize_sort_requirements(reqs); + if normalized_reqs.is_empty() { + // Requirements are tautologically satisfied if empty. + return true; + } + let mut indices = HashSet::new(); + for ordering in self.normalized_oeq_class().iter() { + let match_indices = ordering + .iter() + .map(|sort_expr| { + normalized_reqs + .iter() + .position(|sort_req| sort_expr.satisfy(sort_req, &self.schema)) + }) + .collect::>(); + // Find the largest contiguous increasing sequence starting from the first index: + if let Some(&Some(first)) = match_indices.first() { + indices.insert(first); + let mut iter = match_indices.windows(2); + while let Some([Some(current), Some(next)]) = iter.next() { + if next > current { + indices.insert(*next); + } else { + break; + } + } + } + } + indices.len() == normalized_reqs.len() + } + + /// Checks whether the `given`` sort requirements are equal or more specific + /// than the `reference` sort requirements. + pub fn requirements_compatible( + &self, + given: LexRequirementRef, + reference: LexRequirementRef, + ) -> bool { + let normalized_given = self.normalize_sort_requirements(given); + let normalized_reference = self.normalize_sort_requirements(reference); + + (normalized_reference.len() <= normalized_given.len()) + && normalized_reference + .into_iter() + .zip(normalized_given) + .all(|(reference, given)| given.compatible(&reference)) + } + + /// Returns the finer ordering among the orderings `lhs` and `rhs`, breaking + /// any ties by choosing `lhs`. + /// + /// The finer ordering is the ordering that satisfies both of the orderings. + /// If the orderings are incomparable, returns `None`. + /// + /// For example, the finer ordering among `[a ASC]` and `[a ASC, b ASC]` is + /// the latter. 
+ pub fn get_finer_ordering( + &self, + lhs: LexOrderingRef, + rhs: LexOrderingRef, + ) -> Option { + // Convert the given sort expressions to sort requirements: + let lhs = PhysicalSortRequirement::from_sort_exprs(lhs); + let rhs = PhysicalSortRequirement::from_sort_exprs(rhs); + let finer = self.get_finer_requirement(&lhs, &rhs); + // Convert the chosen sort requirements back to sort expressions: + finer.map(PhysicalSortRequirement::to_sort_exprs) + } + + /// Returns the finer ordering among the requirements `lhs` and `rhs`, + /// breaking any ties by choosing `lhs`. + /// + /// The finer requirements are the ones that satisfy both of the given + /// requirements. If the requirements are incomparable, returns `None`. + /// + /// For example, the finer requirements among `[a ASC]` and `[a ASC, b ASC]` + /// is the latter. + pub fn get_finer_requirement( + &self, + req1: LexRequirementRef, + req2: LexRequirementRef, + ) -> Option { + let mut lhs = self.normalize_sort_requirements(req1); + let mut rhs = self.normalize_sort_requirements(req2); + lhs.iter_mut() + .zip(rhs.iter_mut()) + .all(|(lhs, rhs)| { + lhs.expr.eq(&rhs.expr) + && match (lhs.options, rhs.options) { + (Some(lhs_opt), Some(rhs_opt)) => lhs_opt == rhs_opt, + (Some(options), None) => { + rhs.options = Some(options); + true + } + (None, Some(options)) => { + lhs.options = Some(options); + true + } + (None, None) => true, + } + }) + .then_some(if lhs.len() >= rhs.len() { lhs } else { rhs }) + } + + /// Calculates the "meet" of the given orderings (`lhs` and `rhs`). + /// The meet of a set of orderings is the finest ordering that is satisfied + /// by all the orderings in that set. For details, see: + /// + /// + /// + /// If there is no ordering that satisfies both `lhs` and `rhs`, returns + /// `None`. As an example, the meet of orderings `[a ASC]` and `[a ASC, b ASC]` + /// is `[a ASC]`. 
+ pub fn get_meet_ordering( + &self, + lhs: LexOrderingRef, + rhs: LexOrderingRef, + ) -> Option { + let lhs = self.normalize_sort_exprs(lhs); + let rhs = self.normalize_sort_exprs(rhs); + let mut meet = vec![]; + for (lhs, rhs) in lhs.into_iter().zip(rhs.into_iter()) { + if lhs.eq(&rhs) { + meet.push(lhs); + } else { + break; + } + } + (!meet.is_empty()).then_some(meet) + } + + /// Projects argument `expr` according to `projection_mapping`, taking + /// equivalences into account. + /// + /// For example, assume that columns `a` and `c` are always equal, and that + /// `projection_mapping` encodes following mapping: + /// + /// ```text + /// a -> a1 + /// b -> b1 + /// ``` + /// + /// Then, this function projects `a + b` to `Some(a1 + b1)`, `c + b` to + /// `Some(a1 + b1)` and `d` to `None`, meaning that it cannot be projected. + pub fn project_expr( + &self, + expr: &Arc, + projection_mapping: &ProjectionMapping, + ) -> Option> { + self.eq_group.project_expr(projection_mapping, expr) + } + + /// Projects the equivalences within according to `projection_mapping` + /// and `output_schema`. + pub fn project( + &self, + projection_mapping: &ProjectionMapping, + output_schema: SchemaRef, + ) -> Self { + let mut projected_orderings = self + .oeq_class + .iter() + .filter_map(|order| self.eq_group.project_ordering(projection_mapping, order)) + .collect::>(); + for (source, target) in projection_mapping.iter() { + let expr_ordering = ExprOrdering::new(source.clone()) + .transform_up(&|expr| update_ordering(expr, self)) + .unwrap(); + if let SortProperties::Ordered(options) = expr_ordering.state { + // Push new ordering to the state. 
+ projected_orderings.push(vec![PhysicalSortExpr { + expr: target.clone(), + options, + }]); + } + } + Self { + eq_group: self.eq_group.project(projection_mapping), + oeq_class: OrderingEquivalenceClass::new(projected_orderings), + constants: vec![], + schema: output_schema, + } + } + + /// Returns the longest (potentially partial) permutation satisfying the + /// existing ordering. For example, if we have the equivalent orderings + /// `[a ASC, b ASC]` and `[c DESC]`, with `exprs` containing `[c, b, a, d]`, + /// then this function returns `([a ASC, b ASC, c DESC], [2, 1, 0])`. + /// This means that the specification `[a ASC, b ASC, c DESC]` is satisfied + /// by the existing ordering, and `[a, b, c]` resides at indices: `2, 1, 0` + /// inside the argument `exprs` (respectively). For the mathematical + /// definition of "partial permutation", see: + /// + /// + pub fn find_longest_permutation( + &self, + exprs: &[Arc], + ) -> (LexOrdering, Vec) { + let normalized_exprs = self.eq_group.normalize_exprs(exprs.to_vec()); + // Use a map to associate expression indices with sort options: + let mut ordered_exprs = IndexMap::::new(); + for ordering in self.normalized_oeq_class().iter() { + for sort_expr in ordering { + if let Some(idx) = normalized_exprs + .iter() + .position(|expr| sort_expr.expr.eq(expr)) + { + if let Entry::Vacant(e) = ordered_exprs.entry(idx) { + e.insert(sort_expr.options); + } + } else { + // We only consider expressions that correspond to a prefix + // of one of the equivalent orderings we have. + break; + } + } + } + // Construct the lexicographical ordering according to the permutation: + ordered_exprs + .into_iter() + .map(|(idx, options)| { + ( + PhysicalSortExpr { + expr: exprs[idx].clone(), + options, + }, + idx, + ) + }) + .unzip() + } +} + +/// Calculates the [`SortProperties`] of a given [`ExprOrdering`] node. 
+/// The node can either be a leaf node, or an intermediate node: +/// - If it is a leaf node, we directly find the order of the node by looking +/// at the given sort expression and equivalence properties if it is a `Column` +/// leaf, or we mark it as unordered. In the case of a `Literal` leaf, we mark +/// it as singleton so that it can cooperate with all ordered columns. +/// - If it is an intermediate node, the children states matter. Each `PhysicalExpr` +/// and operator has its own rules on how to propagate the children orderings. +/// However, before we engage in recursion, we check whether this intermediate +/// node directly matches with the sort expression. If there is a match, the +/// sort expression emerges at that node immediately, discarding the recursive +/// result coming from its children. +fn update_ordering( + mut node: ExprOrdering, + eq_properties: &EquivalenceProperties, +) -> Result> { + if !node.expr.children().is_empty() { + // We have an intermediate (non-leaf) node, account for its children: + node.state = node.expr.get_ordering(&node.children_states); + Ok(Transformed::Yes(node)) + } else if node.expr.as_any().is::() { + // We have a Column, which is one of the two possible leaf node types: + let eq_group = &eq_properties.eq_group; + let normalized_expr = eq_group.normalize_expr(node.expr.clone()); + let oeq_class = &eq_properties.oeq_class; + if let Some(options) = oeq_class.get_options(&normalized_expr) { + node.state = SortProperties::Ordered(options); + Ok(Transformed::Yes(node)) + } else { + Ok(Transformed::No(node)) + } + } else { + // We have a Literal, which is the other possible leaf node type: + node.state = node.expr.get_ordering(&[]); + Ok(Transformed::Yes(node)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::expressions::{col, BinaryExpr}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_expr::Operator; + + #[test] + fn test_update_ordering() -> Result<()> { + let schema = Schema::new(vec![ 
+ Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + Field::new("d", DataType::Int32, true), + ]); + + let mut eq_properties = EquivalenceProperties::new(Arc::new(schema.clone())); + let col_a = &col("a", &schema)?; + let col_b = &col("b", &schema)?; + let col_c = &col("c", &schema)?; + let col_d = &col("d", &schema)?; + let option_asc = SortOptions { + descending: false, + nulls_first: false, + }; + // b=a (e.g they are aliases) + eq_properties.add_equal_conditions(col_b, col_a); + // [b ASC], [d ASC] + eq_properties.add_new_orderings(vec![ + vec![PhysicalSortExpr { + expr: col_b.clone(), + options: option_asc, + }], + vec![PhysicalSortExpr { + expr: col_d.clone(), + options: option_asc, + }], + ]); + + let test_cases = vec![ + // d + b + ( + Arc::new(BinaryExpr::new( + col_d.clone(), + Operator::Plus, + col_b.clone(), + )) as Arc, + SortProperties::Ordered(option_asc), + ), + // b + (col_b.clone(), SortProperties::Ordered(option_asc)), + // a + (col_a.clone(), SortProperties::Ordered(option_asc)), + // a + c + ( + Arc::new(BinaryExpr::new( + col_a.clone(), + Operator::Plus, + col_c.clone(), + )), + SortProperties::Unordered, + ), + ]; + for (expr, expected) in test_cases { + let expr_ordering = ExprOrdering::new(expr.clone()); + let expr_ordering = expr_ordering + .transform_up(&|expr| update_ordering(expr, &eq_properties))?; + let err_msg = format!( + "expr:{:?}, expected: {:?}, actual: {:?}", + expr, expected, expr_ordering.state + ); + assert_eq!(expr_ordering.state, expected, "{}", err_msg); + } + + Ok(()) + } +}