From 284b40cb051ff72f4fd32c44be18384cfef09d18 Mon Sep 17 00:00:00 2001 From: jackwener Date: Mon, 2 May 2022 00:41:42 +0800 Subject: [PATCH 1/6] plan: remove alias in union --- datafusion/core/src/logical_plan/builder.rs | 50 +++++++++++++++---- .../core/src/optimizer/filter_push_down.rs | 6 +-- .../core/src/optimizer/limit_push_down.rs | 10 +--- .../src/optimizer/projection_push_down.rs | 7 +-- datafusion/core/src/optimizer/utils.rs | 11 ++-- datafusion/expr/src/logical_plan/plan.rs | 2 - 6 files changed, 47 insertions(+), 39 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 80ebfadd1628..512b8f3a0475 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -1088,16 +1088,46 @@ pub fn union_with_alias( } let union_schema = (**inputs[0].schema()).clone(); - let union_schema = Arc::new(match alias { - Some(ref alias) => union_schema.replace_qualifier(alias.as_str()), - None => union_schema.strip_qualifiers(), - }); - - Ok(LogicalPlan::Union(Union { - inputs, - schema: union_schema, - alias, - })) + match alias { + Some(ref alias) => { + let alias_schema = union_schema.clone().replace_qualifier(alias.as_str()); + let union_schema = union_schema.strip_qualifiers(); + + inputs + .iter() + .skip(1) + .try_for_each(|input_plan| -> Result<()> { + union_schema.check_arrow_schema_type_compatible( + &((**input_plan.schema()).clone().into()), + ) + })?; + + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { + input: Arc::new(LogicalPlan::Union(Union { + inputs, + schema: Arc::new(union_schema), + })), + alias: alias.to_string(), + schema: Arc::new(alias_schema), + })) + } + None => { + let union_schema = union_schema.strip_qualifiers(); + inputs + .iter() + .skip(1) + .try_for_each(|input_plan| -> Result<()> { + union_schema.check_arrow_schema_type_compatible( + &((**input_plan.schema()).clone().into()), + ) + })?; + + Ok(LogicalPlan::Union(Union { + inputs, + schema: Arc::new(union_schema), + })) + } + } } /// Project with optional alias diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index 795f86392724..9a65a3626550 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -358,11 +358,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // sort is filter-commutable push_down(&state, plan) } - LogicalPlan::Union(Union { - inputs: _, - schema, - alias: _, - }) => { + LogicalPlan::Union(Union { inputs: _, schema }) => { // union changing all qualifiers while building logical plan so we need // to rewrite filters to push unqualified columns to inputs let projection = schema diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 0c68f1761601..51ecd98f23e9 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -100,14 +100,7 @@ fn limit_push_down( alias: alias.clone(), })) } - ( - LogicalPlan::Union(Union { - inputs, - alias, - schema, - }), - Some(upper_limit), - ) => { + (LogicalPlan::Union(Union { inputs, schema }), Some(upper_limit)) => { // Push down limit through UNION let new_inputs = inputs .iter() @@ -125,7 +118,6 @@ fn limit_push_down( .collect::>()?; Ok(LogicalPlan::Union(Union { inputs: new_inputs, - alias: alias.clone(), schema: schema.clone(), })) } diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 9cbec1e908e0..9bf1b997904c 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -383,11 +383,7 @@ fn optimize_plan( schema: a.schema.clone(), })) } - LogicalPlan::Union(Union { - inputs, - schema, - alias, - }) => { + LogicalPlan::Union(Union { inputs, schema }) => { // UNION inputs will reference the same column with different identifiers, so we need // to populate new_required_columns by unqualified column name based on required fields // from the resulting UNION output @@ -429,7 +425,6 @@ fn optimize_plan( Ok(LogicalPlan::Union(Union { inputs: new_inputs, schema: Arc::new(new_schema), - alias: alias.clone(), })) } LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs index 0e54fd93ee99..ce1acb77b3e5 100644 --- a/datafusion/core/src/optimizer/utils.rs +++ b/datafusion/core/src/optimizer/utils.rs @@ -268,13 +268,10 @@ pub fn from_plan( LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension { node: e.node.from_template(expr, inputs), })), - LogicalPlan::Union(Union { schema, alias, .. }) => { - Ok(LogicalPlan::Union(Union { - inputs: inputs.to_vec(), - schema: schema.clone(), - alias: alias.clone(), - })) - } + LogicalPlan::Union(Union { schema, .. }) => Ok(LogicalPlan::Union(Union { + inputs: inputs.to_vec(), + schema: schema.clone(), + })), LogicalPlan::Analyze(a) => { assert!(expr.is_empty()); assert_eq!(inputs.len(), 1); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index ab96dbe732d4..5ff1ae646b15 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1029,8 +1029,6 @@ pub struct Union { pub inputs: Vec, /// Union schema. Should be the same for all inputs. pub schema: DFSchemaRef, - /// Union output relation alias - pub alias: Option, } /// Creates an in memory table. From 6881184884e2f5d6730e71224b59ec0c58b33f08 Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 4 May 2022 20:42:33 +0800 Subject: [PATCH 2/6] fix the push_down --- .../core/src/optimizer/filter_push_down.rs | 11 +++++---- .../core/src/optimizer/limit_push_down.rs | 22 +++++++++++++++++ .../src/optimizer/projection_push_down.rs | 24 +++++++++++++++++++ 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index 9a65a3626550..1277172aac06 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -901,11 +901,12 @@ mod tests { // filter appears below Union without relation qualifier let expected = "\ - Union\ - \n Filter: #a = Int64(1)\ - \n TableScan: test projection=None\ - \n Filter: #a = Int64(1)\ - \n TableScan: test projection=None"; + SubqueryAlias: t\ + \n Union\ + \n Filter: #t.a = Int64(1)\ + \n TableScan: test projection=None\ + \n Filter: #t.a = Int64(1)\ + \n TableScan: test projection=None"; assert_optimized_plan_eq(&plan, expected); Ok(()) } diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs index 51ecd98f23e9..c665eecc5e4e 100644 --- a/datafusion/core/src/optimizer/limit_push_down.rs +++ b/datafusion/core/src/optimizer/limit_push_down.rs @@ -17,6 +17,8 @@ //! Optimizer rule to push down LIMIT in the query plan //! It will push down through projection, limits (taking the smaller limit) +use datafusion_expr::logical_plan::SubqueryAlias; + use super::utils; use crate::error::Result; use crate::execution::context::ExecutionProps; @@ -121,6 +123,26 @@ fn limit_push_down( schema: schema.clone(), })) } + ( + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema, + }), + upper_limit, + ) => { + // Push down limit directly + Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { + input: Arc::new(limit_push_down( + _optimizer, + upper_limit, + input.as_ref(), + _execution_props, + )?), + schema: schema.clone(), + alias: alias.clone(), + })) + } // For other nodes we can't push down the limit // But try to recurse and find other limit nodes to push down _ => { diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 9bf1b997904c..5c892b44ec42 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -450,6 +450,30 @@ fn optimize_plan( let expr = vec![]; utils::from_plan(plan, &expr, &new_inputs) } + LogicalPlan::Union(Union { schema, .. }) => { + // Scope of alias is outside, inside we should remove it. + // Here we remove the alias `in new_required_columns`. + let new_required_columns = new_required_columns + .iter() + .map(|c| match &c.relation { + Some(q) if q == alias => Column { + relation: None, + name: c.name.clone(), + }, + _ => c.clone(), + }) + .collect(); + let new_inputs = vec![optimize_plan( + _optimizer, + input, + &new_required_columns, + has_projection, + _execution_props, + )?]; + let expr = vec![]; + + utils::from_plan(plan, &expr, &new_inputs) + } _ => Err(DataFusionError::Plan( "SubqueryAlias should only wrap TableScan".to_string(), )), From fa7d753dafc2acc423e9b06bf52a622494b40081 Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 4 May 2022 20:58:37 +0800 Subject: [PATCH 3/6] refactor code --- datafusion/core/src/logical_plan/builder.rs | 51 ++++++++------------- 1 file changed, 19 insertions(+), 32 deletions(-) diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs index 512b8f3a0475..c41a62686b5b 100644 --- a/datafusion/core/src/logical_plan/builder.rs +++ b/datafusion/core/src/logical_plan/builder.rs @@ -1088,45 +1088,32 @@ pub fn union_with_alias( } let union_schema = (**inputs[0].schema()).clone(); - match alias { - Some(ref alias) => { - let alias_schema = union_schema.clone().replace_qualifier(alias.as_str()); - let union_schema = union_schema.strip_qualifiers(); + let union_schema_strip = union_schema.clone().strip_qualifiers(); - inputs - .iter() - .skip(1) - .try_for_each(|input_plan| -> Result<()> { - union_schema.check_arrow_schema_type_compatible( - &((**input_plan.schema()).clone().into()), - ) - })?; + inputs + .iter() + .skip(1) + .try_for_each(|input_plan| -> Result<()> { + union_schema_strip.check_arrow_schema_type_compatible( + &((**input_plan.schema()).clone().into()), + ) + })?; + + let union_plan = LogicalPlan::Union(Union { + inputs, + schema: Arc::new(union_schema_strip), + }); + match alias { + Some(ref alias) => { + let alias_schema = union_schema.replace_qualifier(alias.as_str()); Ok(LogicalPlan::SubqueryAlias(SubqueryAlias { - input: Arc::new(LogicalPlan::Union(Union { - inputs, - schema: Arc::new(union_schema), - })), + input: Arc::new(union_plan), alias: alias.to_string(), schema: Arc::new(alias_schema), })) } - None => { - let union_schema = union_schema.strip_qualifiers(); - inputs - .iter() - .skip(1) - .try_for_each(|input_plan| -> Result<()> { - union_schema.check_arrow_schema_type_compatible( - &((**input_plan.schema()).clone().into()), - ) - })?; - - Ok(LogicalPlan::Union(Union { - inputs, - schema: Arc::new(union_schema), - })) - } + None => Ok(union_plan), } } From 429c02f8dbef027873cdd04e3c3e730e77aa33fb Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 4 May 2022 21:20:37 +0800 Subject: [PATCH 4/6] fix filter_push_down --- datafusion/core/src/optimizer/filter_push_down.rs | 2 ++ datafusion/core/src/optimizer/projection_push_down.rs | 2 +- datafusion/core/src/physical_plan/planner.rs | 3 +++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index 1277172aac06..fc1f090deaec 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -24,6 +24,7 @@ use crate::logical_plan::{ use crate::logical_plan::{DFSchema, Expr}; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; +use datafusion_expr::logical_plan::SubqueryAlias; use std::collections::{HashMap, HashSet}; /// Filter Push Down optimizer rule pushes filter clauses down the plan @@ -503,6 +504,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { }), ) } + LogicalPlan::SubqueryAlias(SubqueryAlias { .. }) => push_down(&state, plan), _ => { // all other plans are _not_ filter-commutable let used_columns = plan diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 5c892b44ec42..39edd2a27aad 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -450,7 +450,7 @@ fn optimize_plan( let expr = vec![]; utils::from_plan(plan, &expr, &new_inputs) } - LogicalPlan::Union(Union { schema, .. }) => { + LogicalPlan::Union(Union { .. }) => { // Scope of alias is outside, inside we should remove it. // Here we remove the alias `in new_required_columns`. let new_required_columns = new_required_columns diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 47829ad798ca..b1b4bbf92751 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -835,6 +835,9 @@ impl DefaultPhysicalPlanner { LogicalPlan::TableScan(..) => { self.create_initial_plan(input, session_state).await } + LogicalPlan::Union(..) => { + self.create_initial_plan(input, session_state).await + } _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) } } From 0c829b21b6b92bf87aad33cac467597936bd5825 Mon Sep 17 00:00:00 2001 From: jackwener Date: Wed, 4 May 2022 21:21:44 +0800 Subject: [PATCH 5/6] typo --- datafusion/core/src/optimizer/projection_push_down.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index 39edd2a27aad..b32db62c7296 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -452,7 +452,7 @@ fn optimize_plan( } LogicalPlan::Union(Union { .. }) => { // Scope of alias is outside, inside we should remove it. - // Here we remove the alias `in new_required_columns`. + // Here we remove the alias in `new_required_columns`. let new_required_columns = new_required_columns .iter() .map(|c| match &c.relation { From ab3d258f7996104c6b95c89e23aef49a5d76a24c Mon Sep 17 00:00:00 2001 From: jackwener Date: Fri, 13 May 2022 18:52:23 +0800 Subject: [PATCH 6/6] tmp --- .../core/src/optimizer/filter_push_down.rs | 63 ++++++++++++++++++- .../src/optimizer/projection_push_down.rs | 34 ++++++++++ datafusion/core/src/physical_plan/planner.rs | 3 + datafusion/core/tests/sql/mod.rs | 4 ++ 4 files changed, 101 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs index fc1f090deaec..2b2d216bcc86 100644 --- a/datafusion/core/src/optimizer/filter_push_down.rs +++ b/datafusion/core/src/optimizer/filter_push_down.rs @@ -504,7 +504,64 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { }), ) } - LogicalPlan::SubqueryAlias(SubqueryAlias { .. }) => push_down(&state, plan), + LogicalPlan::SubqueryAlias(SubqueryAlias { alias, input, .. }) => { + // println!("before: {:?}", schema); + // println!("input: {:?}", &*input); + // println!("state: {:?}", &state); + + // let t = match &**input { + // LogicalPlan::TableScan(TableScan { table_name, .. }) => { + // Some(table_name.clone()) + // } + // _ => None, + // }; + + let new_filters: Result)>> = state + .filters + .iter() + .map(|pair| { + let mut replace: HashMap<&Column, Column> = HashMap::new(); + let new_columns: HashSet = pair + .1 + .iter() + .map(|c| match &c.relation { + Some(q) if q == alias => { + let column = Column { + relation: None, + name: c.name.clone(), + }; + replace.insert(c, column.clone()); + column + } + _ => c.clone(), + }) + .collect(); + let replace_map: HashMap<&Column, &Column> = + replace.iter().map(|(k, v)| (*k, v)).collect(); + let new_expr = replace_col(pair.0.clone(), &replace_map)?; + Ok((new_expr, new_columns)) + }) + .collect(); + + let state = State { + filters: new_filters?, + }; + + println!("state: {:?}", &state); + + let plan = push_down(&state, plan)?; + // let copy = plan.clone(); + // println!("after SubqueryAlias: {:?}", copy); + + // match copy { + // LogicalPlan::SubqueryAlias(SubqueryAlias { input, schema, .. }) => { + // println!("before: {:?}", schema); + // println!("input: {:?}", &*input); + // } + // _ => {} + // }; + Ok(plan) + } _ => { // all other plans are _not_ filter-commutable let used_columns = plan @@ -905,9 +962,9 @@ mod tests { let expected = "\ SubqueryAlias: t\ \n Union\ - \n Filter: #t.a = Int64(1)\ + \n Filter: #a = Int64(1)\ \n TableScan: test projection=None\ - \n Filter: #t.a = Int64(1)\ + \n Filter: #a = Int64(1)\ \n TableScan: test projection=None"; assert_optimized_plan_eq(&plan, expected); Ok(()) diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs index b32db62c7296..193fbea4ad30 100644 --- a/datafusion/core/src/optimizer/projection_push_down.rs +++ b/datafusion/core/src/optimizer/projection_push_down.rs @@ -32,6 +32,7 @@ use crate::optimizer::utils; use crate::sql::utils::find_sort_exprs; use arrow::datatypes::{Field, Schema}; use arrow::error::Result as ArrowResult; +use datafusion_expr::logical_plan::Filter; use std::{ collections::{BTreeSet, HashSet}, sync::Arc, @@ -428,6 +429,7 @@ fn optimize_plan( })) } LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => { + println!("input: {:?}", *input); match input.as_ref() { LogicalPlan::TableScan(TableScan { table_name, .. }) => { let new_required_columns = new_required_columns @@ -474,6 +476,38 @@ fn optimize_plan( utils::from_plan(plan, &expr, &new_inputs) } + LogicalPlan::Filter(Filter { input, .. }) => { + // let new_inputs = vec![optimize_plan( + // _optimizer, + // input, + // &new_required_columns, + // has_projection, + // _execution_props, + // )?]; + // let expr = vec![]; + + // utils::from_plan(plan, &expr, &new_inputs) + let expr = plan.expressions(); + // collect all required columns by this plan + utils::exprlist_to_columns(&expr, &mut new_required_columns)?; + + // apply the optimization to all inputs of the plan + let inputs = plan.inputs(); + let new_inputs = inputs + .iter() + .map(|input_plan| { + optimize_plan( + _optimizer, + input_plan, + &new_required_columns, + has_projection, + _execution_props, + ) + }) + .collect::>>()?; + + utils::from_plan(plan, &expr, &new_inputs) + } _ => Err(DataFusionError::Plan( "SubqueryAlias should only wrap TableScan".to_string(), )), diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index b1b4bbf92751..a1b948b1db64 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -838,6 +838,9 @@ impl DefaultPhysicalPlanner { LogicalPlan::Union(..) => { self.create_initial_plan(input, session_state).await } + LogicalPlan::Filter(..) => { + self.create_initial_plan(input, session_state).await + } _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string())) } } diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 566baefab7f4..5704b5d83c1a 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -548,6 +548,8 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec .unwrap(); let optimized_logical_schema = plan.schema(); + println!("plan {:?}", plan); + let msg = format!("Creating physical plan for '{}': {:?}", sql, plan); let plan = ctx .create_physical_plan(&plan) @@ -555,6 +557,8 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec .map_err(|e| format!("{:?} at {}", e, msg)) .unwrap(); + println!("plan {:?}", plan); + let msg = format!("Executing physical plan for '{}': {:?}", sql, plan); let task_ctx = ctx.task_ctx(); let results = collect(plan, task_ctx)