[WIP] Replace Union.alias with SubqueryAlias #2398

Closed · wants to merge 6 commits
33 changes: 25 additions & 8 deletions datafusion/core/src/logical_plan/builder.rs
@@ -1088,16 +1088,33 @@ pub fn union_with_alias(
}

let union_schema = (**inputs[0].schema()).clone();
let union_schema = Arc::new(match alias {
Some(ref alias) => union_schema.replace_qualifier(alias.as_str()),
None => union_schema.strip_qualifiers(),
});
let union_schema_strip = union_schema.clone().strip_qualifiers();

inputs
.iter()
.skip(1)
.try_for_each(|input_plan| -> Result<()> {
union_schema_strip.check_arrow_schema_type_compatible(
&((**input_plan.schema()).clone().into()),
)
})?;

Ok(LogicalPlan::Union(Union {
let union_plan = LogicalPlan::Union(Union {
inputs,
schema: union_schema,
alias,
}))
schema: Arc::new(union_schema_strip),
});

match alias {
Some(ref alias) => {
let alias_schema = union_schema.replace_qualifier(alias.as_str());
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(union_plan),
alias: alias.to_string(),
schema: Arc::new(alias_schema),
}))
}
None => Ok(union_plan),
}
}

/// Project with optional alias
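For reference, a minimal sketch (not part of this diff) of the plan shape the rewritten builder now produces. The two-input signature of union_with_alias and the import paths are assumptions; only the SubqueryAlias-over-Union shape is taken from the hunk above.

use datafusion::error::Result;
use datafusion::logical_plan::{union_with_alias, LogicalPlan};
use datafusion_expr::logical_plan::{SubqueryAlias, Union};

// `left` and `right` stand for any two union-compatible logical plans.
fn aliased_union_shape(left: LogicalPlan, right: LogicalPlan) -> Result<()> {
    let plan = union_with_alias(left, right, Some("t".to_string()))?;
    match &plan {
        // With an alias, the builder now returns a SubqueryAlias wrapping the Union...
        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, input, .. }) => {
            assert_eq!(alias.as_str(), "t");
            // ...and the inner Union carries the qualifier-stripped schema.
            assert!(matches!(input.as_ref(), LogicalPlan::Union(Union { .. })));
        }
        other => panic!("expected SubqueryAlias over Union, got {:?}", other),
    }
    Ok(())
}

Without an alias the Union is returned directly, likewise with a qualifier-stripped schema.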
76 changes: 66 additions & 10 deletions datafusion/core/src/optimizer/filter_push_down.rs
@@ -24,6 +24,7 @@ use crate::logical_plan::{
use crate::logical_plan::{DFSchema, Expr};
use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::utils;
use datafusion_expr::logical_plan::SubqueryAlias;
use std::collections::{HashMap, HashSet};

/// Filter Push Down optimizer rule pushes filter clauses down the plan
@@ -358,11 +359,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
// sort is filter-commutable
push_down(&state, plan)
}
LogicalPlan::Union(Union {
inputs: _,
schema,
alias: _,
}) => {
LogicalPlan::Union(Union { inputs: _, schema }) => {
// the union builder changes all qualifiers while building the logical plan, so we
// need to rewrite the filters to push unqualified columns down to the inputs
let projection = schema
@@ -507,6 +504,64 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
}),
)
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
// Filters accumulated above the alias may still reference columns qualified
// with that alias; strip the qualifier so they can be pushed into the input.
let new_filters: Result<Vec<(Expr, HashSet<Column>)>> = state
.filters
.iter()
.map(|pair| {
let mut replace: HashMap<&Column, Column> = HashMap::new();
let new_columns: HashSet<Column> = pair
.1
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => {
let column = Column {
relation: None,
name: c.name.clone(),
};
replace.insert(c, column.clone());
column
}
_ => c.clone(),
})
.collect();
let replace_map: HashMap<&Column, &Column> =
replace.iter().map(|(k, v)| (*k, v)).collect();
let new_expr = replace_col(pair.0.clone(), &replace_map)?;
Ok((new_expr, new_columns))
})
.collect();

let state = State {
filters: new_filters?,
};

println!("state: {:?}", &state);

let plan = push_down(&state, plan)?;
// let copy = plan.clone();
// println!("after SubqueryAlias: {:?}", copy);

// match copy {
// LogicalPlan::SubqueryAlias(SubqueryAlias { input, schema, .. }) => {
// println!("before: {:?}", schema);
// println!("input: {:?}", &*input);
// }
// _ => {}
// };
Ok(plan)
}
_ => {
// all other plans are _not_ filter-commutable
let used_columns = plan
@@ -905,11 +960,12 @@ mod tests {

// filter appears below Union without relation qualifier
let expected = "\
Union\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None";
SubqueryAlias: t\
\n Union\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None\
\n Filter: #a = Int64(1)\
\n TableScan: test projection=None";
assert_optimized_plan_eq(&plan, expected);
Ok(())
}
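The heart of the new SubqueryAlias arm above is the qualifier rewrite on pushed-down filter columns. A standalone sketch of just that step follows; the helper name is illustrative, and the real rule does this inline while also rewriting the filter expression itself via replace_col.

use std::collections::HashSet;
use datafusion_common::Column; // path is an assumption; the rule itself uses crate-local imports

// Columns qualified with the subquery alias become unqualified so they can match
// the stripped Union schema below the alias; all other columns pass through.
fn strip_alias_qualifier(columns: &HashSet<Column>, alias: &str) -> HashSet<Column> {
    columns
        .iter()
        .map(|c| match &c.relation {
            Some(q) if q == alias => Column {
                relation: None,
                name: c.name.clone(),
            },
            _ => c.clone(),
        })
        .collect()
}

With alias t, a filter on t.a becomes a filter on a, which is why the test above now expects the filters below the new SubqueryAlias: t node; the projection push-down rule later in this PR applies the same rewrite to its required-column set.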
32 changes: 23 additions & 9 deletions datafusion/core/src/optimizer/limit_push_down.rs
@@ -17,6 +17,8 @@

//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
use datafusion_expr::logical_plan::SubqueryAlias;

use super::utils;
use crate::error::Result;
use crate::execution::context::ExecutionProps;
@@ -100,14 +102,7 @@ fn limit_push_down(
alias: alias.clone(),
}))
}
(
LogicalPlan::Union(Union {
inputs,
alias,
schema,
}),
Some(upper_limit),
) => {
(LogicalPlan::Union(Union { inputs, schema }), Some(upper_limit)) => {
// Push down limit through UNION
let new_inputs = inputs
.iter()
@@ -125,10 +120,29 @@
.collect::<Result<_>>()?;
Ok(LogicalPlan::Union(Union {
inputs: new_inputs,
alias: alias.clone(),
schema: schema.clone(),
}))
}
(
LogicalPlan::SubqueryAlias(SubqueryAlias {
input,
alias,
schema,
}),
upper_limit,
) => {
// Push down limit directly
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(limit_push_down(
_optimizer,
upper_limit,
input.as_ref(),
_execution_props,
)?),
schema: schema.clone(),
alias: alias.clone(),
}))
}
// For other nodes we can't push down the limit
// But try to recurse and find other limit nodes to push down
_ => {
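In short, the new arm keeps the SubqueryAlias node itself and forwards whatever limit it received, unchanged, to the aliased input; combined with the existing Union arm, a limit over an aliased union therefore ends up pushed into each union input. A sketch of that contract, where the push closure stands in for the private recursive limit_push_down call:

use std::sync::Arc;
use datafusion_expr::logical_plan::{LogicalPlan, SubqueryAlias};

// Rebuild the alias node as-is and hand the (optional) upper limit straight to its
// input; the alias never consumes or alters the limit.
fn push_limit_through_alias(
    node: &SubqueryAlias,
    upper_limit: Option<usize>,
    push: impl Fn(Option<usize>, &LogicalPlan) -> LogicalPlan,
) -> LogicalPlan {
    LogicalPlan::SubqueryAlias(SubqueryAlias {
        input: Arc::new(push(upper_limit, node.input.as_ref())),
        alias: node.alias.clone(),
        schema: node.schema.clone(),
    })
}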
65 changes: 59 additions & 6 deletions datafusion/core/src/optimizer/projection_push_down.rs
@@ -32,6 +32,7 @@ use crate::optimizer::utils;
use crate::sql::utils::find_sort_exprs;
use arrow::datatypes::{Field, Schema};
use arrow::error::Result as ArrowResult;
use datafusion_expr::logical_plan::Filter;
use std::{
collections::{BTreeSet, HashSet},
sync::Arc,
@@ -383,11 +384,7 @@ fn optimize_plan(
schema: a.schema.clone(),
}))
}
LogicalPlan::Union(Union {
inputs,
schema,
alias,
}) => {
LogicalPlan::Union(Union { inputs, schema }) => {
// UNION inputs will reference the same column with different identifiers, so we need
// to populate new_required_columns by unqualified column name based on required fields
// from the resulting UNION output
@@ -429,10 +426,10 @@
Ok(LogicalPlan::Union(Union {
inputs: new_inputs,
schema: Arc::new(new_schema),
alias: alias.clone(),
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
println!("input: {:?}", *input);
match input.as_ref() {
LogicalPlan::TableScan(TableScan { table_name, .. }) => {
let new_required_columns = new_required_columns
@@ -455,6 +452,62 @@
let expr = vec![];
utils::from_plan(plan, &expr, &new_inputs)
}
LogicalPlan::Union(Union { .. }) => {
// The alias is only in scope outside the subquery, so it must be removed
// from `new_required_columns` before recursing into the aliased input.
let new_required_columns = new_required_columns
.iter()
.map(|c| match &c.relation {
Some(q) if q == alias => Column {
relation: None,
name: c.name.clone(),
},
_ => c.clone(),
})
.collect();
let new_inputs = vec![optimize_plan(
_optimizer,
input,
&new_required_columns,
has_projection,
_execution_props,
)?];
let expr = vec![];

utils::from_plan(plan, &expr, &new_inputs)
}
LogicalPlan::Filter(Filter { .. }) => {
// A Filter can sit directly under the alias (e.g. after filter push-down);
// handle it like a generic node below.
let expr = plan.expressions();
// collect all required columns by this plan
utils::exprlist_to_columns(&expr, &mut new_required_columns)?;

// apply the optimization to all inputs of the plan
let inputs = plan.inputs();
let new_inputs = inputs
.iter()
.map(|input_plan| {
optimize_plan(
_optimizer,
input_plan,
&new_required_columns,
has_projection,
_execution_props,
)
})
.collect::<Result<Vec<_>>>()?;

utils::from_plan(plan, &expr, &new_inputs)
}
_ => Err(DataFusionError::Plan(
"SubqueryAlias should only wrap TableScan".to_string(),
)),
11 changes: 4 additions & 7 deletions datafusion/core/src/optimizer/utils.rs
@@ -268,13 +268,10 @@ pub fn from_plan(
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
node: e.node.from_template(expr, inputs),
})),
LogicalPlan::Union(Union { schema, alias, .. }) => {
Ok(LogicalPlan::Union(Union {
inputs: inputs.to_vec(),
schema: schema.clone(),
alias: alias.clone(),
}))
}
LogicalPlan::Union(Union { schema, .. }) => Ok(LogicalPlan::Union(Union {
inputs: inputs.to_vec(),
schema: schema.clone(),
})),
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
assert_eq!(inputs.len(), 1);
6 changes: 6 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
@@ -835,6 +835,12 @@ impl DefaultPhysicalPlanner {
LogicalPlan::TableScan(..) => {
self.create_initial_plan(input, session_state).await
}
LogicalPlan::Union(..) => {
self.create_initial_plan(input, session_state).await
}
LogicalPlan::Filter(..) => {
self.create_initial_plan(input, session_state).await
}
_ => Err(DataFusionError::Plan("SubqueryAlias should only wrap TableScan, Union or Filter".to_string()))
}
}
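These new arms are what let an aliased union flow through physical planning at all. A rough end-to-end sketch follows; table registration for t1/t2 is elided, and create_logical_plan/optimize are the SessionContext helpers the test suite already uses, so treat the exact calls as assumptions.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn plan_aliased_union(ctx: &SessionContext) -> Result<()> {
    // An aliased UNION plus a predicate: this produces the SubqueryAlias-over-Union
    // (and, depending on where the optimizer leaves the filter, SubqueryAlias-over-
    // Filter) shapes that the new planner arms above must handle.
    let sql = "SELECT a FROM (SELECT a FROM t1 UNION ALL SELECT a FROM t2) AS u WHERE a = 1";
    let logical = ctx.create_logical_plan(sql)?;
    let optimized = ctx.optimize(&logical)?;
    let physical = ctx.create_physical_plan(&optimized).await?;
    let batches = datafusion::physical_plan::collect(physical, ctx.task_ctx()).await?;
    println!("{} batches", batches.len());
    Ok(())
}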
4 changes: 4 additions & 0 deletions datafusion/core/tests/sql/mod.rs
@@ -548,13 +548,17 @@ async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch>
.unwrap();
let optimized_logical_schema = plan.schema();

println!("plan {:?}", plan);

let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
let plan = ctx
.create_physical_plan(&plan)
.await
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();

println!("plan {:?}", plan);

let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
let task_ctx = ctx.task_ctx();
let results = collect(plan, task_ctx)
2 changes: 0 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
@@ -1029,8 +1029,6 @@ pub struct Union {
pub inputs: Vec<LogicalPlan>,
/// Union schema. Should be the same for all inputs.
pub schema: DFSchemaRef,
/// Union output relation alias
pub alias: Option<String>,
}

/// Creates an in memory table.
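With the alias field gone from Union, code that previously read Union.alias now has to look for an enclosing SubqueryAlias instead. A small illustrative sketch of that access pattern (not code from this PR):

use datafusion_expr::logical_plan::{LogicalPlan, SubqueryAlias};

// Returns the alias of a union wrapped in a SubqueryAlias, or None for a bare
// (unaliased) Union or any other plan node.
fn union_alias(plan: &LogicalPlan) -> Option<&str> {
    match plan {
        LogicalPlan::SubqueryAlias(SubqueryAlias { alias, input, .. })
            if matches!(input.as_ref(), LogicalPlan::Union(_)) =>
        {
            Some(alias.as_str())
        }
        _ => None,
    }
}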