Minor: Improve documentation for Filter Pushdown #8023

Merged
merged 4 commits into from
Nov 2, 2023
101 changes: 82 additions & 19 deletions datafusion/optimizer/src/push_down_filter.rs
@@ -12,7 +12,8 @@
// specific language governing permissions and limitations
// under the License.

//! Push Down Filter optimizer rule ensures that filters are applied as early as possible in the plan
//! [`PushDownFilter`] moves filters so they are applied as early as possible in
//! the plan.

use crate::optimizer::ApplyOrder;
use crate::utils::{conjunction, split_conjunction, split_conjunction_owned};
@@ -33,31 +34,93 @@ use itertools::Itertools;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

/// Push Down Filter optimizer rule pushes filter clauses down the plan
/// Optimizer rule for pushing (moving) filter expressions down in a plan so
/// they are applied as early as possible.
///
/// # Introduction
/// A filter-commutative operation is an operation whose result of filter(op(data)) = op(filter(data)).
/// An example of a filter-commutative operation is a projection; a counter-example is `limit`.
///
/// The filter-commutative property is column-specific. An aggregate grouped by A on SUM(B)
/// can commute with a filter that depends on A only, but does not commute with a filter that depends
/// on SUM(B).
/// The goal of this rule is to improve query performance by eliminating
/// redundant work.
///
/// For example, given a plan that sorts all values where `a > 10`:
///
/// ```text
/// Filter (a > 10)
///   Sort (a, b)
/// ```
///
/// A better plan is to filter the data *before* the Sort, which sorts fewer
/// rows and therefore does less work overall:
///
/// ```text
/// Sort (a, b)
///   Filter (a > 10) <-- Filter is moved before the sort
/// ```
///
/// However, it is not always possible to push filters down. For example, given a
/// plan that finds the top 3 values and then keeps only those that are greater
/// than 10, pushing the filter below the limit would produce a different
/// result:
///
/// ```text
/// Filter (a > 10) <-- cannot move this Filter before the limit
///   Limit (fetch=3)
///     Sort (a, b)
/// ```
///
///
/// More formally, a filter-commutative operation is an operation `op` that
/// satisfies `filter(op(data)) = op(filter(data))`.
///
/// The filter-commutative property is plan- and column-specific. A filter on `a`
/// can be pushed through an `Aggregate(group_by = [a], agg = [SUM(b)])`. However, a
/// filter on `SUM(b)` cannot be pushed through the same aggregate.
///
/// # Handling Conjunctions
///
/// It is possible to push down only **part** of a filter expression if it is
/// connected with `AND`s (more formally, if it is a "conjunction").
///
/// For example, given the following plan:
///
/// ```text
/// Filter(a > 10 AND SUM(b) < 5)
///   Aggregate(group_by = [a], agg = [SUM(b)])
/// ```
///
/// The `a > 10` is commutative with the `Aggregate` but `SUM(b) < 5` is not.
/// Therefore it is possible to only push part of the expression, resulting in:
///
/// ```text
/// Filter(SUM(b) < 5)
///   Aggregate(group_by = [a], agg = [SUM(b)])
///     Filter(a > 10)
/// ```
///
/// # Handling Column Aliases
///
/// This optimizer commutes filters with filter-commutative operations to push the filters
/// the closest possible to the scans, re-writing the filter expressions by every
/// projection that changes the filter's expression.
/// This optimizer must sometimes rewrite filter expressions when they are
/// pushed down, for example if there is a projection that aliases `a+1` to `"b"`:
///
/// Filter: b Gt Int64(10)
/// Projection: a AS b
/// ```text
/// Filter (b > 10)
///   Projection: [a+1 AS "b"] <-- changes the name of `a+1` to `b`
/// ```
///
/// is optimized to
/// To apply the filter prior to the `Projection`, all references to `b` must be
/// rewritten to `a+1`:
///
/// Projection: a AS b
/// Filter: a Gt Int64(10) <--- changed from b to a
/// ```text
/// Projection: [a+1 AS "b"]
///   Filter (a + 1 > 10) <--- changed from b to a + 1
/// ```
/// # Implementation Notes
///
/// This performs a single pass through the plan. When it passes through a filter, it stores that filter,
/// and when it reaches a node that does not commute with it, it adds the filter to that place.
/// When it passes through a projection, it re-writes the filter's expression taking into account that projection.
/// When multiple filters would have been written, it `AND` their expressions into a single expression.
/// This implementation performs a single pass through the plan, "pushing" down
/// filters. When it passes through a filter, it stores that filter, and when it
/// reaches a plan node that does not commute with that filter, it adds the
/// filter to that place. When it passes through a projection, it re-writes the
/// filter's expression taking into account that projection.
#[derive(Default)]
pub struct PushDownFilter {}
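
The commutativity claims in the doc comment above can be checked on a toy model. The sketch below is only an illustration: `Row`, `sample`, and the operator functions are hypothetical stand-ins that work on in-memory vectors, not DataFusion APIs, and the sample data is made up. It shows that the `a > 10` filter commutes with a sort and with an aggregate grouped by `a`, that it does not commute with a limit, and that pushing only the `a > 10` half of a conjunction preserves the result.

```rust
use std::collections::BTreeMap;

// Toy model for illustration only: none of these names are DataFusion APIs.

/// A row with two columns `(a, b)`.
type Row = (i64, i64);

/// Filter (a > 10)
fn filter_a_gt_10(rows: Vec<Row>) -> Vec<Row> {
    rows.into_iter().filter(|&(a, _)| a > 10).collect()
}

/// Filter (second column < 5). After `aggregate`, the second column is SUM(b).
fn filter_second_lt_5(rows: Vec<Row>) -> Vec<Row> {
    rows.into_iter().filter(|&(_, v)| v < 5).collect()
}

/// Sort (a, b)
fn sort(mut rows: Vec<Row>) -> Vec<Row> {
    rows.sort();
    rows
}

/// Limit (fetch = n)
fn limit(rows: Vec<Row>, n: usize) -> Vec<Row> {
    rows.into_iter().take(n).collect()
}

/// Aggregate(group_by = [a], agg = [SUM(b)]); output is ordered by `a`.
fn aggregate(rows: Vec<Row>) -> Vec<Row> {
    let mut sums: BTreeMap<i64, i64> = BTreeMap::new();
    for (a, b) in rows {
        *sums.entry(a).or_insert(0) += b;
    }
    sums.into_iter().collect()
}

/// Made-up sample data.
fn sample() -> Vec<Row> {
    vec![(5, 1), (20, 2), (12, 3), (7, 4), (20, 5), (3, 6)]
}

fn main() {
    let data = sample();

    // Sort is filter-commutative: filter(sort(data)) == sort(filter(data)).
    assert_eq!(
        filter_a_gt_10(sort(data.clone())),
        sort(filter_a_gt_10(data.clone()))
    );

    // Limit is not: filtering below the limit changes which rows survive.
    assert_ne!(
        filter_a_gt_10(limit(sort(data.clone()), 3)),
        limit(sort(filter_a_gt_10(data.clone())), 3)
    );

    // A filter on the group key `a` commutes with the aggregate.
    assert_eq!(
        filter_a_gt_10(aggregate(data.clone())),
        aggregate(filter_a_gt_10(data.clone()))
    );

    // Conjunction `a > 10 AND SUM(b) < 5`: push only `a > 10` below the
    // aggregate and keep `SUM(b) < 5` above it.
    let original = filter_second_lt_5(filter_a_gt_10(aggregate(data.clone())));
    let pushed = filter_second_lt_5(aggregate(filter_a_gt_10(data)));
    assert_eq!(original, pushed);
    println!("{:?}", pushed);
}
```

In this model, applying the `< 5` predicate to the raw `b` values before aggregating gives a different answer than applying it to `SUM(b)` afterwards, which is why only the `a > 10` half of the conjunction is safe to move.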
