Skip to content

Commit

Permalink
Migrate to_timestamp* functions to new functions crate.
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 committed Feb 28, 2024
1 parent d8e6016 commit 78baa96
Show file tree
Hide file tree
Showing 14 changed files with 315 additions and 236 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Default features:
- `array_expressions`: functions for working with arrays such as `array_to_string`
- `compression`: reading files compressed with `xz2`, `bzip2`, `flate2`, and `zstd`
- `crypto_expressions`: cryptographic functions such as `md5` and `sha256`
- `datetime_expressions`: date and time functions such as `to_timestamp`
- `encoding_expressions`: `encode` and `decode` functions
- `parquet`: support for reading the [Apache Parquet] format
- `regex_expressions`: regular expression functions, such as `regexp_match`
Expand Down
46 changes: 44 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "36.0.0", features = [
"avro",
"crypto_expressions",
"datetime_expressions",
"encoding_expressions",
"parquet",
"regex_expressions",
Expand Down
118 changes: 116 additions & 2 deletions datafusion/core/tests/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@
//! This program demonstrates the DataFusion expression simplification API.
use arrow::datatypes::{DataType, Field, Schema};
use chrono::{DateTime, TimeZone, Utc};
use datafusion::common::DFSchema;
use datafusion::{error::Result, execution::context::ExecutionProps, prelude::*};
use datafusion_expr::{Expr, ExprSchemable};
use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyInfo};
use datafusion_common::ScalarValue;
use datafusion_expr::{
table_scan, Cast, Expr, ExprSchemable, LogicalPlan, LogicalPlanBuilder,
};
use datafusion_optimizer::simplify_expressions::{
ExprSimplifier, SimplifyExpressions, SimplifyInfo,
};
use datafusion_optimizer::{OptimizerContext, OptimizerRule};

/// In order to simplify expressions, DataFusion must have information
/// about the expressions.
Expand Down Expand Up @@ -79,6 +86,43 @@ fn schema() -> DFSchema {
.unwrap()
}

fn test_table_scan() -> LogicalPlan {
let schema = Schema::new(vec![
Field::new("a", DataType::Boolean, false),
Field::new("b", DataType::Boolean, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::UInt32, false),
Field::new("e", DataType::UInt32, true),
]);
table_scan(Some("test"), &schema, None)
.expect("creating scan")
.build()
.expect("building plan")
}

fn get_optimized_plan_formatted(plan: &LogicalPlan, date_time: &DateTime<Utc>) -> String {
let config = OptimizerContext::new().with_query_execution_start_time(*date_time);
let rule = SimplifyExpressions::new();

let optimized_plan = rule
.try_optimize(plan, &config)
.unwrap()
.expect("failed to optimize plan");
format!("{optimized_plan:?}")
}

fn now_expr() -> Expr {
call_fn("now", vec![]).unwrap()
}

fn cast_to_int64_expr(expr: Expr) -> Expr {
Expr::Cast(Cast::new(expr.into(), DataType::Int64))
}

fn to_timestamp_expr(arg: impl Into<String>) -> Expr {
to_timestamp(vec![lit(arg.into())])
}

#[test]
fn basic() {
let info: MyInfo = schema().into();
Expand Down Expand Up @@ -108,3 +152,73 @@ fn fold_and_simplify() {
let simplified = simplifier.simplify(expr).unwrap();
assert_eq!(simplified, lit(true))
}

#[test]
fn to_timestamp_expr_folded() -> Result<()> {
let table_scan = test_table_scan();
let proj = vec![to_timestamp_expr("2020-09-08T12:00:00+00:00")];

let plan = LogicalPlanBuilder::from(table_scan)
.project(proj)?
.build()?;

let expected = "Projection: TimestampNanosecond(1599566400000000000, None) AS to_timestamp(Utf8(\"2020-09-08T12:00:00+00:00\"))\
\n TableScan: test"
.to_string();
let actual = get_optimized_plan_formatted(&plan, &Utc::now());
assert_eq!(expected, actual);
Ok(())
}

#[test]
fn now_less_than_timestamp() -> Result<()> {
let table_scan = test_table_scan();

let ts_string = "2020-09-08T12:05:00+00:00";
let time = Utc.timestamp_nanos(1599566400000000000i64);

// cast(now() as int) < cast(to_timestamp(...) as int) + 50000_i64
let plan = LogicalPlanBuilder::from(table_scan)
.filter(
cast_to_int64_expr(now_expr())
.lt(cast_to_int64_expr(to_timestamp_expr(ts_string)) + lit(50000_i64)),
)?
.build()?;

// Note that constant folder runs and folds the entire
// expression down to a single constant (true)
let expected = "Filter: Boolean(true)\
\n TableScan: test";
let actual = get_optimized_plan_formatted(&plan, &time);

assert_eq!(expected, actual);
Ok(())
}

#[test]
fn select_date_plus_interval() -> Result<()> {
let table_scan = test_table_scan();

let ts_string = "2020-09-08T12:05:00+00:00";
let time = Utc.timestamp_nanos(1599566400000000000i64);

// now() < cast(to_timestamp(...) as int) + 5000000000
let schema = table_scan.schema();

let date_plus_interval_expr = to_timestamp_expr(ts_string)
.cast_to(&DataType::Date32, schema)?
+ Expr::Literal(ScalarValue::IntervalDayTime(Some(123i64 << 32)));

let plan = LogicalPlanBuilder::from(table_scan.clone())
.project(vec![date_plus_interval_expr])?
.build()?;

// Note that constant folder runs and folds the entire
// expression down to a single constant (true)
let expected = r#"Projection: Date32("18636") AS to_timestamp(Utf8("2020-09-08T12:05:00+00:00")) + IntervalDayTime("528280977408")
TableScan: test"#;
let actual = get_optimized_plan_formatted(&plan, &time);

assert_eq!(expected, actual);
Ok(())
}
4 changes: 3 additions & 1 deletion datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ datafusion-physical-expr = { workspace = true }
hex = { version = "0.4", optional = true }
itertools = { workspace = true }
log = "0.4.20"
criterion = "0.5"
rand = { workspace = true }
rstest = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }

[dev-dependencies]
criterion = "0.5"

[[bench]]
harness = false
name = "to_timestamp"
Loading

0 comments on commit 78baa96

Please sign in to comment.