From dac246763cdf84e02e3ce2bb768efcd40532a3a4 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Mon, 10 Jun 2024 20:36:54 -0700 Subject: [PATCH 1/2] feat: Timestamp subtraction --- datafusion/common/src/scalar.rs | 3 + .../core/src/physical_plan/hash_join.rs | 42 ++++++++++++-- .../core/src/physical_plan/hash_utils.rs | 35 ++++++++++- .../src/expressions/binary_distinct.rs | 58 +++++++++++++++++++ 4 files changed, 132 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 44f1d1694146d..b1759239c5885 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -904,6 +904,9 @@ impl ScalarValue { DataType::Interval(IntervalUnit::YearMonth) => { build_array_primitive!(IntervalYearMonthArray, IntervalYearMonth) } + DataType::Interval(IntervalUnit::MonthDayNano) => { + build_array_primitive!(IntervalMonthDayNanoArray, IntervalMonthDayNano) + } DataType::List(fields) if fields.data_type() == &DataType::Int8 => { build_array_list_primitive!(Int8Type, Int8, i8) } diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs index 4fa92c3cb98af..f84ed7fafe503 100644 --- a/datafusion/core/src/physical_plan/hash_join.rs +++ b/datafusion/core/src/physical_plan/hash_join.rs @@ -22,12 +22,14 @@ use ahash::RandomState; use arrow::{ array::{ - ArrayData, ArrayRef, BooleanArray, LargeStringArray, PrimitiveArray, - TimestampMicrosecondArray, TimestampMillisecondArray, TimestampSecondArray, - UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, UInt64Builder, + ArrayData, ArrayRef, BooleanArray, IntervalDayTimeArray, + IntervalMonthDayNanoArray, IntervalYearMonthArray, LargeStringArray, + PrimitiveArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampSecondArray, UInt32BufferBuilder, UInt32Builder, UInt64BufferBuilder, + UInt64Builder, }, compute, - datatypes::{UInt32Type, UInt64Type}, + datatypes::{IntervalUnit, UInt32Type, UInt64Type}, }; use smallvec::{smallvec, SmallVec}; use std::sync::Arc; @@ -925,6 +927,38 @@ fn equal_rows( ) } }, + DataType::Interval(interval_unit) => match interval_unit { + IntervalUnit::YearMonth => { + equal_rows_elem!( + IntervalYearMonthArray, + l, + r, + left, + right, + null_equals_null + ) + } + IntervalUnit::DayTime => { + equal_rows_elem!( + IntervalDayTimeArray, + l, + r, + left, + right, + null_equals_null + ) + } + IntervalUnit::MonthDayNano => { + equal_rows_elem!( + IntervalMonthDayNanoArray, + l, + r, + left, + right, + null_equals_null + ) + } + }, DataType::Utf8 => { equal_rows_elem!(StringArray, l, r, left, right, null_equals_null) } diff --git a/datafusion/core/src/physical_plan/hash_utils.rs b/datafusion/core/src/physical_plan/hash_utils.rs index 9562e900298a7..86bd0ca6a7c23 100644 --- a/datafusion/core/src/physical_plan/hash_utils.rs +++ b/datafusion/core/src/physical_plan/hash_utils.rs @@ -22,13 +22,14 @@ use ahash::{CallHasher, RandomState}; use arrow::array::{ Array, ArrayRef, BooleanArray, Date32Array, Date64Array, DecimalArray, DictionaryArray, Float32Array, Float64Array, GenericListArray, Int16Array, - Int32Array, Int64Array, Int8Array, LargeStringArray, OffsetSizeTrait, StringArray, + Int32Array, Int64Array, Int8Array, IntervalDayTimeArray, IntervalMonthDayNanoArray, + IntervalYearMonthArray, LargeStringArray, OffsetSizeTrait, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow::datatypes::{ ArrowDictionaryKeyType, ArrowNativeType, DataType, Int16Type, Int32Type, Int64Type, - Int8Type, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + Int8Type, IntervalUnit, TimeUnit, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use std::sync::Arc; @@ -469,6 +470,36 @@ pub fn create_hashes<'a>( multi_col ); } + DataType::Interval(IntervalUnit::YearMonth) => { + hash_array_primitive!( + IntervalYearMonthArray, + col, + i32, + hashes_buffer, + random_state, + multi_col + ); + } + DataType::Interval(IntervalUnit::DayTime) => { + hash_array_primitive!( + IntervalDayTimeArray, + col, + i64, + hashes_buffer, + random_state, + multi_col + ); + } + DataType::Interval(IntervalUnit::MonthDayNano) => { + hash_array_primitive!( + IntervalMonthDayNanoArray, + col, + i128, + hashes_buffer, + random_state, + multi_col + ); + } DataType::Date32 => { hash_array_primitive!( Date32Array, diff --git a/datafusion/physical-expr/src/expressions/binary_distinct.rs b/datafusion/physical-expr/src/expressions/binary_distinct.rs index a6c46a44bc39d..41db971b39e56 100644 --- a/datafusion/physical-expr/src/expressions/binary_distinct.rs +++ b/datafusion/physical-expr/src/expressions/binary_distinct.rs @@ -48,6 +48,7 @@ pub fn distinct_types_allowed( Operator::Minus => matches!( (left_type, right_type), (Timestamp(Nanosecond, _), Interval(_)) + | (Timestamp(Nanosecond, _), Timestamp(Nanosecond, _)) ), Operator::Multiply => matches!( (left_type, right_type), @@ -100,6 +101,10 @@ pub fn coerce_types_distinct( (Date64, Interval(iunit)) | (Date32, Interval(iunit)) => { Some((Timestamp(Nanosecond, None), Interval(iunit.clone()))) } + (Timestamp(_, tz), Timestamp(_, tz2)) => Some(( + Timestamp(Nanosecond, tz.clone()), + Timestamp(Nanosecond, tz2.clone()), + )), _ => None, }, Operator::Multiply => match (lhs_type, rhs_type) { @@ -166,6 +171,10 @@ pub fn evaluate_distinct_with_resolved_args( (Timestamp(Nanosecond, Some(tz)), Interval(_)) if tz == "UTC" => { Some(timestamp_add_interval(left, right, true)) } + (Timestamp(Nanosecond, None), Timestamp(Nanosecond, None)) => { + // TODO: Implement postgres behavior with time zones + Some(timestamp_subtract_timestamp(left, right)) + } _ => None, }, Operator::Multiply => match (left_data_type, right_data_type) { @@ -334,6 +343,27 @@ fn timestamp_add_interval( } } +fn timestamp_subtract_timestamp( + left: Arc, + right: Arc, +) -> Result { + let left = left + .as_any() + .downcast_ref::() + .unwrap(); + let right = right + .as_any() + .downcast_ref::() + .unwrap(); + + let result = left + .iter() + .zip(right.iter()) + .map(|(t_l, t_r)| scalar_timestamp_subtract_timestamp(t_l, t_r)) + .collect::>()?; + Ok(Arc::new(result)) +} + fn scalar_timestamp_add_interval_year_month( timestamp: Option, interval: Option, @@ -445,6 +475,34 @@ fn scalar_timestamp_add_interval_month_day_nano( Ok(Some(result.timestamp_nanos())) } +fn scalar_timestamp_subtract_timestamp( + timestamp_left: Option, + timestamp_right: Option, +) -> Result> { + if timestamp_left.is_none() || timestamp_right.is_none() { + return Ok(None); + } + + let datetime_left: NaiveDateTime = timestamp_ns_to_datetime(timestamp_left.unwrap()); + let datetime_right: NaiveDateTime = + timestamp_ns_to_datetime(timestamp_right.unwrap()); + let duration = datetime_left.signed_duration_since(datetime_right); + // TODO: What is Postgres behavior? E.g. if these timestamp values are i64::MAX and i64::MIN, + // we needlessly have a range error. + let nanos: i64 = duration.num_nanoseconds().ok_or_else(|| { + DataFusionError::Execution("Interval value is out of range".to_string()) + })?; + + let days = nanos / 86_400_000_000_000; + let nanos_rem = nanos % 86_400_000_000_000; + Ok(Some( + (((days as i128) & 0xFFFF_FFFF) << 64) + | ((nanos_rem as i128) & 0xFFFF_FFFF_FFFF_FFFF), + )) + + // TODO: How can day, above, in scalar_timestamp_add_interval_month_day_nano, be negative? +} + fn change_ym(t: NaiveDateTime, y: i32, m: u32) -> Result { // TODO: legacy code, check validity debug_assert!((1..=12).contains(&m)); From e61a28ad884aa7674150a410576437dd313808f6 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Tue, 11 Jun 2024 23:44:26 -0700 Subject: [PATCH 2/2] feat: Epoch extraction from intervals --- datafusion/cube_ext/src/temporal.rs | 145 ++++++++++++++---- .../physical-expr/src/datetime_expressions.rs | 121 +++++++++++++-- 2 files changed, 219 insertions(+), 47 deletions(-) diff --git a/datafusion/cube_ext/src/temporal.rs b/datafusion/cube_ext/src/temporal.rs index d8a80d902618a..8ca093b4da536 100644 --- a/datafusion/cube_ext/src/temporal.rs +++ b/datafusion/cube_ext/src/temporal.rs @@ -17,7 +17,12 @@ use arrow::array::{Array, Float64Array, Int32Array, Int32Builder, PrimitiveArray}; use arrow::compute::kernels::arity::unary; -use arrow::datatypes::{ArrowNumericType, ArrowTemporalType, DataType, TimeUnit}; +use arrow::datatypes::{ + ArrowNumericType, ArrowPrimitiveType, ArrowTemporalType, DataType, Date32Type, + Date64Type, Float64Type, IntervalDayTimeType, IntervalMonthDayNanoType, + IntervalYearMonthType, TimestampMicrosecondType, TimestampMillisecondType, + TimestampNanosecondType, TimestampSecondType, +}; use arrow::error::{ArrowError, Result}; use chrono::format::strftime::StrftimeItems; @@ -145,39 +150,117 @@ where Ok(b.finish()) } +fn postgres_months_epoch(n: i32) -> f64 { + let years = n / 12; + let remainder = n % 12; + // Note that this arithmetic produces exact integer calculations with no floating point error. + let seconds_in_a_day = 86400_f64; + (years as f64) * (seconds_in_a_day * 365.25) + + (remainder as f64) * (seconds_in_a_day * 30.0) +} + +pub trait Epochable: ArrowPrimitiveType + Sized { + fn get_epoch(array: &PrimitiveArray) -> PrimitiveArray; +} + +impl Epochable for TimestampSecondType { + fn get_epoch( + array: &PrimitiveArray, + ) -> PrimitiveArray { + unary(array, |n| n as f64) + } +} + +impl Epochable for TimestampMillisecondType { + fn get_epoch( + array: &PrimitiveArray, + ) -> PrimitiveArray { + unary(array, |n| n as f64 / 1_000.0) + } +} + +impl Epochable for TimestampMicrosecondType { + fn get_epoch( + array: &PrimitiveArray, + ) -> PrimitiveArray { + unary(array, |n| n as f64 / 1_000_000.0) + } +} + +impl Epochable for TimestampNanosecondType { + fn get_epoch( + array: &PrimitiveArray, + ) -> PrimitiveArray { + unary(array, |n| n as f64 / 1_000_000_000.0) + } +} + +impl Epochable for Date32Type { + fn get_epoch(array: &PrimitiveArray) -> PrimitiveArray { + unary(array, |n| { + let seconds_in_a_day = 86400.0; + n as f64 * seconds_in_a_day + }) + } +} + +impl Epochable for Date64Type { + fn get_epoch(array: &PrimitiveArray) -> PrimitiveArray { + unary(array, |n| n as f64 / 1_000.0) + } +} + +impl Epochable for IntervalYearMonthType { + fn get_epoch( + array: &PrimitiveArray, + ) -> PrimitiveArray { + unary(array, postgres_months_epoch) + } +} + +impl Epochable for IntervalDayTimeType { + fn get_epoch( + array: &PrimitiveArray, + ) -> PrimitiveArray { + unary(array, |n| { + // Implemented based on scalar_timestamp_add_interval_day_time + let sign = n.signum(); + let n = n.abs(); + // i64::MIN would work okay in release mode after the bitmask + // in the days variable (but TODO: what is Postgres' exact behavior?) + + let days: i64 = (n >> 32) & 0xFFFF_FFFF; + let millis: i64 = n & 0xFFFF_FFFF; + + let seconds_in_a_day = 86400.0; + sign as f64 * ((days as f64) * seconds_in_a_day + (millis as f64) / 1_000.0) + }) + } +} + +impl Epochable for IntervalMonthDayNanoType { + fn get_epoch( + array: &PrimitiveArray, + ) -> PrimitiveArray { + unary(array, |n| { + let seconds_in_a_day = 86400_f64; + let n: i128 = n; + let month = (n >> 96) & 0xFFFF_FFFF; + let day = (n >> 64) & 0xFFFF_FFFF; + let nano = n & 0xFFFF_FFFF_FFFF_FFFF; + let month_epoch: f64 = postgres_months_epoch(month as i32); + month_epoch + + (day as f64) * seconds_in_a_day + + (nano as f64) / 1_000_000_000.0 + }) + } +} + pub fn epoch(array: &PrimitiveArray) -> Result where - T: ArrowTemporalType + ArrowNumericType, - i64: From, + T: Epochable, { - let b = match array.data_type() { - DataType::Timestamp(tu, _) => { - let scale = match tu { - TimeUnit::Second => 1, - TimeUnit::Millisecond => 1_000, - TimeUnit::Microsecond => 1_000_000, - TimeUnit::Nanosecond => 1_000_000_000, - } as f64; - unary(array, |n| { - let n: i64 = n.into(); - n as f64 / scale - }) - } - DataType::Date32 => { - let seconds_in_a_day = 86400_f64; - unary(array, |n| { - let n: i64 = n.into(); - n as f64 * seconds_in_a_day - }) - } - DataType::Date64 => unary(array, |n| { - let n: i64 = n.into(); - n as f64 / 1_000_f64 - }), - _ => { - return_compute_error_with!("Can not convert {:?} to epoch", array.data_type()) - } - }; + let b = Epochable::get_epoch(array); Ok(b) } diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 1e8b677bdac6c..3857aff8eac45 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -17,7 +17,9 @@ //! DateTime expressions -use arrow::array::{Int64Array, IntervalDayTimeArray, IntervalYearMonthArray}; +use arrow::array::{ + Int64Array, IntervalDayTimeArray, IntervalMonthDayNanoArray, IntervalYearMonthArray, +}; use arrow::compute::cast; use arrow::{ array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait}, @@ -33,7 +35,7 @@ use arrow::{ TimestampNanosecondArray, TimestampSecondArray, }, compute::kernels::temporal, - datatypes::TimeUnit, + datatypes::{IntervalUnit, TimeUnit}, temporal_conversions::timestamp_ns_to_datetime, }; use chrono::prelude::*; @@ -482,7 +484,7 @@ pub fn date_trunc(args: &[ColumnarValue]) -> Result { } macro_rules! extract_date_part { - ($ARRAY: expr, $FN:expr, $RT: expr) => { + ($ARRAY: expr, $PART_STRING: expr, $FN: expr, $RT: expr) => { match $ARRAY.data_type() { DataType::Date32 => { let array = $ARRAY.as_any().downcast_ref::().unwrap(); @@ -523,8 +525,80 @@ macro_rules! extract_date_part { } }, datatype => Err(DataFusionError::Internal(format!( - "Extract does not support datatype {:?}", - datatype + "Extract with date part '{}' does not support datatype {:?}", + $PART_STRING, datatype + ))), + } + }; +} + +macro_rules! extract_date_part_from_date_or_interval { + ($ARRAY: expr, $PART_STRING: expr, $FN: expr, $RT: expr) => { + match $ARRAY.data_type() { + DataType::Date32 => { + let array = $ARRAY.as_any().downcast_ref::().unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + DataType::Date64 => { + let array = $ARRAY.as_any().downcast_ref::().unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + DataType::Timestamp(time_unit, None) => match time_unit { + TimeUnit::Second => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + TimeUnit::Millisecond => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + TimeUnit::Microsecond => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + TimeUnit::Nanosecond => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + }, + DataType::Interval(interval_unit) => match interval_unit { + IntervalUnit::YearMonth => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + IntervalUnit::DayTime => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + IntervalUnit::MonthDayNano => { + let array = $ARRAY + .as_any() + .downcast_ref::() + .unwrap(); + Ok($FN(array).map(|v| cast(&(Arc::new(v) as ArrayRef), &$RT))?) + } + }, + datatype => Err(DataFusionError::Internal(format!( + "Extract with date part '{}' does not support datatype {:?}", + $PART_STRING, datatype ))), } }; @@ -555,18 +629,33 @@ pub fn date_part(args: &[ColumnarValue]) -> Result { }; let arr = match date_part.to_lowercase().as_str() { - "doy" => extract_date_part!(array, cube_ext::temporal::doy, DataType::Int32), - "dow" => extract_date_part!(array, cube_ext::temporal::dow, DataType::Int32), - "year" => extract_date_part!(array, temporal::year, DataType::Int32), - "quarter" => extract_date_part!(array, temporal::quarter, DataType::Int32), - "month" => extract_date_part!(array, temporal::month, DataType::Int32), - "week" => extract_date_part!(array, temporal::week, DataType::Int32), - "day" => extract_date_part!(array, temporal::day, DataType::Int32), - "hour" => extract_date_part!(array, temporal::hour, DataType::Int32), - "minute" => extract_date_part!(array, temporal::minute, DataType::Int32), - "second" => extract_date_part!(array, temporal::second, DataType::Int32), + "doy" => { + extract_date_part!(array, date_part, cube_ext::temporal::doy, DataType::Int32) + } + "dow" => { + extract_date_part!(array, date_part, cube_ext::temporal::dow, DataType::Int32) + } + "year" => extract_date_part!(array, date_part, temporal::year, DataType::Int32), + "quarter" => { + extract_date_part!(array, date_part, temporal::quarter, DataType::Int32) + } + "month" => extract_date_part!(array, date_part, temporal::month, DataType::Int32), + "week" => extract_date_part!(array, date_part, temporal::week, DataType::Int32), + "day" => extract_date_part!(array, date_part, temporal::day, DataType::Int32), + "hour" => extract_date_part!(array, date_part, temporal::hour, DataType::Int32), + "minute" => { + extract_date_part!(array, date_part, temporal::minute, DataType::Int32) + } + "second" => { + extract_date_part!(array, date_part, temporal::second, DataType::Int32) + } "epoch" => { - extract_date_part!(array, cube_ext::temporal::epoch, DataType::Float64) + extract_date_part_from_date_or_interval!( + array, + date_part, + cube_ext::temporal::epoch, + DataType::Float64 + ) } _ => Err(DataFusionError::Execution(format!( "Date part '{}' not supported",