From 09cd01d2139918eae929e1883b2dfd111435396a Mon Sep 17 00:00:00 2001
From: Yanxin Xiang
Date: Wed, 24 Apr 2024 13:57:45 -0500
Subject: [PATCH 1/5] refactor eliminate duplicated expr to avoid clone adding dep

---
 datafusion/optimizer/Cargo.toml      |   2 +-
 .../src/eliminate_duplicated_expr.rs | 146 +++++++++++-------
 2 files changed, 91 insertions(+), 57 deletions(-)

diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index b1a6953501a6..917b3f19abeb 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -50,7 +50,7 @@ hashbrown = { version = "0.14", features = ["raw"] }
 itertools = { workspace = true }
 log = { workspace = true }
 regex-syntax = "0.8.0"
-
+indexmap = "*"
 [dev-dependencies]
 ctor = { workspace = true }
 datafusion-sql = { workspace = true }

diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index ee44a328f8b3..980f08e75d2b 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -19,12 +19,13 @@
 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::Result;
-use datafusion_expr::expr::Sort as ExprSort;
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{internal_err, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Aggregate, Expr, Sort};
 use hashbrown::HashSet;
-
+use indexmap::IndexSet;
+use std::hash::{Hash, Hasher};
 /// Optimization rule that eliminate duplicated expr.
 #[derive(Default)]
 pub struct EliminateDuplicatedExpr;
@@ -35,78 +36,111 @@ impl EliminateDuplicatedExpr {
         Self {}
     }
 }
-
+// use this structure to avoid initial clone
+#[derive(Eq, Clone, Debug)]
+struct SortExprWrapper {
+    expr: Expr,
+}
+impl PartialEq for SortExprWrapper {
+    fn eq(&self, other: &Self) -> bool {
+        match (&self.expr, &other.expr) {
+            (Expr::Sort(own_sort), Expr::Sort(other_sort)) => {
+                own_sort.expr == other_sort.expr
+            }
+            _ => self.expr == other.expr,
+        }
+    }
+}
+impl Hash for SortExprWrapper {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        match &self.expr {
+            Expr::Sort(sort) => {
+                sort.expr.hash(state);
+            }
+            _ => {
+                self.expr.hash(state);
+            }
+        }
+    }
+}
 impl OptimizerRule for EliminateDuplicatedExpr {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
+        _plan: &LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called EliminateDuplicatedExpr::rewrite")
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
         match plan {
             LogicalPlan::Sort(sort) => {
-                let normalized_sort_keys = sort
+                let len = sort.expr.len();
+                let normalized_sort_keys: Vec<_> = sort
                     .expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort(ExprSort { expr, .. }) => {
-                            Expr::Sort(ExprSort::new(expr.clone(), true, false))
-                        }
-                        _ => e.clone(),
-                    })
-                    .collect::<Vec<_>>();
+                    .into_iter()
+                    .map(|e| SortExprWrapper { expr: e })
+                    .collect();

-                // dedup sort.expr and keep order
-                let mut dedup_expr = Vec::new();
-                let mut dedup_set = HashSet::new();
-                sort.expr.iter().zip(normalized_sort_keys.iter()).for_each(
-                    |(expr, normalized_expr)| {
-                        if !dedup_set.contains(normalized_expr) {
-                            dedup_expr.push(expr);
-                            dedup_set.insert(normalized_expr);
-                        }
-                    },
-                );
-                if dedup_expr.len() == sort.expr.len() {
-                    Ok(None)
-                } else {
-                    Ok(Some(LogicalPlan::Sort(Sort {
-                        expr: dedup_expr.into_iter().cloned().collect::<Vec<_>>(),
-                        input: sort.input.clone(),
-                        fetch: sort.fetch,
-                    })))
+                let mut index_set = IndexSet::new(); // use index_set instead of Hashset to preserve order
+                for wrapper in normalized_sort_keys {
+                    index_set.insert(wrapper);
                 }
+                let unique_exprs: Vec<_> =
+                    index_set.into_iter().map(|wrapper| wrapper.expr).collect();
+                let transformed = if len != unique_exprs.len() {
+                    Transformed::yes
+                } else {
+                    Transformed::no
+                };
+
+                Ok(transformed(LogicalPlan::Sort(Sort {
+                    expr: unique_exprs,
+                    input: sort.input,
+                    fetch: sort.fetch,
+                })))
             }
             LogicalPlan::Aggregate(agg) => {
-                // dedup agg.groupby and keep order
-                let mut dedup_expr = Vec::new();
-                let mut dedup_set = HashSet::new();
-                agg.group_expr.iter().for_each(|expr| {
-                    if !dedup_set.contains(expr) {
-                        dedup_expr.push(expr.clone());
-                        dedup_set.insert(expr);
-                    }
-                });
-                if dedup_expr.len() == agg.group_expr.len() {
-                    Ok(None)
+                let len = agg.group_expr.len();
+                let unique_exprs: Vec<Expr> = agg
+                    .group_expr
+                    .into_iter()
+                    .collect::<HashSet<_>>()
+                    .into_iter()
+                    .collect();
+
+                let transformed = if len != unique_exprs.len() {
+                    Transformed::yes
                 } else {
-                    Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
-                        agg.input.clone(),
-                        dedup_expr,
-                        agg.aggr_expr.clone(),
-                    )?)))
-                }
+                    Transformed::no
+                };
+
+                Aggregate::try_new_with_schema(
+                    agg.input,
+                    unique_exprs,
+                    agg.aggr_expr,
+                    agg.schema,
+                )
+                .map(|f| transformed(LogicalPlan::Aggregate(f)))
             }
-            _ => Ok(None),
+            _ => Ok(Transformed::no(plan)),
         }
     }
-
     fn name(&self) -> &str {
         "eliminate_duplicated_expr"
     }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
 }

 #[cfg(test)]

From e9f2b8fd261c5f01db9f42690ad3d9048edfe4db Mon Sep 17 00:00:00 2001
From: Yanxin Xiang
Date: Wed, 24 Apr 2024 14:53:29 -0500
Subject: [PATCH 2/5] fix bugs

---
 datafusion/optimizer/Cargo.toml                |  2 +-
 .../optimizer/src/eliminate_duplicated_expr.rs | 13 ++++---------
 2 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index 917b3f19abeb..9b190c2bfcff 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -50,7 +50,7 @@ hashbrown = { version = "0.14", features = ["raw"] }
 itertools = { workspace = true }
 log = { workspace = true }
 regex-syntax = "0.8.0"
-indexmap = "*"
+indexmap = { workspace = true }
 [dev-dependencies]
 ctor = { workspace = true }
 datafusion-sql = { workspace = true }

diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index 980f08e75d2b..a7357d393237 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -23,7 +23,6 @@ use datafusion_common::tree_node::Transformed;
 use datafusion_common::{internal_err, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Aggregate, Expr, Sort};
-use hashbrown::HashSet;
 use indexmap::IndexSet;
 use std::hash::{Hash, Hasher};
 /// Optimization rule that eliminate duplicated expr.
@@ -114,10 +113,11 @@ impl OptimizerRule for EliminateDuplicatedExpr {
             }
             LogicalPlan::Aggregate(agg) => {
                 let len = agg.group_expr.len();
+
                 let unique_exprs: Vec<Expr> = agg
                     .group_expr
                     .into_iter()
-                    .collect::<HashSet<_>>()
+                    .collect::<IndexSet<_>>()
                     .into_iter()
                     .collect();
@@ -127,13 +127,8 @@ impl OptimizerRule for EliminateDuplicatedExpr {
                     Transformed::no
                 };

-                Aggregate::try_new_with_schema(
-                    agg.input,
-                    unique_exprs,
-                    agg.aggr_expr,
-                    agg.schema,
-                )
-                .map(|f| transformed(LogicalPlan::Aggregate(f)))
+                Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
+                    .map(|f| transformed(LogicalPlan::Aggregate(f)))
             }
             _ => Ok(Transformed::no(plan)),
         }

From c352644cea479ed8257fd88aecfc387bc45c081d Mon Sep 17 00:00:00 2001
From: Yanxin Xiang
Date: Wed, 24 Apr 2024 15:07:29 -0500
Subject: [PATCH 3/5] change lock

---
 datafusion-cli/Cargo.lock | 1 +
 1 file changed, 1 insertion(+)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index ba3e68e4011f..5263b064ff9b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1323,6 +1323,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-physical-expr",
  "hashbrown 0.14.3",
+ "indexmap 2.2.6",
  "itertools",
  "log",
  "regex-syntax",

From 5826713f5e32283d3a05600dad03ecd6bb73e101 Mon Sep 17 00:00:00 2001
From: Yanxin Xiang
Date: Wed, 24 Apr 2024 15:27:30 -0500
Subject: [PATCH 4/5] format toml

---
 datafusion/optimizer/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index 9b190c2bfcff..45ece35c2388 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -47,10 +47,10 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 hashbrown = { version = "0.14", features = ["raw"] }
+indexmap = { workspace = true }
 itertools = { workspace = true }
 log = { workspace = true }
 regex-syntax = "0.8.0"
-indexmap = { workspace = true }
 [dev-dependencies]
 ctor = { workspace = true }
 datafusion-sql = { workspace = true }

From ae8a12a899c7cf3bcf2da733db5e1ca6095cf53f Mon Sep 17 00:00:00 2001
From: Yanxin Xiang
Date: Fri, 26 Apr 2024 05:47:00 -0500
Subject: [PATCH 5/5] refactor

---
 datafusion/optimizer/src/eliminate_duplicated_expr.rs | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)

diff --git a/datafusion/optimizer/src/eliminate_duplicated_expr.rs b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
index a7357d393237..3dbfc750e899 100644
--- a/datafusion/optimizer/src/eliminate_duplicated_expr.rs
+++ b/datafusion/optimizer/src/eliminate_duplicated_expr.rs
@@ -87,18 +87,15 @@ impl OptimizerRule for EliminateDuplicatedExpr {
         match plan {
             LogicalPlan::Sort(sort) => {
                 let len = sort.expr.len();
-                let normalized_sort_keys: Vec<_> = sort
+                let unique_exprs: Vec<_> = sort
                     .expr
                     .into_iter()
                     .map(|e| SortExprWrapper { expr: e })
+                    .collect::<IndexSet<_>>()
+                    .into_iter()
+                    .map(|wrapper| wrapper.expr)
                     .collect();

-                let mut index_set = IndexSet::new(); // use index_set instead of Hashset to preserve order
-                for wrapper in normalized_sort_keys {
-                    index_set.insert(wrapper);
-                }
-                let unique_exprs: Vec<_> =
-                    index_set.into_iter().map(|wrapper| wrapper.expr).collect();
                 let transformed = if len != unique_exprs.len() {
                     Transformed::yes
                 } else {
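
Note (not part of the patch series above): the standalone sketch below illustrates the technique the patches converge on in eliminate_duplicated_expr.rs: wrap each sort key in a type whose PartialEq/Hash look only at the underlying expression, then collect into an indexmap::IndexSet so duplicates are dropped while the first-seen order is preserved. SortKey, KeyWrapper and dedup_sort_keys are illustrative names only, and SortKey is a simplified stand-in for DataFusion's Expr::Sort; the only assumed dependency is the indexmap crate the patches add.

// sketch.rs -- illustrative only, not part of the patches above.
// Requires the `indexmap` crate (the same dependency the patches add).
use indexmap::IndexSet;
use std::hash::{Hash, Hasher};

// Simplified stand-in for DataFusion's `Expr::Sort`.
#[derive(Clone, Debug)]
struct SortKey {
    expr: String, // the sort expression, e.g. a column name
    asc: bool,
    nulls_first: bool,
}

// Mirrors the idea of `SortExprWrapper` in patch 1: equality and hashing consider
// only the expression, so `a ASC` and `a DESC NULLS FIRST` count as duplicates.
#[derive(Eq, Clone, Debug)]
struct KeyWrapper(SortKey);

impl PartialEq for KeyWrapper {
    fn eq(&self, other: &Self) -> bool {
        self.0.expr == other.0.expr
    }
}

impl Hash for KeyWrapper {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.0.expr.hash(state);
    }
}

// Dedup while keeping the first occurrence and the original order, in the
// spirit of the Sort branch after patch 5.
fn dedup_sort_keys(keys: Vec<SortKey>) -> Vec<SortKey> {
    keys.into_iter()
        .map(KeyWrapper)
        .collect::<IndexSet<_>>() // IndexSet keeps insertion order; a HashSet would not
        .into_iter()
        .map(|w| w.0)
        .collect()
}

fn main() {
    let keys = vec![
        SortKey { expr: "a".into(), asc: true, nulls_first: false },
        SortKey { expr: "b".into(), asc: false, nulls_first: true },
        // Duplicate of "a" with different options: dropped, first occurrence wins.
        SortKey { expr: "a".into(), asc: false, nulls_first: true },
    ];
    let deduped = dedup_sort_keys(keys);
    assert_eq!(deduped.len(), 2);
    assert_eq!(deduped[0].expr, "a");
    assert!(deduped[0].asc && !deduped[0].nulls_first); // options of the first occurrence kept
    println!("{deduped:?}");
}

The same order-preservation concern appears to be what patch 2 addresses for GROUP BY expressions: collecting them into a plain HashSet deduplicates but scrambles their order, while IndexSet keeps the deduplicated expressions in their original, deterministic order.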