-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Stop copying LogicalPlan and Exprs in TypeCoercion
(10% faster planning)
#10356
Changes from all commits
826d51f
5ed976b
602b90f
0e87fb3
41ecf4b
b43a345
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ use std::sync::Arc; | |
use arrow::datatypes::{DataType, IntervalUnit}; | ||
|
||
use datafusion_common::config::ConfigOptions; | ||
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; | ||
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; | ||
use datafusion_common::{ | ||
exec_err, internal_err, not_impl_err, plan_datafusion_err, plan_err, DFSchema, | ||
DataFusionError, Result, ScalarValue, | ||
|
@@ -31,8 +31,8 @@ use datafusion_expr::expr::{ | |
self, AggregateFunctionDefinition, Between, BinaryExpr, Case, Exists, InList, | ||
InSubquery, Like, ScalarFunction, WindowFunction, | ||
}; | ||
use datafusion_expr::expr_rewriter::rewrite_preserving_name; | ||
use datafusion_expr::expr_schema::cast_subquery; | ||
use datafusion_expr::logical_plan::tree_node::unwrap_arc; | ||
use datafusion_expr::logical_plan::Subquery; | ||
use datafusion_expr::type_coercion::binary::{ | ||
comparison_coercion, get_input_types, like_coercion, | ||
|
@@ -52,6 +52,7 @@ use datafusion_expr::{ | |
}; | ||
|
||
use crate::analyzer::AnalyzerRule; | ||
use crate::utils::NamePreserver; | ||
|
||
#[derive(Default)] | ||
pub struct TypeCoercion {} | ||
|
@@ -68,26 +69,28 @@ impl AnalyzerRule for TypeCoercion { | |
} | ||
|
||
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> { | ||
analyze_internal(&DFSchema::empty(), &plan) | ||
let empty_schema = DFSchema::empty(); | ||
|
||
let transformed_plan = plan | ||
.transform_up_with_subqueries(|plan| analyze_internal(&empty_schema, plan))? | ||
.data; | ||
|
||
Ok(transformed_plan) | ||
} | ||
} | ||
|
||
/// use the external schema to handle the correlated subqueries case | ||
/// | ||
/// Assumes that children have already been optimized | ||
fn analyze_internal( | ||
// use the external schema to handle the correlated subqueries case | ||
external_schema: &DFSchema, | ||
plan: &LogicalPlan, | ||
) -> Result<LogicalPlan> { | ||
// optimize child plans first | ||
let new_inputs = plan | ||
.inputs() | ||
.iter() | ||
.map(|p| analyze_internal(external_schema, p)) | ||
.collect::<Result<Vec<_>>>()?; | ||
plan: LogicalPlan, | ||
) -> Result<Transformed<LogicalPlan>> { | ||
// get schema representing all available input fields. This is used for data type | ||
// resolution only, so order does not matter here | ||
let mut schema = merge_schema(new_inputs.iter().collect()); | ||
let mut schema = merge_schema(plan.inputs()); | ||
|
||
if let LogicalPlan::TableScan(ts) = plan { | ||
if let LogicalPlan::TableScan(ts) = &plan { | ||
let source_schema = DFSchema::try_from_qualified_schema( | ||
ts.table_name.clone(), | ||
&ts.source.schema(), | ||
|
@@ -100,25 +103,75 @@ fn analyze_internal( | |
// select t2.c2 from t1 where t1.c1 in (select t2.c1 from t2 where t2.c2=t1.c3) | ||
schema.merge(external_schema); | ||
|
||
let mut expr_rewrite = TypeCoercionRewriter { schema: &schema }; | ||
|
||
let new_expr = plan | ||
.expressions() | ||
.into_iter() | ||
.map(|expr| { | ||
// ensure aggregate names don't change: | ||
// https://github.com/apache/datafusion/issues/3555 | ||
rewrite_preserving_name(expr, &mut expr_rewrite) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
plan.with_new_exprs(new_expr, new_inputs) | ||
let mut expr_rewrite = TypeCoercionRewriter::new(&schema); | ||
|
||
let name_preserver = NamePreserver::new(&plan); | ||
// apply coercion rewrite all expressions in the plan individually | ||
plan.map_expressions(|expr| { | ||
let original_name = name_preserver.save(&expr)?; | ||
expr.rewrite(&mut expr_rewrite)? | ||
.map_data(|expr| original_name.restore(expr)) | ||
})? | ||
// coerce join expressions specially | ||
.map_data(|plan| expr_rewrite.coerce_joins(plan))? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for anyone following along, the response is https://github.com/apache/datafusion/pull/10356/files#r1588998665 (tldr should do as a follow on PR) |
||
// recompute the schema after the expressions have been rewritten as the types may have changed | ||
.map_data(|plan| plan.recompute_schema()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we always need to run There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an excellent point. At the moment, I think we do need to always run recompute_schema because the I filed #10365 to track improving this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I think you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are correct (of course!) thank you for pointing it out. Now that let new_plan =
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data;
Ok(Transformed::yes(Expr::Exists(Exists {
subquery: Subquery {
subquery: Arc::new(new_plan),
outer_ref_columns: subquery.outer_ref_columns,
},
negated,
})))
} Which discards the transformed information (and in this case always returns Transformed::true). In order to keep the PRs small and easier to review I would like to not change this PR (it is no worse than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah ok, it seems there are many unnecessary Sure, a follow-up PR sounds good, I agree that this PR already looks really nice! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is my draft followup: #10369 It is quite large (it requires updating the entire expression rewriter) so I am glad we left it in a separate PR |
||
} | ||
|
||
pub(crate) struct TypeCoercionRewriter<'a> { | ||
pub(crate) schema: &'a DFSchema, | ||
} | ||
|
||
impl<'a> TypeCoercionRewriter<'a> { | ||
fn new(schema: &'a DFSchema) -> Self { | ||
Self { schema } | ||
} | ||
|
||
/// Coerce join equality expressions | ||
/// | ||
/// Joins must be treated specially as their equality expressions are stored | ||
/// as a parallel list of left and right expressions, rather than a single | ||
/// equality expression | ||
/// | ||
/// For example, on_exprs like `t1.a = t2.b AND t1.x = t2.y` will be stored | ||
/// as a list of `(t1.a, t2.b), (t1.x, t2.y)` | ||
fn coerce_joins(&mut self, plan: LogicalPlan) -> Result<LogicalPlan> { | ||
let LogicalPlan::Join(mut join) = plan else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thats an interesting syntax There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it checks the plan can be deconstructed into LogicalPlan::Join(...) and if its not the else branch is triggered? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is exactly right. It is one of my favorite Rust syntax's as it often can avoid a level of indenting https://doc.rust-lang.org/rust-by-example/flow_control/let_else.html |
||
return Ok(plan); | ||
}; | ||
|
||
join.on = join | ||
.on | ||
.into_iter() | ||
.map(|(lhs, rhs)| { | ||
// coerce the arguments as though they were a single binary equality | ||
// expression | ||
let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if this method needed, as it looks like we just cast lhs, rhs? it feels it can be simplified? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think let (left_type, right_type) = get_input_types(
&left.get_type(self.schema)?,
&op,
&right.get_type(self.schema)?,
)?; And |
||
Ok((lhs, rhs)) | ||
}) | ||
.collect::<Result<Vec<_>>>()?; | ||
|
||
Ok(LogicalPlan::Join(join)) | ||
} | ||
|
||
fn coerce_binary_op( | ||
&self, | ||
left: Expr, | ||
op: Operator, | ||
right: Expr, | ||
) -> Result<(Expr, Expr)> { | ||
let (left_type, right_type) = get_input_types( | ||
&left.get_type(self.schema)?, | ||
&op, | ||
&right.get_type(self.schema)?, | ||
)?; | ||
Ok(( | ||
left.cast_to(&left_type, self.schema)?, | ||
right.cast_to(&right_type, self.schema)?, | ||
)) | ||
} | ||
} | ||
|
||
impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { | ||
type Node = Expr; | ||
|
||
|
@@ -131,14 +184,15 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { | |
subquery, | ||
outer_ref_columns, | ||
}) => { | ||
let new_plan = analyze_internal(self.schema, &subquery)?; | ||
let new_plan = analyze_internal(self.schema, unwrap_arc(subquery))?.data; | ||
Ok(Transformed::yes(Expr::ScalarSubquery(Subquery { | ||
subquery: Arc::new(new_plan), | ||
outer_ref_columns, | ||
}))) | ||
} | ||
Expr::Exists(Exists { subquery, negated }) => { | ||
let new_plan = analyze_internal(self.schema, &subquery.subquery)?; | ||
let new_plan = | ||
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; | ||
Ok(Transformed::yes(Expr::Exists(Exists { | ||
subquery: Subquery { | ||
subquery: Arc::new(new_plan), | ||
|
@@ -152,7 +206,8 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { | |
subquery, | ||
negated, | ||
}) => { | ||
let new_plan = analyze_internal(self.schema, &subquery.subquery)?; | ||
let new_plan = | ||
analyze_internal(self.schema, unwrap_arc(subquery.subquery))?.data; | ||
let expr_type = expr.get_type(self.schema)?; | ||
let subquery_type = new_plan.schema().field(0).data_type(); | ||
let common_type = comparison_coercion(&expr_type, subquery_type).ok_or(plan_datafusion_err!( | ||
|
@@ -221,15 +276,11 @@ impl<'a> TreeNodeRewriter for TypeCoercionRewriter<'a> { | |
)))) | ||
} | ||
Expr::BinaryExpr(BinaryExpr { left, op, right }) => { | ||
let (left_type, right_type) = get_input_types( | ||
&left.get_type(self.schema)?, | ||
&op, | ||
&right.get_type(self.schema)?, | ||
)?; | ||
let (left, right) = self.coerce_binary_op(*left, op, *right)?; | ||
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new( | ||
Box::new(left.cast_to(&left_type, self.schema)?), | ||
Box::new(left), | ||
op, | ||
Box::new(right.cast_to(&right_type, self.schema)?), | ||
Box::new(right), | ||
)))) | ||
} | ||
Expr::Between(Between { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 q