Skip to content

Commit

Permalink
Fix propagation of optimized predicates on nested projections (apache…
Browse files Browse the repository at this point in the history
…#3228)

* Fix propagation of optimized predicates on nested projections

* Add SQL integration tests

* Alternative implementation on `issue_filters` (#1)
  • Loading branch information
isidentical authored Aug 29, 2022
1 parent 873b071 commit 7aed4d6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 26 deletions.
39 changes: 39 additions & 0 deletions datafusion/core/tests/sql/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,42 @@ async fn project_column_with_same_name_as_relation() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn project_column_with_filters_that_cant_pushed_down_always_false() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select * from (select 1 as a) f where f.a=2;";
let actual = execute_to_batches(&ctx, sql).await;

let expected = vec!["++", "++"];
assert_batches_sorted_eq!(expected, &actual);

Ok(())
}

#[tokio::test]
async fn project_column_with_filters_that_cant_pushed_down_always_true() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select * from (select 1 as a) f where f.a=1;";
let actual = execute_to_batches(&ctx, sql).await;

let expected = vec!["+---+", "| a |", "+---+", "| 1 |", "+---+"];
assert_batches_sorted_eq!(expected, &actual);

Ok(())
}

#[tokio::test]
async fn project_columns_in_memory_without_propagation() -> Result<()> {
let ctx = SessionContext::new();

let sql = "select column1 as a from (values (1), (2)) f where f.column1 = 2;";
let actual = execute_to_batches(&ctx, sql).await;

let expected = vec!["+---+", "| a |", "+---+", "| 2 |", "+---+"];
assert_batches_sorted_eq!(expected, &actual);

Ok(())
}
66 changes: 40 additions & 26 deletions datafusion/optimizer/src/filter_push_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl State {
}

/// returns all predicates in `state` that depend on any of `used_columns`
/// or the ones that does not reference any columns (e.g. WHERE 1=1)
fn get_predicates<'a>(
state: &'a State,
used_columns: &HashSet<Column>,
Expand All @@ -89,10 +90,11 @@ fn get_predicates<'a>(
.filters
.iter()
.filter(|(_, columns)| {
!columns
.intersection(used_columns)
.collect::<HashSet<_>>()
.is_empty()
columns.is_empty()
|| !columns
.intersection(used_columns)
.collect::<HashSet<_>>()
.is_empty()
})
.map(|&(ref a, ref b)| (a, b))
.unzip()
Expand Down Expand Up @@ -338,34 +340,16 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
let mut predicates = vec![];
utils::split_conjunction(predicate, &mut predicates);

// Predicates without referencing columns (WHERE FALSE, WHERE 1=1, etc.)
let mut no_col_predicates = vec![];

predicates
.into_iter()
.try_for_each::<_, Result<()>>(|predicate| {
let mut columns: HashSet<Column> = HashSet::new();
expr_to_columns(predicate, &mut columns)?;
if columns.is_empty() {
no_col_predicates.push(predicate)
} else {
// collect the predicate
state.filters.push((predicate.clone(), columns));
}
state.filters.push((predicate.clone(), columns));
Ok(())
})?;

// Predicates without columns will not be pushed down.
// As those contain only literals, they could be optimized using constant folding
// and removal of WHERE TRUE / WHERE FALSE
if !no_col_predicates.is_empty() {
Ok(utils::add_filter(
optimize(input, state)?,
&no_col_predicates,
))
} else {
optimize(input, state)
}
optimize(input, state)
}
LogicalPlan::Projection(Projection {
input,
Expand Down Expand Up @@ -401,8 +385,7 @@ fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {

// optimize inner
let new_input = optimize(input, state)?;

from_plan(plan, expr, &[new_input])
Ok(from_plan(plan, expr, &[new_input])?)
}
LogicalPlan::Aggregate(Aggregate {
aggr_expr, input, ..
Expand Down Expand Up @@ -2092,4 +2075,35 @@ mod tests {

Ok(())
}

#[test]
fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> {
// SELECT a FROM (SELECT 1 AS a) b WHERE b.a = 1
let plan = LogicalPlanBuilder::empty(true)
.project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))?
.project_with_alias(vec![col("b.a")], Some("b".to_owned()))?
.filter(col("b.a").eq(lit(1i64)))?
.project(vec![col("b.a")])?
.build()?;

let expected_before = "\
Projection: #b.a\
\n Filter: #b.a = Int64(1)\
\n Projection: #b.a, alias=b\
\n Projection: Int64(0) AS a, alias=b\
\n EmptyRelation";
assert_eq!(format!("{:?}", plan), expected_before);

// Ensure that the predicate without any columns (0 = 1) is
// still there.
let expected_after = "\
Projection: #b.a\
\n Projection: #b.a, alias=b\
\n Projection: Int64(0) AS a, alias=b\
\n Filter: Int64(0) = Int64(1)\
\n EmptyRelation";
assert_optimized_plan_eq(&plan, expected_after);

Ok(())
}
}

0 comments on commit 7aed4d6

Please sign in to comment.