From 7aed4d697fa24053d515babfd7678855451c6736 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Mon, 29 Aug 2022 12:01:23 +0300 Subject: [PATCH] Fix propagation of optimized predicates on nested projections (#3228) * Fix propagation of optimized predicates on nested projections * Add SQL integration tests * Alternative implementation on `issue_filters` (#1) --- datafusion/core/tests/sql/projection.rs | 39 ++++++++++++ datafusion/optimizer/src/filter_push_down.rs | 66 ++++++++++++-------- 2 files changed, 79 insertions(+), 26 deletions(-) diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs index 6e59bd42146e..97c6dcf8aa7f 100644 --- a/datafusion/core/tests/sql/projection.rs +++ b/datafusion/core/tests/sql/projection.rs @@ -348,3 +348,42 @@ async fn project_column_with_same_name_as_relation() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn project_column_with_filters_that_cant_pushed_down_always_false() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select * from (select 1 as a) f where f.a=2;"; + let actual = execute_to_batches(&ctx, sql).await; + + let expected = vec!["++", "++"]; + assert_batches_sorted_eq!(expected, &actual); + + Ok(()) +} + +#[tokio::test] +async fn project_column_with_filters_that_cant_pushed_down_always_true() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select * from (select 1 as a) f where f.a=1;"; + let actual = execute_to_batches(&ctx, sql).await; + + let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "+---+"]; + assert_batches_sorted_eq!(expected, &actual); + + Ok(()) +} + +#[tokio::test] +async fn project_columns_in_memory_without_propagation() -> Result<()> { + let ctx = SessionContext::new(); + + let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 2;"; + let actual = execute_to_batches(&ctx, sql).await; + + let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"]; + assert_batches_sorted_eq!(expected, &actual); 
+ + Ok(()) +} diff --git a/datafusion/optimizer/src/filter_push_down.rs b/datafusion/optimizer/src/filter_push_down.rs index 2ac5b6e3b0ff..3d0415232dbf 100644 --- a/datafusion/optimizer/src/filter_push_down.rs +++ b/datafusion/optimizer/src/filter_push_down.rs @@ -81,6 +81,7 @@ impl State { } /// returns all predicates in `state` that depend on any of `used_columns` +/// or the ones that do not reference any columns (e.g. WHERE 1=1) fn get_predicates<'a>( state: &'a State, used_columns: &HashSet, @@ -89,10 +90,11 @@ fn get_predicates<'a>( .filters .iter() .filter(|(_, columns)| { - !columns - .intersection(used_columns) - .collect::>() - .is_empty() + columns.is_empty() + || !columns + .intersection(used_columns) + .collect::>() + .is_empty() }) .map(|&(ref a, ref b)| (a, b)) .unzip() @@ -338,34 +340,16 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { let mut predicates = vec![]; utils::split_conjunction(predicate, &mut predicates); - // Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.) - let mut no_col_predicates = vec![]; - predicates .into_iter() .try_for_each::<_, Result<()>>(|predicate| { let mut columns: HashSet = HashSet::new(); expr_to_columns(predicate, &mut columns)?; - if columns.is_empty() { - no_col_predicates.push(predicate) - } else { - // collect the predicate - state.filters.push((predicate.clone(), columns)); - } + state.filters.push((predicate.clone(), columns)); Ok(()) })?; - // Predicates without columns will not be pushed down. 
- // As those contain only literals, they could be optimized using constant folding - // and removal of WHERE TRUE / WHERE FALSE - if !no_col_predicates.is_empty() { - Ok(utils::add_filter( - optimize(input, state)?, - &no_col_predicates, - )) - } else { - optimize(input, state) - } + optimize(input, state) } LogicalPlan::Projection(Projection { input, @@ -401,8 +385,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result { // optimize inner let new_input = optimize(input, state)?; - - from_plan(plan, expr, &[new_input]) + Ok(from_plan(plan, expr, &[new_input])?) } LogicalPlan::Aggregate(Aggregate { aggr_expr, input, .. @@ -2092,4 +2075,35 @@ mod tests { Ok(()) } + + #[test] + fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> { + // SELECT a FROM (SELECT 1 AS a) b WHERE b.a = 1 + let plan = LogicalPlanBuilder::empty(true) + .project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))? + .project_with_alias(vec![col("b.a")], Some("b".to_owned()))? + .filter(col("b.a").eq(lit(1i64)))? + .project(vec![col("b.a")])? + .build()?; + + let expected_before = "\ + Projection: #b.a\ + \n Filter: #b.a = Int64(1)\ + \n Projection: #b.a, alias=b\ + \n Projection: Int64(0) AS a, alias=b\ + \n EmptyRelation"; + assert_eq!(format!("{:?}", plan), expected_before); + + // Ensure that the predicate without any columns (0 = 1) is + // still there. + let expected_after = "\ + Projection: #b.a\ + \n Projection: #b.a, alias=b\ + \n Projection: Int64(0) AS a, alias=b\ + \n Filter: Int64(0) = Int64(1)\ + \n EmptyRelation"; + assert_optimized_plan_eq(&plan, expected_after); + + Ok(()) + } }