From 49951e7cde6eb88e64909cd3b62e325e92a80cd4 Mon Sep 17 00:00:00 2001 From: Mikhail Cheshkov Date: Wed, 21 Aug 2024 13:48:26 +0300 Subject: [PATCH] [WIP] Backport ac2e5d15 "Support type coercion for equijoin (#4666)" --- datafusion/core/src/logical_plan/plan.rs | 5 ++- datafusion/core/src/optimizer/utils.rs | 40 +++++++++++++++++++++++- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index 58a5d9fc227ba..0f4542a9fcecd 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -635,9 +635,12 @@ impl LogicalPlan { aggr_expr, .. }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(), + // There are two parts to the expressions of a join: equijoin (on) and non-equijoin (filter). + // 1. the first part is `on.len()` equijoin expressions, and the structure of each expr is `left-on = right-on`. + // 2. the second part is the non-equijoin (filter). LogicalPlan::Join(Join { on, .. }) => on .iter() - .flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())]) + .map(|(l, r)| Expr::eq(Expr::Column(l.clone()), Expr::Column(r.clone()))) .collect(), LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(), LogicalPlan::Extension(extension) => extension.node.expressions(), diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 0e908dfde9632..68e7a0be12aa2 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -235,12 +235,50 @@ pub fn from_plan( }) => { let schema = build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?; + + let equi_expr_count = on.len(); + assert!(expr.len() >= equi_expr_count); + + // The leading part of expr is the equi-exprs, + // and the structure of each equi-expr is `left-expr = right-expr`. 
+ let new_on = expr.iter().take(equi_expr_count).map(|equi_expr| { + let (left, right) = if let Expr::BinaryExpr{ left, op: Operator::Eq, right } = equi_expr { + (left, right) + } + else { + return Err(DataFusionError::Internal(format!( + "The front part expressions should be an binary expression, actual:{}", + equi_expr + ))); + }; + + let left = if let Expr::Column(left) = left.as_ref() { left } else { + return Err(DataFusionError::Internal(format!( + "The left expressions should be a column expression, actual:{}", + left + ))); + }; + + let right = if let Expr::Column(right) = right.as_ref() { right } else { + return Err(DataFusionError::Internal(format!( + "The right expressions should be a column expression, actual:{}", + right + ))); + }; + + Ok((left.clone(), right.clone())) + }).collect::<Result<Vec<_>>>()?; + + // // TODO do we need it? + // let filter_expr = + // (expr.len() > equi_expr_count).then(|| expr[expr.len() - 1].clone()); + Ok(LogicalPlan::Join(Join { left: Arc::new(inputs[0].clone()), right: Arc::new(inputs[1].clone()), join_type: *join_type, join_constraint: *join_constraint, - on: on.clone(), + on: new_on, schema: DFSchemaRef::new(schema), null_equals_null: *null_equals_null, }))