Skip to content

Commit

Permalink
Add evaluate_demo and range_analysis_demo to Expr examples (#8377)
Browse files Browse the repository at this point in the history
* Add `evaluate_demo` and `range_analysis_demo` to Expr examples

* Prettier

* Update datafusion-examples/examples/expr_api.rs

Co-authored-by: Raphael Taylor-Davies <[email protected]>

* rename ExprBoundaries::try_new_unknown --> ExprBoundaries::try_new_unbounded

---------

Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
alamb and tustvold authored Dec 8, 2023
1 parent 34b0445 commit 91cc573
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 20 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ cargo run --example csv_sql
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Use the `Expr` construction and simplification API
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and anaylze `Expr`s
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
Expand Down
144 changes: 134 additions & 10 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,95 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{BooleanArray, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::{
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
};
use datafusion::prelude::*;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::Operator;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
use std::sync::Arc;

/// This example demonstrates the DataFusion [`Expr`] API.
///
/// DataFusion comes with a powerful and extensive system for
/// representing and manipulating expressions such as `A + 5` and `X
/// IN ('foo', 'bar', 'baz')` and many other constructs.
/// IN ('foo', 'bar', 'baz')`.
///
/// In addition to building and manipulating [`Expr`]s, DataFusion
/// also comes with APIs for evaluation, simplification, and analysis.
///
/// The code in this example shows how to:
/// 1. Create [`Exprs`] using different APIs: [`main`]`
/// 2. Evaluate [`Exprs`] against data: [`evaluate_demo`]
/// 3. Simplify expressions: [`simplify_demo`]
/// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
#[tokio::main]
async fn main() -> Result<()> {
// The easiest way to do create expressions is to use the
// "fluent"-style API, like this:
// "fluent"-style API:
let expr = col("a") + lit(5);

// this creates the same expression as the following though with
// much less code,
// The same same expression can be created directly, with much more code:
let expr2 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col("a")),
Operator::Plus,
Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))),
));
assert_eq!(expr, expr2);

// See how to evaluate expressions
evaluate_demo()?;

// See how to simplify expressions
simplify_demo()?;

// See how to analyze ranges in expressions
range_analysis_demo()?;

Ok(())
}

/// DataFusion can also evaluate arbitrary expressions on Arrow arrays.
fn evaluate_demo() -> Result<()> {
// For example, let's say you have some integers in an array
let batch = RecordBatch::try_from_iter([(
"a",
Arc::new(Int32Array::from(vec![4, 5, 6, 7, 8, 7, 4])) as _,
)])?;

// If you want to find all rows where the expression `a < 5 OR a = 8` is true
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));

// First, you make a "physical expression" from the logical `Expr`
let physical_expr = physical_expr(&batch.schema(), expr)?;

// Now, you can evaluate the expression against the RecordBatch
let result = physical_expr.evaluate(&batch)?;

// The result contain an array that is true only for where `a < 5 OR a = 8`
let expected_result = Arc::new(BooleanArray::from(vec![
true, false, false, false, true, false, true,
])) as _;
assert!(
matches!(&result, ColumnarValue::Array(r) if r == &expected_result),
"result: {:?}",
result
);

Ok(())
}

/// In addition to easy construction, DataFusion exposes APIs for
/// working with and simplifying such expressions that call into the
/// same powerful and extensive implementation used for the query
/// engine.
/// In addition to easy construction, DataFusion exposes APIs for simplifying
/// such expression so they are more efficient to evaluate. This code is also
/// used by the query engine to optimize queries.
fn simplify_demo() -> Result<()> {
// For example, lets say you have has created an expression such
// ts = to_timestamp("2020-09-08T12:00:00+00:00")
Expand Down Expand Up @@ -94,7 +145,7 @@ fn simplify_demo() -> Result<()> {
make_field("b", DataType::Boolean),
])
.to_dfschema_ref()?;
let context = SimplifyContext::new(&props).with_schema(schema);
let context = SimplifyContext::new(&props).with_schema(schema.clone());
let simplifier = ExprSimplifier::new(context);

// basic arithmetic simplification
Expand All @@ -120,6 +171,64 @@ fn simplify_demo() -> Result<()> {
col("i").lt(lit(10))
);

// String --> Date simplification
// `cast('2020-09-01' as date)` --> 18500
assert_eq!(
simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
lit(ScalarValue::Date32(Some(18506)))
);

Ok(())
}

/// DataFusion also has APIs for analyzing predicates (boolean expressions) to
/// determine any ranges restrictions on the inputs required for the predicate
/// evaluate to true.
fn range_analysis_demo() -> Result<()> {
// For example, let's say you are interested in finding data for all days
// in the month of September, 2020
let september_1 = ScalarValue::Date32(Some(18506)); // 2020-09-01
let october_1 = ScalarValue::Date32(Some(18536)); // 2020-10-01

// The predicate to find all such days could be
// `date > '2020-09-01' AND date < '2020-10-01'`
let expr = col("date")
.gt(lit(september_1.clone()))
.and(col("date").lt(lit(october_1.clone())));

// Using the analysis API, DataFusion can determine that the value of `date`
// must be in the range `['2020-09-01', '2020-10-01']`. If your data is
// organized in files according to day, this information permits skipping
// entire files without reading them.
//
// While this simple example could be handled with a special case, the
// DataFusion API handles arbitrary expressions (so for example, you don't
// have to handle the case where the predicate clauses are reversed such as
// `date < '2020-10-01' AND date > '2020-09-01'`

// As always, we need to tell DataFusion the type of column "date"
let schema = Schema::new(vec![make_field("date", DataType::Date32)]);

// You can provide DataFusion any known boundaries on the values of `date`
// (for example, maybe you know you only have data up to `2020-09-15`), but
// in this case, let's say we don't know any boundaries beforehand so we use
// `try_new_unknown`
let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;

// Now, we invoke the analysis code to perform the range analysis
let physical_expr = physical_expr(&schema, expr)?;
let analysis_result =
analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;

// The results of the analysis is an range, encoded as an `Interval`, for
// each column in the schema, that must be true in order for the predicate
// to be true.
//
// In this case, we can see that, as expected, `analyze` has figured out
// that in this case, `date` must be in the range `['2020-09-01', '2020-10-01']`
let expected_range = Interval::try_new(september_1, october_1)?;
assert_eq!(analysis_result.boundaries[0].interval, expected_range);

Ok(())
}

Expand All @@ -132,3 +241,18 @@ fn make_ts_field(name: &str) -> Field {
let tz = None;
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}

/// Build a physical expression from a logical one, after applying simplification and type coercion
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = schema.clone().to_dfschema_ref()?;

// Simplify
let props = ExecutionProps::new();
let simplifier =
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));

// apply type coercion here to ensure types match
let expr = simplifier.coerce(expr, df_schema.clone())?;

create_physical_expr(&expr, df_schema.as_ref(), schema, &props)
}
10 changes: 9 additions & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,20 @@
//!
//! ## Plan Representations
//!
//! Logical planning yields [`LogicalPlan`]s nodes and [`Expr`]
//! ### Logical Plans
//! Logical planning yields [`LogicalPlan`] nodes and [`Expr`]
//! expressions which are [`Schema`] aware and represent statements
//! independent of how they are physically executed.
//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
//! [`LogicalPlan`]s, each potentially containing embedded [`Expr`]s.
//!
//! Examples of working with and executing `Expr`s can be found in the
//! [`expr_api`.rs] example
//!
//! [`expr_api`.rs]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs
//!
//! ### Physical Plans
//!
//! An [`ExecutionPlan`] (sometimes referred to as a "physical plan")
//! is a plan that can be executed against data. It a DAG of other
//! [`ExecutionPlan`]s each potentially containing expressions of the
Expand Down
25 changes: 23 additions & 2 deletions datafusion/physical-expr/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ impl AnalysisContext {
}
}

/// Represents the boundaries of the resulting value from a physical expression,
/// if it were to be an expression, if it were to be evaluated.
/// Represents the boundaries (e.g. min and max values) of a particular column
///
/// This is used range analysis of expressions, to determine if the expression
/// limits the value of particular columns (e.g. analyzing an expression such as
/// `time < 50` would result in a boundary interval for `time` having a max
/// value of `50`).
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
pub column: Column,
Expand Down Expand Up @@ -111,6 +115,23 @@ impl ExprBoundaries {
distinct_count: col_stats.distinct_count.clone(),
})
}

/// Create `ExprBoundaries` that represent no known bounds for all the
/// columns in `schema`
pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
schema
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
Ok(Self {
column: Column::new(field.name(), i),
interval: Interval::make_unbounded(field.data_type())?,
distinct_count: Precision::Absent,
})
})
.collect()
}
}

/// Attempts to refine column boundaries and compute a selectivity value.
Expand Down
13 changes: 7 additions & 6 deletions docs/source/library-user-guide/working-with-exprs.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
under the License.
-->

# Working with Exprs
# Working with `Expr`s

<!-- https://github.com/apache/arrow-datafusion/issues/7304 -->

Expand Down Expand Up @@ -48,12 +48,11 @@ As another example, the SQL expression `a + b * c` would be represented as an `E
└────────────────────┘ └────────────────────┘
```

As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.
As the writer of a library, you can use `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.

There are also executable examples for working with `Expr`s:
## Creating and Evaluating `Expr`s

- [rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs)
- [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs)
Please see [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs) for well commented code for creating, evaluating, simplifying, and analyzing `Expr`s.

## A Scalar UDF Example

Expand All @@ -79,7 +78,9 @@ let expr = add_one_udf.call(vec![col("my_column")]);

If you'd like to learn more about `Expr`s, before we get into the details of creating and rewriting them, you can read the [expression user-guide](./../user-guide/expressions.md).

## Rewriting Exprs
## Rewriting `Expr`s

[rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs) contains example code for rewriting `Expr`s.

Rewriting Expressions is the process of taking an `Expr` and transforming it into another `Expr`. This is useful for a number of reasons, including:

Expand Down

0 comments on commit 91cc573

Please sign in to comment.