Skip to content

Commit

Permalink
[WIP] Backport ac2e5d1 "Support type coercion for equijoin (apache#4666
Browse files Browse the repository at this point in the history
…)"
  • Loading branch information
mcheshkov committed Aug 21, 2024
1 parent bba28d6 commit 49951e7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
5 changes: 4 additions & 1 deletion datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,12 @@ impl LogicalPlan {
aggr_expr,
..
}) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
// There are two part of expression for join, equijoin(on) and non-equijoin(filter).
// 1. the first part is `on.len()` equijoin expressions, and the struct of each expr is `left-on = right-on`.
// 2. the second part is non-equijoin(filter).
LogicalPlan::Join(Join { on, .. }) => on
.iter()
.flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
.map(|(l, r)| Expr::eq(Expr::Column(l.clone()), Expr::Column(r.clone())))
.collect(),
LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
LogicalPlan::Extension(extension) => extension.node.expressions(),
Expand Down
40 changes: 39 additions & 1 deletion datafusion/core/src/optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,50 @@ pub fn from_plan(
}) => {
let schema =
build_join_schema(inputs[0].schema(), inputs[1].schema(), join_type)?;

let equi_expr_count = on.len();
assert!(expr.len() >= equi_expr_count);

// The preceding part of expr is equi-exprs,
// and the struct of each equi-expr is like `left-expr = right-expr`.
let new_on = expr.iter().take(equi_expr_count).map(|equi_expr| {
let (left, right) = if let Expr::BinaryExpr{ left, op: Operator::Eq, right } = equi_expr {
(left, right)
}
else {
return Err(DataFusionError::Internal(format!(
"The front part expressions should be an binary expression, actual:{}",
equi_expr
)));
};

let left = if let Expr::Column(left) = left.as_ref() { left } else {
return Err(DataFusionError::Internal(format!(
"The left expressions should be a column expression, actual:{}",
left
)));
};

let right = if let Expr::Column(right) = right.as_ref() { right } else {
return Err(DataFusionError::Internal(format!(
"The right expressions should be a column expression, actual:{}",
right
)));
};

Ok((left.clone(), right.clone()))
}).collect::<Result<Vec<(Column,Column)>>>()?;

// // TODO do we need it?
// let filter_expr =
// (expr.len() > equi_expr_count).then(|| expr[expr.len() - 1].clone());

Ok(LogicalPlan::Join(Join {
left: Arc::new(inputs[0].clone()),
right: Arc::new(inputs[1].clone()),
join_type: *join_type,
join_constraint: *join_constraint,
on: on.clone(),
on: new_on,
schema: DFSchemaRef::new(schema),
null_equals_null: *null_equals_null,
}))
Expand Down

0 comments on commit 49951e7

Please sign in to comment.