refactor eliminate duplicated expr to avoid clone
Lordworms committed Apr 24, 2024
1 parent deebda7 commit 3c31bec
Showing 1 changed file with 90 additions and 56 deletions.
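
The heart of the change shown below: instead of cloning every sort expression to normalize it, the rule wraps each `Expr` in a `SortExprWrapper` whose `PartialEq`/`Hash` look only at the inner expression of an `Expr::Sort` (ignoring direction and null ordering), then collects the wrappers into an order-preserving `IndexSet`. Duplicates collapse on insertion and nothing is cloned, because the new `rewrite` method consumes the plan by value. The sketch below is a minimal, standalone illustration of that pattern; the `SortKey` type, its fields, and the `dedup_sort_keys` helper are hypothetical stand-ins, not DataFusion's real `Expr` or the rule's actual code.

```rust
// Standalone sketch (not DataFusion's actual types): deduplicate sort keys
// while preserving order, via a wrapper whose Eq/Hash ignore the sort options
// so that keys differing only in ASC/DESC or NULLS FIRST/LAST collapse.
// Assumes the `indexmap` crate (e.g. indexmap = "2") as a dependency.
use indexmap::IndexSet;
use std::hash::{Hash, Hasher};

// Simplified stand-in for a sort expression: a column name plus sort options.
#[derive(Debug, Clone, Eq)]
struct SortKey {
    column: String,
    asc: bool,
    nulls_first: bool,
}

// Equality and hashing deliberately look only at the column, mirroring how the
// commit's SortExprWrapper compares only the inner expr of an Expr::Sort.
impl PartialEq for SortKey {
    fn eq(&self, other: &Self) -> bool {
        self.column == other.column
    }
}

impl Hash for SortKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.column.hash(state);
    }
}

// Consumes the keys by value: the first occurrence of each column wins and
// nothing is cloned, because IndexSet keeps insertion order.
fn dedup_sort_keys(keys: Vec<SortKey>) -> Vec<SortKey> {
    let mut set: IndexSet<SortKey> = IndexSet::new();
    for key in keys {
        set.insert(key);
    }
    set.into_iter().collect()
}

fn main() {
    let keys = vec![
        SortKey { column: "a".into(), asc: true, nulls_first: false },
        SortKey { column: "b".into(), asc: false, nulls_first: true },
        // Same column as the first key, so it is dropped by the dedup.
        SortKey { column: "a".into(), asc: false, nulls_first: false },
    ];
    let deduped = dedup_sort_keys(keys);
    assert_eq!(deduped.len(), 2);
    println!("{:?}", deduped);
}
```

In the rule itself the same idea lets `rewrite` take the `Sort` node's expressions by value, so the `expr.clone()` calls of the previous `try_optimize` implementation disappear.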
datafusion/optimizer/src/eliminate_duplicated_expr.rs (146 changes: 90 additions & 56 deletions)
@@ -19,12 +19,13 @@

 use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::Result;
-use datafusion_expr::expr::Sort as ExprSort;
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::{internal_err, Result};
 use datafusion_expr::logical_plan::LogicalPlan;
 use datafusion_expr::{Aggregate, Expr, Sort};
 use hashbrown::HashSet;
-
+use indexmap::IndexSet;
+use std::hash::{Hash, Hasher};
 /// Optimization rule that eliminate duplicated expr.
 #[derive(Default)]
 pub struct EliminateDuplicatedExpr;
@@ -35,78 +36,111 @@ impl EliminateDuplicatedExpr {
         Self {}
     }
 }

+// use this structure to avoid initial clone
+#[derive(Eq, Clone, Debug)]
+struct SortExprWrapper {
+    expr: Expr,
+}
+impl PartialEq for SortExprWrapper {
+    fn eq(&self, other: &Self) -> bool {
+        match (&self.expr, &other.expr) {
+            (Expr::Sort(own_sort), Expr::Sort(other_sort)) => {
+                own_sort.expr == other_sort.expr
+            }
+            _ => self.expr == other.expr,
+        }
+    }
+}
+impl Hash for SortExprWrapper {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        match &self.expr {
+            Expr::Sort(sort) => {
+                sort.expr.hash(state);
+            }
+            _ => {
+                self.expr.hash(state);
+            }
+        }
+    }
+}
 impl OptimizerRule for EliminateDuplicatedExpr {
     fn try_optimize(
         &self,
-        plan: &LogicalPlan,
+        _plan: &LogicalPlan,
         _config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
+        internal_err!("Should have called EliminateDuplicatedExpr::rewrite")
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::TopDown)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        _config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
         match plan {
             LogicalPlan::Sort(sort) => {
-                let normalized_sort_keys = sort
+                let len = sort.expr.len();
+                let normalized_sort_keys: Vec<_> = sort
                     .expr
-                    .iter()
-                    .map(|e| match e {
-                        Expr::Sort(ExprSort { expr, .. }) => {
-                            Expr::Sort(ExprSort::new(expr.clone(), true, false))
-                        }
-                        _ => e.clone(),
-                    })
-                    .collect::<Vec<_>>();
+                    .into_iter()
+                    .map(|e| SortExprWrapper { expr: e })
+                    .collect();

-                // dedup sort.expr and keep order
-                let mut dedup_expr = Vec::new();
-                let mut dedup_set = HashSet::new();
-                sort.expr.iter().zip(normalized_sort_keys.iter()).for_each(
-                    |(expr, normalized_expr)| {
-                        if !dedup_set.contains(normalized_expr) {
-                            dedup_expr.push(expr);
-                            dedup_set.insert(normalized_expr);
-                        }
-                    },
-                );
-                if dedup_expr.len() == sort.expr.len() {
-                    Ok(None)
-                } else {
-                    Ok(Some(LogicalPlan::Sort(Sort {
-                        expr: dedup_expr.into_iter().cloned().collect::<Vec<_>>(),
-                        input: sort.input.clone(),
-                        fetch: sort.fetch,
-                    })))
+                let mut index_set = IndexSet::new(); // use index_set instead of Hashset to preserve order
+                for wrapper in normalized_sort_keys {
+                    index_set.insert(wrapper);
                 }
+                let unique_exprs: Vec<_> =
+                    index_set.into_iter().map(|wrapper| wrapper.expr).collect();
+                let transformed = if len != unique_exprs.len() {
+                    Transformed::yes
+                } else {
+                    Transformed::no
+                };
+
+                Ok(transformed(LogicalPlan::Sort(Sort {
+                    expr: unique_exprs,
+                    input: sort.input,
+                    fetch: sort.fetch,
+                })))
             }
             LogicalPlan::Aggregate(agg) => {
-                // dedup agg.groupby and keep order
-                let mut dedup_expr = Vec::new();
-                let mut dedup_set = HashSet::new();
-                agg.group_expr.iter().for_each(|expr| {
-                    if !dedup_set.contains(expr) {
-                        dedup_expr.push(expr.clone());
-                        dedup_set.insert(expr);
-                    }
-                });
-                if dedup_expr.len() == agg.group_expr.len() {
-                    Ok(None)
+                let len = agg.group_expr.len();
+                let unique_exprs: Vec<Expr> = agg
+                    .group_expr
+                    .into_iter()
+                    .collect::<HashSet<_>>()
+                    .into_iter()
+                    .collect();
+
+                let transformed = if len != unique_exprs.len() {
+                    Transformed::yes
                 } else {
-                    Ok(Some(LogicalPlan::Aggregate(Aggregate::try_new(
-                        agg.input.clone(),
-                        dedup_expr,
-                        agg.aggr_expr.clone(),
-                    )?)))
-                }
+                    Transformed::no
+                };
+
+                Aggregate::try_new_with_schema(
+                    agg.input,
+                    unique_exprs,
+                    agg.aggr_expr,
+                    agg.schema,
+                )
+                .map(|f| transformed(LogicalPlan::Aggregate(f)))
             }
-            _ => Ok(None),
+            _ => Ok(Transformed::no(plan)),
         }
     }

     fn name(&self) -> &str {
         "eliminate_duplicated_expr"
     }
-
-    fn apply_order(&self) -> Option<ApplyOrder> {
-        Some(ApplyOrder::TopDown)
-    }
 }

 #[cfg(test)]
(The remaining lines of the file, containing the unchanged test module, are collapsed in this view.)
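
A note on the API migration visible in this diff: `try_optimize` is reduced to an `internal_err!` stub, `supports_rewrite` returns true, and the real work moves into `rewrite`, which owns the `LogicalPlan` and reports whether anything changed through `Transformed::yes` or `Transformed::no`. The snippet below is a minimal standalone sketch of that constructor-selection pattern only; the simplified `Transformed` struct and the `dedup_group_keys` helper are hypothetical stand-ins for DataFusion's real `datafusion_common::tree_node::Transformed` and the rule's aggregate handling.

```rust
// Minimal stand-in for the Transformed wrapper used in the diff; the real type
// lives in datafusion_common::tree_node and carries extra recursion state.
use std::collections::HashSet;

#[derive(Debug)]
struct Transformed<T> {
    data: T,
    transformed: bool,
}

impl<T> Transformed<T> {
    fn yes(data: T) -> Self {
        Self { data, transformed: true }
    }
    fn no(data: T) -> Self {
        Self { data, transformed: false }
    }
}

// Toy stand-in for deduplicating group-by expressions: the "plan" is just a
// list of key names. Order is preserved here by filtering against a HashSet.
fn dedup_group_keys(keys: Vec<String>) -> Transformed<Vec<String>> {
    let len = keys.len();
    let mut seen = HashSet::new();
    let unique: Vec<String> = keys
        .into_iter()
        .filter(|k| seen.insert(k.clone()))
        .collect();

    // Pick the constructor once, then build the result through a single path,
    // mirroring `let transformed = if len != unique_exprs.len() { ... }` above.
    let wrap = if unique.len() != len {
        Transformed::yes
    } else {
        Transformed::no
    };
    wrap(unique)
}

fn main() {
    let out = dedup_group_keys(vec!["a".into(), "b".into(), "a".into()]);
    assert!(out.transformed);
    assert_eq!(out.data, vec!["a".to_string(), "b".to_string()]);
    println!("deduplicated: {:?}", out);
}
```

One difference worth noting: the sketch keeps input order by filtering against a `HashSet`, while the aggregate branch in the diff collects the group expressions straight into a `HashSet`, which does not guarantee that the original group-by order is preserved.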
