From e00fa65d92fc9e651360e4a03732b000ef2f67b7 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 28 Mar 2024 21:56:22 +0100 Subject: [PATCH 1/9] parse missing scalarValues --- crates/core/src/delta_datafusion/expr.rs | 41 ++++++++++- python/tests/test_merge.py | 86 ++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index dfe234ad46..5067d0a852 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -22,11 +22,12 @@ //! Utility functions for Datafusion's Expressions use std::{ - fmt::{self, Display, Formatter, Write}, + fmt::{self, format, Display, Formatter, Write}, sync::Arc, }; use arrow_schema::DataType; +use chrono::{Date, NaiveDate, NaiveDateTime, TimeZone}; use datafusion::execution::context::SessionState; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; @@ -321,6 +322,9 @@ macro_rules! format_option { }}; } +/// Epoch days from ce calander until 1970-01-01 +pub const EPOCH_DAYS_FROM_CE: i32 = 719_163; + struct ScalarValueFormat<'a> { scalar: &'a ScalarValue, } @@ -339,6 +343,41 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { ScalarValue::UInt16(e) => format_option!(f, e)?, ScalarValue::UInt32(e) => format_option!(f, e)?, ScalarValue::UInt64(e) => format_option!(f, e)?, + ScalarValue::Date32(e) => match e { + Some(e) => { + let dt = + NaiveDate::from_num_days_from_ce_opt((EPOCH_DAYS_FROM_CE + (*e)).into()) + .unwrap(); + write!(f, "{}", dt)? + } + None => write!(f, "NULL")?, + }, + ScalarValue::Date64(e) => match e { + Some(e) => write!( + f, + "{}", + NaiveDateTime::from_timestamp_millis((*e).into()) + .unwrap() + .date() + )?, + None => write!(f, "NULL")?, + }, + ScalarValue::TimestampMicrosecond(e, tz) => match e { + Some(e) => match tz { + Some(tz) => write!( + f, + "{}", + NaiveDateTime::from_timestamp_micros(*e).unwrap().and_utc() + )?, + None => write!( + f, + "{}", + NaiveDateTime::from_timestamp_micros(*e) + .unwrap() + )?, + }, + None => write!(f, "NULL")?, + }, ScalarValue::Utf8(e) | ScalarValue::LargeUtf8(e) => match e { Some(e) => write!(f, "'{}'", escape_quoted_string(e, '\''))?, None => write!(f, "NULL")?, diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index 82776c60fc..3afd1b6ae4 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -1,6 +1,7 @@ import pathlib import pyarrow as pa +import pytest from deltalake import DeltaTable, write_deltalake @@ -763,3 +764,88 @@ def test_merge_multiple_when_not_matched_by_source_update_wo_predicate( assert last_action["operation"] == "MERGE" assert result == expected + + +def test_merge_date_partitioned_2344(tmp_path: pathlib.Path): + from datetime import date + + schema = pa.schema( + [ + ("date", pa.date32()), + ("foo", pa.string()), + ("bar", pa.string()), + ] + ) + + dt = DeltaTable.create( + tmp_path, schema=schema, partition_by=["date"], mode="overwrite" + ) + + data = pa.table( + { + "date": pa.array([date(2022, 2, 1)]), + "foo": pa.array(["hello"]), + "bar": pa.array(["world"]), + } + ) + + dt.merge( + data, + predicate="s.date = t.date", + source_alias="s", + target_alias="t", + ).when_matched_update_all().when_not_matched_insert_all().execute() + + result = dt.to_pyarrow_table() + last_action = dt.history(1)[0] + + assert last_action["operation"] == "MERGE" + assert result == data + assert last_action["operationParameters"].get("predicate") == "2022-02-01 = date" + + +@pytest.mark.parametrize( + "timezone,predicate", + [ + (None, "2022-02-01 00:00:00 = datetime"), + ("UTC", "2022-02-01 00:00:00 UTC = datetime"), + ], +) +def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, predicate): + from datetime import datetime + + schema = pa.schema( + [ + ("datetime", pa.timestamp("us", tz=timezone)), + ("foo", pa.string()), + ("bar", pa.string()), + ] + ) + + dt = DeltaTable.create( + tmp_path, schema=schema, partition_by=["datetime"], mode="overwrite" + ) + + data = pa.table( + { + "datetime": pa.array( + [datetime(2022, 2, 1)], pa.timestamp("us", tz=timezone) + ), + "foo": pa.array(["hello"]), + "bar": pa.array(["world"]), + } + ) + + dt.merge( + data, + predicate="s.datetime = t.datetime", + source_alias="s", + target_alias="t", + ).when_matched_update_all().when_not_matched_insert_all().execute() + + result = dt.to_pyarrow_table() + last_action = dt.history(1)[0] + + assert last_action["operation"] == "MERGE" + assert result == data + assert last_action["operationParameters"].get("predicate") == predicate From 67fe436d9406083b5528fe2a9a2a6dc3001023c9 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 28 Mar 2024 21:57:29 +0100 Subject: [PATCH 2/9] fmt --- crates/core/src/delta_datafusion/expr.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 5067d0a852..e631ba8b5d 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -369,12 +369,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { "{}", NaiveDateTime::from_timestamp_micros(*e).unwrap().and_utc() )?, - None => write!( - f, - "{}", - NaiveDateTime::from_timestamp_micros(*e) - .unwrap() - )?, + None => write!(f, "{}", NaiveDateTime::from_timestamp_micros(*e).unwrap())?, }, None => write!(f, "NULL")?, }, From 570a9e30cb4530dddc7612483c5b241d29bbd13d Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 28 Mar 2024 21:56:22 +0100 Subject: [PATCH 3/9] parse missing scalarValues --- crates/core/src/delta_datafusion/expr.rs | 41 ++++++++++- python/tests/test_merge.py | 86 ++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 1 deletion(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index dfe234ad46..5067d0a852 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -22,11 +22,12 @@ //! Utility functions for Datafusion's Expressions use std::{ - fmt::{self, Display, Formatter, Write}, + fmt::{self, format, Display, Formatter, Write}, sync::Arc, }; use arrow_schema::DataType; +use chrono::{Date, NaiveDate, NaiveDateTime, TimeZone}; use datafusion::execution::context::SessionState; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; @@ -321,6 +322,9 @@ macro_rules! format_option { }}; } +/// Epoch days from ce calander until 1970-01-01 +pub const EPOCH_DAYS_FROM_CE: i32 = 719_163; + struct ScalarValueFormat<'a> { scalar: &'a ScalarValue, } @@ -339,6 +343,41 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { ScalarValue::UInt16(e) => format_option!(f, e)?, ScalarValue::UInt32(e) => format_option!(f, e)?, ScalarValue::UInt64(e) => format_option!(f, e)?, + ScalarValue::Date32(e) => match e { + Some(e) => { + let dt = + NaiveDate::from_num_days_from_ce_opt((EPOCH_DAYS_FROM_CE + (*e)).into()) + .unwrap(); + write!(f, "{}", dt)? + } + None => write!(f, "NULL")?, + }, + ScalarValue::Date64(e) => match e { + Some(e) => write!( + f, + "{}", + NaiveDateTime::from_timestamp_millis((*e).into()) + .unwrap() + .date() + )?, + None => write!(f, "NULL")?, + }, + ScalarValue::TimestampMicrosecond(e, tz) => match e { + Some(e) => match tz { + Some(tz) => write!( + f, + "{}", + NaiveDateTime::from_timestamp_micros(*e).unwrap().and_utc() + )?, + None => write!( + f, + "{}", + NaiveDateTime::from_timestamp_micros(*e) + .unwrap() + )?, + }, + None => write!(f, "NULL")?, + }, ScalarValue::Utf8(e) | ScalarValue::LargeUtf8(e) => match e { Some(e) => write!(f, "'{}'", escape_quoted_string(e, '\''))?, None => write!(f, "NULL")?, diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index 82776c60fc..3afd1b6ae4 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -1,6 +1,7 @@ import pathlib import pyarrow as pa +import pytest from deltalake import DeltaTable, write_deltalake @@ -763,3 +764,88 @@ def test_merge_multiple_when_not_matched_by_source_update_wo_predicate( assert last_action["operation"] == "MERGE" assert result == expected + + +def test_merge_date_partitioned_2344(tmp_path: pathlib.Path): + from datetime import date + + schema = pa.schema( + [ + ("date", pa.date32()), + ("foo", pa.string()), + ("bar", pa.string()), + ] + ) + + dt = DeltaTable.create( + tmp_path, schema=schema, partition_by=["date"], mode="overwrite" + ) + + data = pa.table( + { + "date": pa.array([date(2022, 2, 1)]), + "foo": pa.array(["hello"]), + "bar": pa.array(["world"]), + } + ) + + dt.merge( + data, + predicate="s.date = t.date", + source_alias="s", + target_alias="t", + ).when_matched_update_all().when_not_matched_insert_all().execute() + + result = dt.to_pyarrow_table() + last_action = dt.history(1)[0] + + assert last_action["operation"] == "MERGE" + assert result == data + assert last_action["operationParameters"].get("predicate") == "2022-02-01 = date" + + +@pytest.mark.parametrize( + "timezone,predicate", + [ + (None, "2022-02-01 00:00:00 = datetime"), + ("UTC", "2022-02-01 00:00:00 UTC = datetime"), + ], +) +def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, predicate): + from datetime import datetime + + schema = pa.schema( + [ + ("datetime", pa.timestamp("us", tz=timezone)), + ("foo", pa.string()), + ("bar", pa.string()), + ] + ) + + dt = DeltaTable.create( + tmp_path, schema=schema, partition_by=["datetime"], mode="overwrite" + ) + + data = pa.table( + { + "datetime": pa.array( + [datetime(2022, 2, 1)], pa.timestamp("us", tz=timezone) + ), + "foo": pa.array(["hello"]), + "bar": pa.array(["world"]), + } + ) + + dt.merge( + data, + predicate="s.datetime = t.datetime", + source_alias="s", + target_alias="t", + ).when_matched_update_all().when_not_matched_insert_all().execute() + + result = dt.to_pyarrow_table() + last_action = dt.history(1)[0] + + assert last_action["operation"] == "MERGE" + assert result == data + assert last_action["operationParameters"].get("predicate") == predicate From c2066474c28fb5e200e5619f3f7a5f5757d0a316 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 28 Mar 2024 21:57:29 +0100 Subject: [PATCH 4/9] fmt --- crates/core/src/delta_datafusion/expr.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 5067d0a852..e631ba8b5d 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -369,12 +369,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { "{}", NaiveDateTime::from_timestamp_micros(*e).unwrap().and_utc() )?, - None => write!( - f, - "{}", - NaiveDateTime::from_timestamp_micros(*e) - .unwrap() - )?, + None => write!(f, "{}", NaiveDateTime::from_timestamp_micros(*e).unwrap())?, }, None => write!(f, "NULL")?, }, From 17447b8053227d5d4d8a62cc86d1b6ebea24bf6f Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Fri, 29 Mar 2024 15:47:52 +0100 Subject: [PATCH 5/9] convert Option to Result --- crates/core/src/delta_datafusion/expr.rs | 26 +++++++++++++++--------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index e631ba8b5d..5691a1952d 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -22,7 +22,7 @@ //! Utility functions for Datafusion's Expressions use std::{ - fmt::{self, format, Display, Formatter, Write}, + fmt::{self, format, Display, Error, Formatter, Write}, sync::Arc, }; @@ -344,12 +344,12 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { ScalarValue::UInt32(e) => format_option!(f, e)?, ScalarValue::UInt64(e) => format_option!(f, e)?, ScalarValue::Date32(e) => match e { - Some(e) => { - let dt = - NaiveDate::from_num_days_from_ce_opt((EPOCH_DAYS_FROM_CE + (*e)).into()) - .unwrap(); - write!(f, "{}", dt)? - } + Some(e) => write!( + f, + "{}", + NaiveDate::from_num_days_from_ce_opt((EPOCH_DAYS_FROM_CE + (*e)).into()) + .ok_or(Error::default())? + )?, None => write!(f, "NULL")?, }, ScalarValue::Date64(e) => match e { @@ -357,7 +357,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { f, "{}", NaiveDateTime::from_timestamp_millis((*e).into()) - .unwrap() + .ok_or(Error::default())? .date() )?, None => write!(f, "NULL")?, @@ -367,9 +367,15 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { Some(tz) => write!( f, "{}", - NaiveDateTime::from_timestamp_micros(*e).unwrap().and_utc() + NaiveDateTime::from_timestamp_micros(*e) + .ok_or(Error::default())? + .and_utc() + )?, + None => write!( + f, + "{}", + NaiveDateTime::from_timestamp_micros(*e).ok_or(Error::default())? )?, - None => write!(f, "{}", NaiveDateTime::from_timestamp_micros(*e).unwrap())?, }, None => write!(f, "NULL")?, }, From f29b280cc439968b9c36ad7d4c91ee2e12814305 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Apr 2024 19:19:19 +0200 Subject: [PATCH 6/9] . --- crates/core/src/delta_datafusion/expr.rs | 25 ++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 883f5e0878..0614fd1770 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -363,7 +363,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { "{}", NaiveDateTime::from_timestamp_millis((*e).into()) .ok_or(Error::default())? - .date() + .date().format("%Y-%m-%d") )?, None => write!(f, "NULL")?, }, @@ -371,15 +371,16 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { Some(e) => match tz { Some(tz) => write!( f, - "{}", + "'{}'", NaiveDateTime::from_timestamp_micros(*e) .ok_or(Error::default())? .and_utc() + .format("%Y-%m-%dT%H:%M:%S%.6f") )?, None => write!( f, - "{}", - NaiveDateTime::from_timestamp_micros(*e).ok_or(Error::default())? + "'{}'", + NaiveDateTime::from_timestamp_micros(*e).ok_or(Error::default())?.format("%Y-%m-%dT%H:%M:%S%.6f") )?, }, None => write!(f, "NULL")?, @@ -485,6 +486,11 @@ mod test { DataType::Primitive(PrimitiveType::Timestamp), true, ), + StructField::new( + "_timestamp_ntz".to_string(), + DataType::Primitive(PrimitiveType::TimestampNtz), + true, + ), StructField::new( "_binary".to_string(), DataType::Primitive(PrimitiveType::Binary), @@ -650,6 +656,17 @@ mod test { cardinality(col("_list").range(col("value"), lit(10_i64))), "cardinality(_list[value:10:1])".to_string() ), + simple!( + col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))), + "_timestamp_ntz > '2010-01-01T00:00:00.000000'".to_string() + ), + simple!( + col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond( + Some(1262304000000000), + Some("UTC".into()) + ))), + "_timestamp > '2010-01-01T00:00:00.000000 UTC'".to_string() + ), ]; let session: SessionContext = DeltaSessionContext::default().into(); From 79bb3891f636c02ff257f06a203bbff5bbe42b89 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Apr 2024 20:18:42 +0200 Subject: [PATCH 7/9] cast --- crates/core/src/delta_datafusion/expr.rs | 35 ++++++++++++++++-------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 0614fd1770..80f87393b8 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -360,7 +360,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { ScalarValue::Date64(e) => match e { Some(e) => write!( f, - "{}", + "'{}'::date", NaiveDateTime::from_timestamp_millis((*e).into()) .ok_or(Error::default())? .date().format("%Y-%m-%d") @@ -371,7 +371,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { Some(e) => match tz { Some(tz) => write!( f, - "'{}'", + "arrow_cast('{}', 'Timestamp(Microsecond, UTC)')", NaiveDateTime::from_timestamp_micros(*e) .ok_or(Error::default())? .and_utc() @@ -379,7 +379,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { )?, None => write!( f, - "'{}'", + "arrow_cast('{}', 'Timestamp(Microsecond, None)')", NaiveDateTime::from_timestamp_micros(*e).ok_or(Error::default())?.format("%Y-%m-%dT%H:%M:%S%.6f") )?, }, @@ -411,6 +411,7 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { #[cfg(test)] mod test { + use arrow_cast::cast; use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; use datafusion_common::{Column, ScalarValue, ToDFSchema}; @@ -656,17 +657,29 @@ mod test { cardinality(col("_list").range(col("value"), lit(10_i64))), "cardinality(_list[value:10:1])".to_string() ), - simple!( - col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))), - "_timestamp_ntz > '2010-01-01T00:00:00.000000'".to_string() - ), - simple!( - col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond( + ParseTest { + expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))), + expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(), + override_expected_expr: Some(col("_timestamp_ntz").gt( + datafusion_expr::Expr::Cast( Cast { + expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), + data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None) + } + ))), + }, + ParseTest { + expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond( Some(1262304000000000), Some("UTC".into()) ))), - "_timestamp > '2010-01-01T00:00:00.000000 UTC'".to_string() - ), + expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, UTC)')".to_string(), + override_expected_expr: Some(col("_timestamp_ntz").gt( + datafusion_expr::Expr::Cast( Cast { + expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), + data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into())) + } + ))), + }, ]; let session: SessionContext = DeltaSessionContext::default().into(); From cf201050d777a27c3bd32a50b10468d44271c67a Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Apr 2024 20:56:34 +0200 Subject: [PATCH 8/9] use to_timestamp_micros --- crates/core/src/delta_datafusion/expr.rs | 48 +++++++++++++----------- python/tests/test_merge.py | 4 +- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 80f87393b8..4fb6b1b17f 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -363,7 +363,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { "'{}'::date", NaiveDateTime::from_timestamp_millis((*e).into()) .ok_or(Error::default())? - .date().format("%Y-%m-%d") + .date() + .format("%Y-%m-%d") )?, None => write!(f, "NULL")?, }, @@ -371,16 +372,18 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { Some(e) => match tz { Some(tz) => write!( f, - "arrow_cast('{}', 'Timestamp(Microsecond, UTC)')", + "to_timestamp_micros('{}')", NaiveDateTime::from_timestamp_micros(*e) .ok_or(Error::default())? .and_utc() - .format("%Y-%m-%dT%H:%M:%S%.6f") + .format("%Y-%m-%dT%H:%M:%S%.6f%:z") )?, None => write!( f, - "arrow_cast('{}', 'Timestamp(Microsecond, None)')", - NaiveDateTime::from_timestamp_micros(*e).ok_or(Error::default())?.format("%Y-%m-%dT%H:%M:%S%.6f") + "to_timestamp_micros('{}')", + NaiveDateTime::from_timestamp_micros(*e) + .ok_or(Error::default())? + .format("%Y-%m-%dT%H:%M:%S%.6f") )?, }, None => write!(f, "NULL")?, @@ -415,7 +418,9 @@ mod test { use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; use datafusion_common::{Column, ScalarValue, ToDFSchema}; - use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_expr::{ + cardinality, col, lit, substring, to_timestamp_micros, Cast, Expr, ExprSchemable, + }; use datafusion_functions::encoding::expr_fn::decode; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; @@ -658,27 +663,26 @@ mod test { "cardinality(_list[value:10:1])".to_string() ), ParseTest { - expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))), - expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(), - override_expected_expr: Some(col("_timestamp_ntz").gt( - datafusion_expr::Expr::Cast( Cast { - expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), - data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None) - } - ))), + expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond( + Some(1262304000000000), + None, + ))), + expected: "_timestamp_ntz > to_timestamp_micros('2010-01-01T00:00:00.000000')" + .to_string(), + override_expected_expr: Some(col("_timestamp_ntz").gt(to_timestamp_micros(vec![ + lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))), + ]))), }, ParseTest { expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond( Some(1262304000000000), - Some("UTC".into()) + Some("UTC".into()), ))), - expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, UTC)')".to_string(), - override_expected_expr: Some(col("_timestamp_ntz").gt( - datafusion_expr::Expr::Cast( Cast { - expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), - data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into())) - } - ))), + expected: "_timestamp > to_timestamp_micros('2010-01-01T00:00:00.000000+00:00')" + .to_string(), + override_expected_expr: Some(col("_timestamp").gt(to_timestamp_micros(vec![lit( + ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000+00:00".into())), + )]))), }, ]; diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index 3afd1b6ae4..fb828bd5f2 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -807,8 +807,8 @@ def test_merge_date_partitioned_2344(tmp_path: pathlib.Path): @pytest.mark.parametrize( "timezone,predicate", [ - (None, "2022-02-01 00:00:00 = datetime"), - ("UTC", "2022-02-01 00:00:00 UTC = datetime"), + (None, "to_timestamp_micros('2022-02-01T00:00:00.000000') = datetime"), + ("UTC", "to_timestamp_micros('2022-02-01T00:00:00.000000+00:00') = datetime"), ], ) def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, predicate): From 286163111c11298b3e71627ad45edab6db24abcd Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 1 Apr 2024 21:58:56 +0200 Subject: [PATCH 9/9] fix cast --- crates/core/src/delta_datafusion/expr.rs | 41 +++++++++++------------- python/tests/test_merge.py | 10 ++++-- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 4fb6b1b17f..4e6dc02ac4 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -372,15 +372,15 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { Some(e) => match tz { Some(tz) => write!( f, - "to_timestamp_micros('{}')", + "arrow_cast('{}', 'Timestamp(Microsecond, Some(\"UTC\"))')", NaiveDateTime::from_timestamp_micros(*e) .ok_or(Error::default())? .and_utc() - .format("%Y-%m-%dT%H:%M:%S%.6f%:z") + .format("%Y-%m-%dT%H:%M:%S%.6f") )?, None => write!( f, - "to_timestamp_micros('{}')", + "arrow_cast('{}', 'Timestamp(Microsecond, None)')", NaiveDateTime::from_timestamp_micros(*e) .ok_or(Error::default())? .format("%Y-%m-%dT%H:%M:%S%.6f") @@ -414,13 +414,10 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { #[cfg(test)] mod test { - use arrow_cast::cast; use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; use datafusion_common::{Column, ScalarValue, ToDFSchema}; - use datafusion_expr::{ - cardinality, col, lit, substring, to_timestamp_micros, Cast, Expr, ExprSchemable, - }; + use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable}; use datafusion_functions::encoding::expr_fn::decode; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; @@ -663,26 +660,26 @@ mod test { "cardinality(_list[value:10:1])".to_string() ), ParseTest { - expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond( - Some(1262304000000000), - None, - ))), - expected: "_timestamp_ntz > to_timestamp_micros('2010-01-01T00:00:00.000000')" - .to_string(), - override_expected_expr: Some(col("_timestamp_ntz").gt(to_timestamp_micros(vec![ - lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))), - ]))), + expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))), + expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(), + override_expected_expr: Some(col("_timestamp_ntz").gt( + datafusion_expr::Expr::Cast( Cast { + expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), + data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None) + } + ))), }, ParseTest { expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond( Some(1262304000000000), - Some("UTC".into()), + Some("UTC".into()) ))), - expected: "_timestamp > to_timestamp_micros('2010-01-01T00:00:00.000000+00:00')" - .to_string(), - override_expected_expr: Some(col("_timestamp").gt(to_timestamp_micros(vec![lit( - ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000+00:00".into())), - )]))), + expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, Some(\"UTC\"))')".to_string(), + override_expected_expr: Some(col("_timestamp").gt( + datafusion_expr::Expr::Cast( Cast { + expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))), + data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into())) + }))), }, ]; diff --git a/python/tests/test_merge.py b/python/tests/test_merge.py index fb828bd5f2..9628cad5f3 100644 --- a/python/tests/test_merge.py +++ b/python/tests/test_merge.py @@ -807,8 +807,14 @@ def test_merge_date_partitioned_2344(tmp_path: pathlib.Path): @pytest.mark.parametrize( "timezone,predicate", [ - (None, "to_timestamp_micros('2022-02-01T00:00:00.000000') = datetime"), - ("UTC", "to_timestamp_micros('2022-02-01T00:00:00.000000+00:00') = datetime"), + ( + None, + "arrow_cast('2022-02-01T00:00:00.000000', 'Timestamp(Microsecond, None)') = datetime", + ), + ( + "UTC", + "arrow_cast('2022-02-01T00:00:00.000000', 'Timestamp(Microsecond, Some(\"UTC\"))') = datetime", + ), ], ) def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, predicate):