From 47d55c679f0cc8de0a56272030e08af1ab91382d Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Sat, 30 Nov 2024 10:16:56 -0800 Subject: [PATCH 1/5] Initial commit --- datafusion/common/src/lib.rs | 2 + .../physical-expr-common/src/physical_expr.rs | 26 +++++++- .../physical-expr/src/equivalence/class.rs | 39 ++++++------ datafusion/physical-expr/src/physical_expr.rs | 61 +------------------ 4 files changed, 49 insertions(+), 79 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 77e8cd60ede2..38b864d20e63 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -93,6 +93,8 @@ pub use error::{ pub type HashMap = hashbrown::HashMap; pub type HashSet = hashbrown::HashSet; +pub type IndexSet = indexmap::IndexSet; + /// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is /// not possible. In normal usage of DataFusion the downcast should always succeed. /// diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 8ab7030dd8a1..cba106595bf4 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -26,7 +26,7 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{internal_err, not_impl_err, Result}; +use datafusion_common::{internal_err, not_impl_err, IndexSet, Result}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::ExprProperties; @@ -235,3 +235,27 @@ pub fn format_physical_expr_list(exprs: &[Arc]) -> impl Displa } DisplayWrapper(exprs) } + +/// Returns [`Display`] able a list of [`PhysicalExpr`] +/// +/// Example output: `[a + 1, b]` +pub fn format_physical_expr_list2( + exprs: &IndexSet>, +) -> impl Display + '_ { + struct DisplayWrapper<'a>(&'a IndexSet>); + impl Display for DisplayWrapper<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut iter = self.0.iter(); + write!(f, "[")?; + if let Some(expr) = iter.next() { + write!(f, "{}", expr)?; + } + for expr in iter { + write!(f, ", {}", expr)?; + } + write!(f, "]")?; + Ok(()) + } + } + DisplayWrapper(exprs) +} diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 9e00b756b42a..9fdddaed3e59 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,19 +15,18 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Display; -use std::sync::Arc; - use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; use crate::{ - expressions::Column, physical_expr::deduplicate_physical_exprs, - physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, LexRequirement, + expressions::Column, physical_exprs_contains, LexOrdering, LexRequirement, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement, }; +use indexmap::IndexSet; +use std::fmt::Display; +use std::sync::Arc; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; -use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; +use datafusion_physical_expr_common::physical_expr::format_physical_expr_list2; /// A structure representing a expression known to be constant in a physical execution plan. /// @@ -191,46 +190,48 @@ pub struct EquivalenceClass { /// matter for equivalence purposes /// /// TODO: use a HashSet for this instead of a Vec - exprs: Vec>, + exprs: IndexSet>, } impl PartialEq for EquivalenceClass { /// Returns true if other is equal in the sense /// of bags (multi-sets), disregarding their orderings. fn eq(&self, other: &Self) -> bool { - physical_exprs_bag_equal(&self.exprs, &other.exprs) + self.exprs.eq(&other.exprs) } } impl EquivalenceClass { /// Create a new empty equivalence class pub fn new_empty() -> Self { - Self { exprs: vec![] } + Self { + exprs: IndexSet::new(), + } } // Create a new equivalence class from a pre-existing `Vec` - pub fn new(mut exprs: Vec>) -> Self { - deduplicate_physical_exprs(&mut exprs); - Self { exprs } + pub fn new(exprs: Vec>) -> Self { + // deduplicate_physical_exprs(&mut exprs); + Self { + exprs: exprs.into_iter().collect(), + } } /// Return the inner vector of expressions pub fn into_vec(self) -> Vec> { - self.exprs + self.exprs.into_iter().collect() } /// Return the "canonical" expression for this class (the first element) /// if any fn canonical_expr(&self) -> Option> { - self.exprs.first().cloned() + self.exprs.iter().next().cloned() } /// Insert the expression into this class, meaning it is known to be equal to /// all other expressions in this class pub fn push(&mut self, expr: Arc) { - if !self.contains(&expr) { - self.exprs.push(expr); - } + self.exprs.insert(expr); } /// Inserts all the expressions from other into this class @@ -243,7 +244,7 @@ impl EquivalenceClass { /// Returns true if this equivalence class contains t expression pub fn contains(&self, expr: &Arc) -> bool { - physical_exprs_contains(&self.exprs, expr) + self.exprs.contains(expr) } /// Returns true if this equivalence class has any entries in common with `other` @@ -281,7 +282,7 @@ impl EquivalenceClass { impl Display for EquivalenceClass { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "[{}]", format_physical_expr_list(&self.exprs)) + write!(f, "[{}]", format_physical_expr_list2(&self.exprs)) } } diff --git a/datafusion/physical-expr/src/physical_expr.rs b/datafusion/physical-expr/src/physical_expr.rs index 610c3656998b..9a9f40b6a1d4 100644 --- a/datafusion/physical-expr/src/physical_expr.rs +++ b/datafusion/physical-expr/src/physical_expr.rs @@ -65,34 +65,14 @@ pub fn physical_exprs_bag_equal( } } -/// This utility function removes duplicates from the given `exprs` vector. -/// Note that this function does not necessarily preserve its input ordering. -pub fn deduplicate_physical_exprs(exprs: &mut Vec>) { - // TODO: Once we can use `HashSet`s with `Arc`, this - // function should use a `HashSet` to reduce computational complexity. - // See issue: https://github.com/apache/datafusion/issues/8027 - let mut idx = 0; - while idx < exprs.len() { - let mut rest_idx = idx + 1; - while rest_idx < exprs.len() { - if exprs[idx].eq(&exprs[rest_idx]) { - exprs.swap_remove(rest_idx); - } else { - rest_idx += 1; - } - } - idx += 1; - } -} - #[cfg(test)] mod tests { use std::sync::Arc; use crate::expressions::{Column, Literal}; use crate::physical_expr::{ - deduplicate_physical_exprs, physical_exprs_bag_equal, physical_exprs_contains, - physical_exprs_equal, PhysicalExpr, + physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal, + PhysicalExpr, }; use datafusion_common::ScalarValue; @@ -208,41 +188,4 @@ mod tests { assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice())); assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice())); } - - #[test] - fn test_deduplicate_physical_exprs() { - let lit_true = &(Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))) - as Arc); - let lit_false = &(Arc::new(Literal::new(ScalarValue::Boolean(Some(false)))) - as Arc); - let lit4 = &(Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) - as Arc); - let lit2 = &(Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) - as Arc); - let col_a_expr = &(Arc::new(Column::new("a", 0)) as Arc); - let col_b_expr = &(Arc::new(Column::new("b", 1)) as Arc); - - // First vector in the tuple is arguments, second one is the expected value. - let test_cases = vec![ - // ---------- TEST CASE 1----------// - ( - vec![ - lit_true, lit_false, lit4, lit2, col_a_expr, col_a_expr, col_b_expr, - lit_true, lit2, - ], - vec![lit_true, lit_false, lit4, lit2, col_a_expr, col_b_expr], - ), - // ---------- TEST CASE 2----------// - ( - vec![lit_true, lit_true, lit_false, lit4], - vec![lit_true, lit4, lit_false], - ), - ]; - for (exprs, expected) in test_cases { - let mut exprs = exprs.into_iter().cloned().collect::>(); - let expected = expected.into_iter().cloned().collect::>(); - deduplicate_physical_exprs(&mut exprs); - assert!(physical_exprs_equal(&exprs, &expected)); - } - } } From 7857872ae3dcc7bc65787a01e2584f48d74f8502 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Sat, 30 Nov 2024 21:44:14 -0800 Subject: [PATCH 2/5] Change implementation to take iterator --- .../physical-expr-common/src/physical_expr.rs | 48 ++++++++----------- .../physical-expr/src/equivalence/class.rs | 4 +- 2 files changed, 21 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index cba106595bf4..93bdcdef8ea0 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -26,7 +26,7 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; -use datafusion_common::{internal_err, not_impl_err, IndexSet, Result}; +use datafusion_common::{internal_err, not_impl_err, Result}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; use datafusion_expr_common::sort_properties::ExprProperties; @@ -217,11 +217,24 @@ pub fn with_new_children_if_necessary( /// Returns [`Display`] able a list of [`PhysicalExpr`] /// /// Example output: `[a + 1, b]` -pub fn format_physical_expr_list(exprs: &[Arc]) -> impl Display + '_ { - struct DisplayWrapper<'a>(&'a [Arc]); - impl Display for DisplayWrapper<'_> { +pub fn format_physical_expr_list(exprs: T) -> impl Display +where + T: IntoIterator, + T::Item: Display, + T::IntoIter: Clone, +{ + struct DisplayWrapper(I) + where + I: Iterator + Clone, + I::Item: Display; + + impl Display for DisplayWrapper + where + I: Iterator + Clone, + I::Item: Display, + { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut iter = self.0.iter(); + let mut iter = self.0.clone(); write!(f, "[")?; if let Some(expr) = iter.next() { write!(f, "{}", expr)?; @@ -233,29 +246,6 @@ pub fn format_physical_expr_list(exprs: &[Arc]) -> impl Displa Ok(()) } } - DisplayWrapper(exprs) -} -/// Returns [`Display`] able a list of [`PhysicalExpr`] -/// -/// Example output: `[a + 1, b]` -pub fn format_physical_expr_list2( - exprs: &IndexSet>, -) -> impl Display + '_ { - struct DisplayWrapper<'a>(&'a IndexSet>); - impl Display for DisplayWrapper<'_> { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut iter = self.0.iter(); - write!(f, "[")?; - if let Some(expr) = iter.next() { - write!(f, "{}", expr)?; - } - for expr in iter { - write!(f, ", {}", expr)?; - } - write!(f, "]")?; - Ok(()) - } - } - DisplayWrapper(exprs) + DisplayWrapper(exprs.into_iter()) } diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 9fdddaed3e59..813585a8a74e 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; -use datafusion_physical_expr_common::physical_expr::format_physical_expr_list2; +use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; /// A structure representing a expression known to be constant in a physical execution plan. /// @@ -282,7 +282,7 @@ impl EquivalenceClass { impl Display for EquivalenceClass { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "[{}]", format_physical_expr_list2(&self.exprs)) + write!(f, "[{}]", format_physical_expr_list(&self.exprs)) } } From 3cdc42a9081a39f8efa682e5d90062cfa8c82190 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Sat, 30 Nov 2024 21:51:39 -0800 Subject: [PATCH 3/5] Minor changes --- datafusion/common/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 38b864d20e63..77e8cd60ede2 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -93,8 +93,6 @@ pub use error::{ pub type HashMap = hashbrown::HashMap; pub type HashSet = hashbrown::HashSet; -pub type IndexSet = indexmap::IndexSet; - /// Downcast an Arrow Array to a concrete type, return an `DataFusionError::Internal` if the cast is /// not possible. In normal usage of DataFusion the downcast should always succeed. /// From fa65753552710cf897adc8310deb31fa8b162a04 Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Sun, 1 Dec 2024 12:33:34 -0800 Subject: [PATCH 4/5] Update datafusion/physical-expr/src/equivalence/class.rs Co-authored-by: Alex Huang --- datafusion/physical-expr/src/equivalence/class.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 813585a8a74e..797e5b36c37b 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -211,7 +211,6 @@ impl EquivalenceClass { // Create a new equivalence class from a pre-existing `Vec` pub fn new(exprs: Vec>) -> Self { - // deduplicate_physical_exprs(&mut exprs); Self { exprs: exprs.into_iter().collect(), } From 643dcef69a920db7c0a72778b9bc39bfe966e9bb Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Sun, 1 Dec 2024 12:34:47 -0800 Subject: [PATCH 5/5] Remove leftover comment --- datafusion/physical-expr/src/equivalence/class.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 797e5b36c37b..16051dae26ca 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -189,7 +189,6 @@ pub struct EquivalenceClass { /// The expressions in this equivalence class. The order doesn't /// matter for equivalence purposes /// - /// TODO: use a HashSet for this instead of a Vec exprs: IndexSet>, }