diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md index 5b1a5e24853c..9646cee45e7a 100644 --- a/datafusion-examples/README.md +++ b/datafusion-examples/README.md @@ -56,6 +56,7 @@ cargo run --example csv_sql - [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients - [`make_date.rs`](examples/make_date.rs): Examples of using the make_date function - [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es +- [`pruning.rs`](examples/pruning.rs): Use pruning to rule out files based on statistics - [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file - [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files - [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3 diff --git a/datafusion-examples/examples/pruning.rs b/datafusion-examples/examples/pruning.rs new file mode 100644 index 000000000000..21e62626be7d --- /dev/null +++ b/datafusion-examples/examples/pruning.rs @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ArrayRef, BooleanArray, Int32Array}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::common::{DFSchema, ScalarValue}; +use datafusion::execution::context::ExecutionProps; +use datafusion::physical_expr::create_physical_expr; +use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; +use datafusion::prelude::*; +use std::collections::HashSet; +use std::sync::Arc; + +/// This example shows how to use DataFusion's `PruningPredicate` to prove +/// filter expressions can never be true based on statistics such as min/max +/// values of columns. +/// +/// The process is called "pruning" and is commonly used in query engines to +/// quickly eliminate entire files / partitions / row groups of data from +/// consideration using statistical information from a catalog or other +/// metadata. +#[tokio::main] +async fn main() { + // In this example, we'll use the PruningPredicate to determine if + // the expression `x = 5 AND y = 10` can never be true based on statistics + + // Start with the expression `x = 5 AND y = 10` + let expr = col("x").eq(lit(5)).and(col("y").eq(lit(10))); + + // We can analyze this predicate using information provided by the + // `PruningStatistics` trait, in this case we'll use a simple catalog that + // models three files. 
For all rows in each file: + // + // File 1: x has values between `4` and `6` + // y has the value 10 + // + // File 2: x has values between `4` and `6` + // y has the value of `7` + // + // File 3: x has the value 1 + // nothing is known about the value of y + let my_catalog = MyCatalog::new(); + + // Create a `PruningPredicate`. + // + // Note the predicate does not automatically coerce types or simplify + // expressions. See expr_api.rs examples for how to do this if required + let predicate = create_pruning_predicate(expr, &my_catalog.schema); + + // Evaluate the predicate for the three files in the catalog + let prune_results = predicate.prune(&my_catalog).unwrap(); + println!("Pruning results: {prune_results:?}"); + + // The result is a `Vec` of bool values, one for each file in the catalog + assert_eq!( + prune_results, + vec![ + // File 1: `x = 5 AND y = 10` can evaluate to true if x has values + // between `4` and `6`, y has the value `10`, so the file can not be + // skipped + // + // NOTE this doesn't mean there actually are rows that evaluate to + // true, but the pruning predicate can't prove there aren't any. + true, + // File 2: `x = 5 AND y = 10` can never evaluate to true because y + // has only the value of 7. Thus this file can be skipped. + false, + // File 3: `x = 5 AND y = 10` can never evaluate to true because x + // has the value `1`, and for any value of `y` the expression will + // evaluate to false (`x = 5 AND y = 10` --> `false AND null` --> `false`). Thus this file can also be + // skipped. + false + ] + ); +} + +/// A simple model catalog that has information about the three files that store +/// data for a table with two columns (x and y). 
+struct MyCatalog { + schema: SchemaRef, + // (min, max) for x + x_values: Vec<(Option<i32>, Option<i32>)>, + // (min, max) for y + y_values: Vec<(Option<i32>, Option<i32>)>, +} +impl MyCatalog { + fn new() -> Self { + MyCatalog { + schema: Arc::new(Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Int32, false), + ])), + x_values: vec![ + // File 1: x has values between `4` and `6` + (Some(4), Some(6)), + // File 2: x has values between `4` and `6` + (Some(4), Some(6)), + // File 3: x has the value 1 + (Some(1), Some(1)), + ], + y_values: vec![ + // File 1: y has the value 10 + (Some(10), Some(10)), + // File 2: y has the value of `7` + (Some(7), Some(7)), + // File 3: nothing is known about the value of y. This is + // represented as (None, None). + // + // Note, returning null means the value isn't known, NOT + // that we know the entire column is null. + (None, None), + ], + } + } +} + +/// We communicate the statistical information to DataFusion by implementing the +/// PruningStatistics trait. 
+impl PruningStatistics for MyCatalog { + fn num_containers(&self) -> usize { + // there are 3 files in this "catalog", and thus each array returned + // from min_values and max_values also has 3 elements + 3 + } + + fn min_values(&self, column: &Column) -> Option<ArrayRef> { + // The pruning predicate evaluates the bounds for multiple expressions + // at once, so return an array with an element for the minimum value in + // each file + match column.name.as_str() { + "x" => Some(i32_array(self.x_values.iter().map(|(min, _)| min))), + "y" => Some(i32_array(self.y_values.iter().map(|(min, _)| min))), + name => panic!("unknown column name: {name}"), + } + } + + fn max_values(&self, column: &Column) -> Option<ArrayRef> { + // similarly to min_values, return an array with an element for the + // maximum value in each file + match column.name.as_str() { + "x" => Some(i32_array(self.x_values.iter().map(|(_, max)| max))), + "y" => Some(i32_array(self.y_values.iter().map(|(_, max)| max))), + name => panic!("unknown column name: {name}"), + } + } + + fn null_counts(&self, _column: &Column) -> Option<ArrayRef> { + // In this example, we know nothing about the number of nulls + None + } + + fn contained( + &self, + _column: &Column, + _values: &HashSet<ScalarValue>, + ) -> Option<BooleanArray> { + // this method can be used to implement Bloom filter like filtering + // but we do not illustrate that here + None + } +} + +fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate { + let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap(); + let props = ExecutionProps::new(); + let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap(); + PruningPredicate::try_new(physical_expr, schema.clone()).unwrap() +} + +fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef { + Arc::new(Int32Array::from_iter(values.cloned())) +} diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index aa0c26723767..ceb9e598f63d 100644 --- 
a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -136,6 +136,8 @@ pub trait PruningStatistics { /// possibly evaluate to `true` given information about a column provided by /// [`PruningStatistics`]. /// +/// # Introduction +/// /// `PruningPredicate` analyzes filter expressions using statistics such as /// min/max values and null counts, attempting to prove a "container" (e.g. /// Parquet Row Group) can be skipped without reading the actual data, @@ -163,6 +165,12 @@ pub trait PruningStatistics { /// /// # Example /// +/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete +/// example of how to use `PruningPredicate` to prune files based on min/max +/// values. +/// +/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/pruning.rs +/// /// Given an expression like `x = 5` and statistics for 3 containers (Row /// Groups, files, etc) `A`, `B`, and `C`: ///