diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 80ebfadd1628..c41a62686b5b 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -1088,16 +1088,33 @@ pub fn union_with_alias(
     }
 
     let union_schema = (**inputs[0].schema()).clone();
-    let union_schema = Arc::new(match alias {
-        Some(ref alias) => union_schema.replace_qualifier(alias.as_str()),
-        None => union_schema.strip_qualifiers(),
-    });
+    let union_schema_strip = union_schema.clone().strip_qualifiers();
+
+    inputs
+        .iter()
+        .skip(1)
+        .try_for_each(|input_plan| -> Result<()> {
+            union_schema_strip.check_arrow_schema_type_compatible(
+                &((**input_plan.schema()).clone().into()),
+            )
+        })?;
 
-    Ok(LogicalPlan::Union(Union {
+    let union_plan = LogicalPlan::Union(Union {
         inputs,
-        schema: union_schema,
-        alias,
-    }))
+        schema: Arc::new(union_schema_strip),
+    });
+
+    match alias {
+        Some(ref alias) => {
+            let alias_schema = union_schema.replace_qualifier(alias.as_str());
+            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: Arc::new(union_plan),
+                alias: alias.to_string(),
+                schema: Arc::new(alias_schema),
+            }))
+        }
+        None => Ok(union_plan),
+    }
 }
 
 /// Project with optional alias
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs
index 795f86392724..2b2d216bcc86 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
 use crate::logical_plan::{DFSchema, Expr};
 use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::utils;
+use datafusion_expr::logical_plan::SubqueryAlias;
 use std::collections::{HashMap, HashSet};
 
 /// Filter Push Down optimizer rule pushes filter clauses down the plan
@@ -358,11 +359,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
             // sort is filter-commutable
             push_down(&state, plan)
         }
-        LogicalPlan::Union(Union {
-            inputs: _,
-            schema,
-            alias: _,
-        }) => {
+        LogicalPlan::Union(Union { inputs: _, schema }) => {
             // union changing all qualifiers while building logical plan so we need
             // to rewrite filters to push unqualified columns to inputs
             let projection = schema
@@ -507,6 +504,64 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
                 }),
             )
         }
+        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, input, .. }) => {
+            // println!("before: {:?}", schema);
+            // println!("input: {:?}", &*input);
+            // println!("state: {:?}", &state);
+
+            // let t = match &**input {
+            //     LogicalPlan::TableScan(TableScan { table_name, .. }) => {
+            //         Some(table_name.clone())
+            //     }
+            //     _ => None,
+            // };
+
+            let new_filters: Result<Vec<(Expr, HashSet<Column>)>> = state
+                .filters
+                .iter()
+                .map(|pair| {
+                    let mut replace: HashMap<&Column, Column> = HashMap::new();
+                    let new_columns: HashSet<Column> = pair
+                        .1
+                        .iter()
+                        .map(|c| match &c.relation {
+                            Some(q) if q == alias => {
+                                let column = Column {
+                                    relation: None,
+                                    name: c.name.clone(),
+                                };
+                                replace.insert(c, column.clone());
+                                column
+                            }
+                            _ => c.clone(),
+                        })
+                        .collect();
+                    let replace_map: HashMap<&Column, &Column> =
+                        replace.iter().map(|(k, v)| (*k, v)).collect();
+                    let new_expr = replace_col(pair.0.clone(), &replace_map)?;
+                    Ok((new_expr, new_columns))
+                })
+                .collect();
+
+            let state = State {
+                filters: new_filters?,
+            };
+
+            println!("state: {:?}", &state);
+
+            let plan = push_down(&state, plan)?;
+            // let copy = plan.clone();
+            // println!("after SubqueryAlias: {:?}", copy);
+
+            // match copy {
+            //     LogicalPlan::SubqueryAlias(SubqueryAlias { input, schema, .. }) => {
+            //         println!("before: {:?}", schema);
+            //         println!("input: {:?}", &*input);
+            //     }
+            //     _ => {}
+            // };
+            Ok(plan)
+        }
         _ => {
             // all other plans are _not_ filter-commutable
             let used_columns = plan
@@ -905,11 +960,12 @@ mod tests {
 
         // filter appears below Union without relation qualifier
         let expected = "\
-        Union\
-        \n  Filter: #a = Int64(1)\
-        \n    TableScan: test projection=None\
-        \n  Filter: #a = Int64(1)\
-        \n    TableScan: test projection=None";
+        SubqueryAlias: t\
+        \n  Union\
+        \n    Filter: #a = Int64(1)\
+        \n      TableScan: test projection=None\
+        \n    Filter: #a = Int64(1)\
+        \n      TableScan: test projection=None";
         assert_optimized_plan_eq(&plan, expected);
         Ok(())
     }
diff --git a/datafusion/core/src/optimizer/limit_push_down.rs b/datafusion/core/src/optimizer/limit_push_down.rs
index 0c68f1761601..c665eecc5e4e 100644
--- a/datafusion/core/src/optimizer/limit_push_down.rs
+++ b/datafusion/core/src/optimizer/limit_push_down.rs
@@ -17,6 +17,8 @@
 
 //! Optimizer rule to push down LIMIT in the query plan
 //! It will push down through projection, limits (taking the smaller limit)
+use datafusion_expr::logical_plan::SubqueryAlias;
+
 use super::utils;
 use crate::error::Result;
 use crate::execution::context::ExecutionProps;
@@ -100,14 +102,7 @@ fn limit_push_down(
                 alias: alias.clone(),
             }))
         }
-        (
-            LogicalPlan::Union(Union {
-                inputs,
-                alias,
-                schema,
-            }),
-            Some(upper_limit),
-        ) => {
+        (LogicalPlan::Union(Union { inputs, schema }), Some(upper_limit)) => {
             // Push down limit through UNION
             let new_inputs = inputs
                 .iter()
@@ -125,10 +120,29 @@ fn limit_push_down(
                 .collect::<Result<_>>()?;
             Ok(LogicalPlan::Union(Union {
                 inputs: new_inputs,
-                alias: alias.clone(),
                 schema: schema.clone(),
             }))
         }
+        (
+            LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input,
+                alias,
+                schema,
+            }),
+            upper_limit,
+        ) => {
+            // Push down limit directly
+            Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
+                input: Arc::new(limit_push_down(
+                    _optimizer,
+                    upper_limit,
+                    input.as_ref(),
+                    _execution_props,
+                )?),
+                schema: schema.clone(),
+                alias: alias.clone(),
+            }))
+        }
         // For other nodes we can't push down the limit
         // But try to recurse and find other limit nodes to push down
         _ => {
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 9cbec1e908e0..193fbea4ad30 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -32,6 +32,7 @@ use crate::optimizer::utils;
 use crate::sql::utils::find_sort_exprs;
 use arrow::datatypes::{Field, Schema};
 use arrow::error::Result as ArrowResult;
+use datafusion_expr::logical_plan::Filter;
 use std::{
     collections::{BTreeSet, HashSet},
     sync::Arc,
@@ -383,11 +384,7 @@ fn optimize_plan(
                 schema: a.schema.clone(),
             }))
         }
-        LogicalPlan::Union(Union {
-            inputs,
-            schema,
-            alias,
-        }) => {
+        LogicalPlan::Union(Union { inputs, schema }) => {
             // UNION inputs will reference the same column with different identifiers, so we need
             // to populate new_required_columns by unqualified column name based on required fields
             // from the resulting UNION output
@@ -429,10 +426,10 @@ fn optimize_plan(
             Ok(LogicalPlan::Union(Union {
                 inputs: new_inputs,
                 schema: Arc::new(new_schema),
-                alias: alias.clone(),
             }))
         }
         LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
+            println!("input: {:?}", *input);
             match input.as_ref() {
                 LogicalPlan::TableScan(TableScan { table_name, .. }) => {
                     let new_required_columns = new_required_columns
@@ -455,6 +452,62 @@ fn optimize_plan(
                     let expr = vec![];
                     utils::from_plan(plan, &expr, &new_inputs)
                 }
+                LogicalPlan::Union(Union { .. }) => {
+                    // Scope of alias is outside, inside we should remove it.
+                    // Here we remove the alias in `new_required_columns`.
+                    let new_required_columns = new_required_columns
+                        .iter()
+                        .map(|c| match &c.relation {
+                            Some(q) if q == alias => Column {
+                                relation: None,
+                                name: c.name.clone(),
+                            },
+                            _ => c.clone(),
+                        })
+                        .collect();
+                    let new_inputs = vec![optimize_plan(
+                        _optimizer,
+                        input,
+                        &new_required_columns,
+                        has_projection,
+                        _execution_props,
+                    )?];
+                    let expr = vec![];
+
+                    utils::from_plan(plan, &expr, &new_inputs)
+                }
+                LogicalPlan::Filter(Filter { input, .. }) => {
+                    // let new_inputs = vec![optimize_plan(
+                    //     _optimizer,
+                    //     input,
+                    //     &new_required_columns,
+                    //     has_projection,
+                    //     _execution_props,
+                    // )?];
+                    // let expr = vec![];
+
+                    // utils::from_plan(plan, &expr, &new_inputs)
+                    let expr = plan.expressions();
+                    // collect all required columns by this plan
+                    utils::exprlist_to_columns(&expr, &mut new_required_columns)?;
+
+                    // apply the optimization to all inputs of the plan
+                    let inputs = plan.inputs();
+                    let new_inputs = inputs
+                        .iter()
+                        .map(|input_plan| {
+                            optimize_plan(
+                                _optimizer,
+                                input_plan,
+                                &new_required_columns,
+                                has_projection,
+                                _execution_props,
+                            )
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                    utils::from_plan(plan, &expr, &new_inputs)
+                }
                 _ => Err(DataFusionError::Plan(
                     "SubqueryAlias should only wrap TableScan".to_string(),
                 )),
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 0e54fd93ee99..ce1acb77b3e5 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -268,13 +268,10 @@ pub fn from_plan(
         LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
             node: e.node.from_template(expr, inputs),
         })),
-        LogicalPlan::Union(Union { schema, alias, .. }) => {
-            Ok(LogicalPlan::Union(Union {
-                inputs: inputs.to_vec(),
-                schema: schema.clone(),
-                alias: alias.clone(),
-            }))
-        }
+        LogicalPlan::Union(Union { schema, .. }) => Ok(LogicalPlan::Union(Union {
+            inputs: inputs.to_vec(),
+            schema: schema.clone(),
+        })),
         LogicalPlan::Analyze(a) => {
             assert!(expr.is_empty());
             assert_eq!(inputs.len(), 1);
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 47829ad798ca..a1b948b1db64 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -835,6 +835,12 @@ impl DefaultPhysicalPlanner {
                         LogicalPlan::TableScan(..) => {
                             self.create_initial_plan(input, session_state).await
                         }
+                        LogicalPlan::Union(..) => {
+                            self.create_initial_plan(input, session_state).await
+                        }
+                        LogicalPlan::Filter(..) => {
+                            self.create_initial_plan(input, session_state).await
+                        }
                         _ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan".to_string()))
                     }
                 }
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index 566baefab7f4..5704b5d83c1a 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -548,6 +548,8 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
         .unwrap();
     let optimized_logical_schema = plan.schema();
 
+    println!("plan {:?}", plan);
+
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
     let plan = ctx
         .create_physical_plan(&plan)
@@ -555,6 +557,8 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
         .map_err(|e| format!("{:?} at {}", e, msg))
         .unwrap();
 
+    println!("plan {:?}", plan);
+
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
     let task_ctx = ctx.task_ctx();
     let results = collect(plan, task_ctx)
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ab96dbe732d4..5ff1ae646b15 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1029,8 +1029,6 @@ pub struct Union {
     pub inputs: Vec<LogicalPlan>,
     /// Union schema. Should be the same for all inputs.
    pub schema: DFSchemaRef,
-    /// Union output relation alias
-    pub alias: Option<String>,
 }
 
 /// Creates an in memory table.
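
As a rough illustration of the plan shape this patch produces (the types and helper below are made-up stand-ins for illustration only, not DataFusion's actual API): a UNION with an output alias is now represented as a plain Union node wrapped in a SubqueryAlias node that carries the alias, instead of the Union carrying an alias field itself. A minimal sketch:

    // Minimal sketch with hypothetical types mirroring the pattern in this diff:
    // Union no longer stores an alias; aliasing is a separate wrapper node.
    #[derive(Debug)]
    enum Plan {
        Union { inputs: Vec<Plan> },                       // no `alias` field
        SubqueryAlias { input: Box<Plan>, alias: String }, // alias lives here
        Scan { table: String },
    }

    // Build a union and wrap it in SubqueryAlias only when an alias is given,
    // the same shape `union_with_alias` returns after this change.
    fn union_with_alias(inputs: Vec<Plan>, alias: Option<String>) -> Plan {
        let union = Plan::Union { inputs };
        match alias {
            Some(alias) => Plan::SubqueryAlias {
                input: Box::new(union),
                alias,
            },
            None => union,
        }
    }

    fn main() {
        let plan = union_with_alias(
            vec![
                Plan::Scan { table: "test".into() },
                Plan::Scan { table: "test".into() },
            ],
            Some("t".into()),
        );
        // Debug output shows SubqueryAlias { alias: "t", .. } wrapping the Union,
        // matching the updated optimizer test expectation "SubqueryAlias: t\n  Union\n ...".
        println!("{plan:?}");
    }

Keeping the alias in a dedicated wrapper node lets the optimizer rules treat Union uniformly and handle alias scope in one place, which is what the new SubqueryAlias arms in the filter, limit, and projection push-down rules above do.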