Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add evaluate_demo and range_analysis_demo to Expr examples #8377

Merged
merged 6 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion-examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ cargo run --example csv_sql
- [`dataframe-to-s3.rs`](examples/external_dependency/dataframe-to-s3.rs): Run a query using a DataFrame against a parquet file from s3
- [`dataframe_in_memory.rs`](examples/dataframe_in_memory.rs): Run a query using a DataFrame against data in memory
- [`deserialize_to_struct.rs`](examples/deserialize_to_struct.rs): Convert query results into rust structs using serde
- [`expr_api.rs`](examples/expr_api.rs): Use the `Expr` construction and simplification API
- [`expr_api.rs`](examples/expr_api.rs): Create, execute, simplify and anaylze `Expr`s
- [`flight_sql_server.rs`](examples/flight/flight_sql_server.rs): Run DataFusion as a standalone process and execute SQL queries from JDBC clients
- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
Expand Down
144 changes: 134 additions & 10 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,95 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::{BooleanArray, Int32Array};
use arrow::record_batch::RecordBatch;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_expr::{
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
};
use datafusion::prelude::*;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::expr::BinaryExpr;
use datafusion_expr::Operator;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::{ColumnarValue, ExprSchemable, Operator};
use std::sync::Arc;

/// This example demonstrates the DataFusion [`Expr`] API.
///
/// DataFusion comes with a powerful and extensive system for
/// representing and manipulating expressions such as `A + 5` and `X
/// IN ('foo', 'bar', 'baz')` and many other constructs.
/// IN ('foo', 'bar', 'baz')`.
///
/// In addition to building and manipulating [`Expr`]s, DataFusion
/// also comes with APIs for evaluation, simplification, and analysis.
///
/// The code in this example shows how to:
/// 1. Create [`Exprs`] using different APIs: [`main`]`
/// 2. Evaluate [`Exprs`] against data: [`evaluate_demo`]
/// 3. Simplify expressions: [`simplify_demo`]
/// 4. Analyze predicates for boundary ranges: [`range_analysis_demo`]
#[tokio::main]
async fn main() -> Result<()> {
// The easiest way to do create expressions is to use the
// "fluent"-style API, like this:
// "fluent"-style API:
let expr = col("a") + lit(5);

// this creates the same expression as the following though with
// much less code,
// The same same expression can be created directly, with much more code:
let expr2 = Expr::BinaryExpr(BinaryExpr::new(
Box::new(col("a")),
Operator::Plus,
Box::new(Expr::Literal(ScalarValue::Int32(Some(5)))),
));
assert_eq!(expr, expr2);

// See how to evaluate expressions
evaluate_demo()?;

// See how to simplify expressions
simplify_demo()?;

// See how to analyze ranges in expressions
range_analysis_demo()?;

Ok(())
}

/// DataFusion can also evaluate arbitrary expressions on Arrow arrays.
fn evaluate_demo() -> Result<()> {
// For example, let's say you have some integers in an array
let batch = RecordBatch::try_from_iter([(
"a",
Arc::new(Int32Array::from(vec![4, 5, 6, 7, 8, 7, 4])) as _,
)])?;

// If you want to find all rows where the expression `a < 5 OR a = 8` is true
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));

// First, you make a "physical expression" from the logical `Expr`
let physical_expr = physical_expr(&batch.schema(), expr)?;

// Now, you can evaluate the expression against the RecordBatch
let result = physical_expr.evaluate(&batch)?;

// The result contain an array that is true only for where `a < 5 OR a = 8`
let expected_result = Arc::new(BooleanArray::from(vec![
true, false, false, false, true, false, true,
])) as _;
assert!(
matches!(&result, ColumnarValue::Array(r) if r == &expected_result),
"result: {:?}",
result
);

Ok(())
}

/// In addition to easy construction, DataFusion exposes APIs for
/// working with and simplifying such expressions that call into the
/// same powerful and extensive implementation used for the query
/// engine.
/// In addition to easy construction, DataFusion exposes APIs for simplifying
/// such expression so they are more efficient to evaluate. This code is also
/// used by the query engine to optimize queries.
fn simplify_demo() -> Result<()> {
// For example, lets say you have has created an expression such
// ts = to_timestamp("2020-09-08T12:00:00+00:00")
Expand Down Expand Up @@ -94,7 +145,7 @@ fn simplify_demo() -> Result<()> {
make_field("b", DataType::Boolean),
])
.to_dfschema_ref()?;
let context = SimplifyContext::new(&props).with_schema(schema);
let context = SimplifyContext::new(&props).with_schema(schema.clone());
let simplifier = ExprSimplifier::new(context);

// basic arithmetic simplification
Expand All @@ -120,6 +171,64 @@ fn simplify_demo() -> Result<()> {
col("i").lt(lit(10))
);

// String --> Date simplification
// `cast('2020-09-01' as date)` --> 18500
assert_eq!(
simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
lit(ScalarValue::Date32(Some(18506)))
);

Ok(())
}

/// DataFusion also has APIs for analyzing predicates (boolean expressions) to
/// determine any ranges restrictions on the inputs required for the predicate
/// evaluate to true.
fn range_analysis_demo() -> Result<()> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really powerful feature of DataFusion and I don't think it is widely understood yet

// For example, let's say you are interested in finding data for all days
// in the month of September, 2020
let september_1 = ScalarValue::Date32(Some(18506)); // 2020-09-01
let october_1 = ScalarValue::Date32(Some(18536)); // 2020-10-01

// The predicate to find all such days could be
// `date > '2020-09-01' AND date < '2020-10-01'`
let expr = col("date")
.gt(lit(september_1.clone()))
.and(col("date").lt(lit(october_1.clone())));

// Using the analysis API, DataFusion can determine that the value of `date`
// must be in the range `['2020-09-01', '2020-10-01']`. If your data is
// organized in files according to day, this information permits skipping
// entire files without reading them.
//
// While this simply example could be handled with a special case, the
alamb marked this conversation as resolved.
Show resolved Hide resolved
// DataFusion API handles arbitrary expressions (so for example, you don't
// have to handle the case where the predicate clauses are reversed such as
// `date < '2020-10-01' AND date > '2020-09-01'`

// As always, we need to tell DataFusion the type of column "date"
let schema = Schema::new(vec![make_field("date", DataType::Date32)]);

// You can provide DataFusion any known boundaries on the values of `date`
// (for example, maybe you know you only have data up to `2020-09-15`), but
// in this case, let's say we don't know any boundaries beforehand so we use
// `try_new_unknown`
let boundaries = ExprBoundaries::try_new_unknown(&schema)?;

// Now, we invoke the analysis code to perform the range analysis
let physical_expr = physical_expr(&schema, expr)?;
let analysis_result =
analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;

// The results of the analysis is an range, encoded as an `Interval`, for
// each column in the schema, that must be true in order for the predicate
// to be true.
//
// In this case, we can see that, as expected, `analyze` has figured out
// that in this case, `date` must be in the range `['2020-09-01', '2020-10-01']`
let expected_range = Interval::try_new(september_1, october_1)?;
assert_eq!(analysis_result.boundaries[0].interval, expected_range);

Ok(())
}

Expand All @@ -132,3 +241,18 @@ fn make_ts_field(name: &str) -> Field {
let tz = None;
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}

/// Build a physical expression from a logical one, after applying simplification and type coercion
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
let df_schema = schema.clone().to_dfschema_ref()?;

// Simplify
let props = ExecutionProps::new();
let simplifier =
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));

// apply type coercion here to ensure types match
let expr = simplifier.coerce(expr, df_schema.clone())?;

create_physical_expr(&expr, df_schema.as_ref(), schema, &props)
}
10 changes: 9 additions & 1 deletion datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,20 @@
//!
//! ## Plan Representations
//!
//! Logical planning yields [`LogicalPlan`]s nodes and [`Expr`]
//! ### Logical Plans
//! Logical planning yields [`LogicalPlan`] nodes and [`Expr`]
//! expressions which are [`Schema`] aware and represent statements
//! independent of how they are physically executed.
//! A [`LogicalPlan`] is a Directed Acyclic Graph (DAG) of other
//! [`LogicalPlan`]s, each potentially containing embedded [`Expr`]s.
//!
//! Examples of working with and executing `Expr`s can be found in the
//! [`expr_api`.rs] example
//!
//! [`expr_api`.rs]: https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs
//!
//! ### Physical Plans
//!
//! An [`ExecutionPlan`] (sometimes referred to as a "physical plan")
//! is a plan that can be executed against data. It a DAG of other
//! [`ExecutionPlan`]s each potentially containing expressions of the
Expand Down
24 changes: 22 additions & 2 deletions datafusion/physical-expr/src/analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ impl AnalysisContext {
}
}

/// Represents the boundaries of the resulting value from a physical expression,
/// if it were to be an expression, if it were to be evaluated.
/// Represents the boundaries (e.g. min and max values) of a particular column
///
/// This is used range analysis of expressions, to determine if the expression
/// limits the value of particular columns (e.g. analyzing an expression such as
/// `time < 50` would result in a boundary interval for `time` having a max
/// value of `50`).
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
pub column: Column,
Expand Down Expand Up @@ -111,6 +115,22 @@ impl ExprBoundaries {
distinct_count: col_stats.distinct_count.clone(),
})
}

/// Create `ExprBoundaries` that represent no known bounds for all the columns `schema`
pub fn try_new_unknown(schema: &Schema) -> Result<Vec<Self>> {
Copy link
Contributor Author

@alamb alamb Nov 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added to make the demo easier to write (I ported it from IOx downstream)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might unbounded be more obvious a name than unknown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might unbounded be more obvious a name than unknown?

I agree -- will change.

schema
.fields()
.iter()
.enumerate()
.map(|(i, field)| {
Ok(Self {
column: Column::new(field.name(), i),
interval: Interval::make_unbounded(field.data_type())?,
distinct_count: Precision::Absent,
})
})
.collect()
}
}

/// Attempts to refine column boundaries and compute a selectivity value.
Expand Down
14 changes: 8 additions & 6 deletions docs/source/library-user-guide/working-with-exprs.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
under the License.
-->

# Working with Exprs
# Working with `Expr`s

<!-- https://github.com/apache/arrow-datafusion/issues/7304 -->

Expand Down Expand Up @@ -48,12 +48,11 @@ As another example, the SQL expression `a + b * c` would be represented as an `E
└────────────────────┘ └────────────────────┘
```

As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.
As the writer of a library, you can use `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF.

There are also executable examples for working with `Expr`s:
## Creating and Evaluating `Expr`s

- [rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs)
- [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs)
Please see [expr_api.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/expr_api.rs) for well commented code for creating, evaluating, simplifying, and analyzing `Expr`s.

## A Scalar UDF Example

Expand All @@ -79,7 +78,10 @@ let expr = add_one_udf.call(vec![col("my_column")]);

If you'd like to learn more about `Expr`s, before we get into the details of creating and rewriting them, you can read the [expression user-guide](./../user-guide/expressions.md).

## Rewriting Exprs
## Rewriting `Expr`s

[rewrite_expr.rs](https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/rewrite_expr.rs) contains example code for rewriting `Expr`s.


Rewriting Expressions is the process of taking an `Expr` and transforming it into another `Expr`. This is useful for a number of reasons, including:

Expand Down
Loading