diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index 4e65a9fdd539..30ba227c3ae3 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -21,6 +21,8 @@ mod kernels_arrow; use std::{any::Any, sync::Arc}; +use chrono::NaiveDateTime; + use arrow::array::*; use arrow::compute::kernels::arithmetic::{ add_dyn, add_scalar_dyn as add_dyn_scalar, divide_dyn_opt, @@ -78,6 +80,12 @@ use crate::intervals::{apply_operator, Interval}; use crate::physical_expr::down_cast_any_ref; use crate::{analysis_expect, AnalysisContext, ExprBoundaries, PhysicalExpr}; use datafusion_common::cast::as_boolean_array; +use datafusion_common::cast::{ + as_interval_dt_array, as_interval_mdn_array, as_interval_ym_array, + as_timestamp_microsecond_array, as_timestamp_millisecond_array, + as_timestamp_nanosecond_array, as_timestamp_second_array, +}; +use datafusion_common::scalar::*; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use datafusion_expr::type_coercion::binary::binary_operator_data_type; @@ -1229,6 +1237,458 @@ pub fn binary( Ok(Arc::new(BinaryExpr::new(lhs, op, rhs))) } +// Macros related with timestamp & interval operations +macro_rules! ts_sub_op { + ($lhs:ident, $rhs:ident, $lhs_tz:ident, $rhs_tz:ident, $coef:expr, $caster:expr, $op:expr, $ts_unit:expr, $mode:expr, $type_out:ty) => {{ + let prim_array_lhs = $caster(&$lhs)?; + let prim_array_rhs = $caster(&$rhs)?; + let ret: PrimitiveArray<$type_out> = + arrow::compute::try_binary(prim_array_lhs, prim_array_rhs, |ts1, ts2| { + let (parsed_lhs_tz, parsed_rhs_tz) = + (parse_timezones($lhs_tz)?, parse_timezones($rhs_tz)?); + let (naive_lhs, naive_rhs) = calculate_naives::<$mode>( + ts1.mul_wrapping($coef), + parsed_lhs_tz, + ts2.mul_wrapping($coef), + parsed_rhs_tz, + )?; + Ok($op($ts_unit(&naive_lhs), $ts_unit(&naive_rhs))) + })?; + Arc::new(ret) as ArrayRef + }}; +} +macro_rules! interval_op { + ($lhs:ident, $rhs:ident, $caster:expr, $op:expr, $sign:ident, $type_in:ty) => {{ + let prim_array_lhs = $caster(&$lhs)?; + let prim_array_rhs = $caster(&$rhs)?; + let ret = Arc::new(arrow::compute::binary::<$type_in, $type_in, _, $type_in>( + prim_array_lhs, + prim_array_rhs, + |interval1, interval2| $op(interval1, interval2, $sign), + )?) as ArrayRef; + ret + }}; +} +macro_rules! interval_cross_op { + ($lhs:ident, $rhs:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $commute:ident, $type_in1:ty, $type_in2:ty) => {{ + let prim_array_lhs = $caster1(&$lhs)?; + let prim_array_rhs = $caster2(&$rhs)?; + let ret = Arc::new(arrow::compute::binary::< + $type_in1, + $type_in2, + _, + IntervalMonthDayNanoType, + >( + prim_array_lhs, + prim_array_rhs, + |interval1, interval2| $op(interval1, interval2, $sign, $commute), + )?) as ArrayRef; + ret + }}; +} +macro_rules! ts_interval_op { + ($lhs:ident, $rhs:ident, $tz:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $type_in1:ty, $type_in2:ty) => {{ + let prim_array_lhs = $caster1(&$lhs)?; + let prim_array_rhs = $caster2(&$rhs)?; + let ret: PrimitiveArray<$type_in1> = arrow::compute::try_binary( + prim_array_lhs, + prim_array_rhs, + |ts, interval| Ok($op(ts, interval as i128, $sign)?), + )?; + Arc::new(ret.with_timezone_opt($tz.clone())) as ArrayRef + }}; +} + +/// Performs a timestamp subtraction operation on two arrays and returns the resulting array. +pub fn ts_array_op(array_lhs: &ArrayRef, array_rhs: &ArrayRef) -> Result { + match (array_lhs.data_type(), array_rhs.data_type()) { + ( + DataType::Timestamp(TimeUnit::Second, opt_tz_lhs), + DataType::Timestamp(TimeUnit::Second, opt_tz_rhs), + ) => Ok(ts_sub_op!( + array_lhs, + array_rhs, + opt_tz_lhs, + opt_tz_rhs, + 1000i64, + as_timestamp_second_array, + seconds_sub, + NaiveDateTime::timestamp, + MILLISECOND_MODE, + IntervalDayTimeType + )), + ( + DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs), + DataType::Timestamp(TimeUnit::Millisecond, opt_tz_rhs), + ) => Ok(ts_sub_op!( + array_lhs, + array_rhs, + opt_tz_lhs, + opt_tz_rhs, + 1i64, + as_timestamp_millisecond_array, + milliseconds_sub, + NaiveDateTime::timestamp_millis, + MILLISECOND_MODE, + IntervalDayTimeType + )), + ( + DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs), + DataType::Timestamp(TimeUnit::Microsecond, opt_tz_rhs), + ) => Ok(ts_sub_op!( + array_lhs, + array_rhs, + opt_tz_lhs, + opt_tz_rhs, + 1000i64, + as_timestamp_microsecond_array, + microseconds_sub, + NaiveDateTime::timestamp_micros, + NANOSECOND_MODE, + IntervalMonthDayNanoType + )), + ( + DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs), + DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_rhs), + ) => Ok(ts_sub_op!( + array_lhs, + array_rhs, + opt_tz_lhs, + opt_tz_rhs, + 1i64, + as_timestamp_nanosecond_array, + nanoseconds_sub, + NaiveDateTime::timestamp_nanos, + NANOSECOND_MODE, + IntervalMonthDayNanoType + )), + (_, _) => Err(DataFusionError::Execution(format!( + "Invalid array types for Timestamp subtraction: {} - {}", + array_lhs.data_type(), + array_rhs.data_type() + ))), + } +} +/// Performs an interval operation on two arrays and returns the resulting array. +/// The operation sign determines whether to perform addition or subtraction. +/// The data type and unit of the two input arrays must match the supported combinations. +pub fn interval_array_op( + array_lhs: &ArrayRef, + array_rhs: &ArrayRef, + sign: i32, +) -> Result { + match (array_lhs.data_type(), array_rhs.data_type()) { + ( + DataType::Interval(IntervalUnit::YearMonth), + DataType::Interval(IntervalUnit::YearMonth), + ) => Ok(interval_op!( + array_lhs, + array_rhs, + as_interval_ym_array, + op_ym, + sign, + IntervalYearMonthType + )), + ( + DataType::Interval(IntervalUnit::YearMonth), + DataType::Interval(IntervalUnit::DayTime), + ) => Ok(interval_cross_op!( + array_lhs, + array_rhs, + as_interval_ym_array, + as_interval_dt_array, + op_ym_dt, + sign, + false, + IntervalYearMonthType, + IntervalDayTimeType + )), + ( + DataType::Interval(IntervalUnit::YearMonth), + DataType::Interval(IntervalUnit::MonthDayNano), + ) => Ok(interval_cross_op!( + array_lhs, + array_rhs, + as_interval_ym_array, + as_interval_mdn_array, + op_ym_mdn, + sign, + false, + IntervalYearMonthType, + IntervalMonthDayNanoType + )), + ( + DataType::Interval(IntervalUnit::DayTime), + DataType::Interval(IntervalUnit::YearMonth), + ) => Ok(interval_cross_op!( + array_rhs, + array_lhs, + as_interval_ym_array, + as_interval_dt_array, + op_ym_dt, + sign, + true, + IntervalYearMonthType, + IntervalDayTimeType + )), + ( + DataType::Interval(IntervalUnit::DayTime), + DataType::Interval(IntervalUnit::DayTime), + ) => Ok(interval_op!( + array_lhs, + array_rhs, + as_interval_dt_array, + op_dt, + sign, + IntervalDayTimeType + )), + ( + DataType::Interval(IntervalUnit::DayTime), + DataType::Interval(IntervalUnit::MonthDayNano), + ) => Ok(interval_cross_op!( + array_lhs, + array_rhs, + as_interval_dt_array, + as_interval_mdn_array, + op_dt_mdn, + sign, + false, + IntervalDayTimeType, + IntervalMonthDayNanoType + )), + ( + DataType::Interval(IntervalUnit::MonthDayNano), + DataType::Interval(IntervalUnit::YearMonth), + ) => Ok(interval_cross_op!( + array_rhs, + array_lhs, + as_interval_ym_array, + as_interval_mdn_array, + op_ym_mdn, + sign, + true, + IntervalYearMonthType, + IntervalMonthDayNanoType + )), + ( + DataType::Interval(IntervalUnit::MonthDayNano), + DataType::Interval(IntervalUnit::DayTime), + ) => Ok(interval_cross_op!( + array_rhs, + array_lhs, + as_interval_dt_array, + as_interval_mdn_array, + op_dt_mdn, + sign, + true, + IntervalDayTimeType, + IntervalMonthDayNanoType + )), + ( + DataType::Interval(IntervalUnit::MonthDayNano), + DataType::Interval(IntervalUnit::MonthDayNano), + ) => Ok(interval_op!( + array_lhs, + array_rhs, + as_interval_mdn_array, + op_mdn, + sign, + IntervalMonthDayNanoType + )), + (_, _) => Err(DataFusionError::Execution(format!( + "Invalid array types for Interval operation: {} {} {}", + array_lhs.data_type(), + sign, + array_rhs.data_type() + ))), + } +} +/// Performs a timestamp/interval operation on two arrays and returns the resulting array. +/// The operation sign determines whether to perform addition or subtraction. +/// The data type and unit of the two input arrays must match the supported combinations. +pub fn ts_interval_array_op( + array_lhs: &ArrayRef, + sign: i32, + array_rhs: &ArrayRef, +) -> Result { + match (array_lhs.data_type(), array_rhs.data_type()) { + ( + DataType::Timestamp(TimeUnit::Second, tz), + DataType::Interval(IntervalUnit::YearMonth), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_second_array, + as_interval_ym_array, + seconds_add_array::, + sign, + TimestampSecondType, + IntervalYearMonthType + )), + ( + DataType::Timestamp(TimeUnit::Second, tz), + DataType::Interval(IntervalUnit::DayTime), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_second_array, + as_interval_dt_array, + seconds_add_array::, + sign, + TimestampSecondType, + IntervalDayTimeType + )), + ( + DataType::Timestamp(TimeUnit::Second, tz), + DataType::Interval(IntervalUnit::MonthDayNano), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_second_array, + as_interval_mdn_array, + seconds_add_array::, + sign, + TimestampSecondType, + IntervalMonthDayNanoType + )), + ( + DataType::Timestamp(TimeUnit::Millisecond, tz), + DataType::Interval(IntervalUnit::YearMonth), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_millisecond_array, + as_interval_ym_array, + milliseconds_add_array::, + sign, + TimestampMillisecondType, + IntervalYearMonthType + )), + ( + DataType::Timestamp(TimeUnit::Millisecond, tz), + DataType::Interval(IntervalUnit::DayTime), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_millisecond_array, + as_interval_dt_array, + milliseconds_add_array::, + sign, + TimestampMillisecondType, + IntervalDayTimeType + )), + ( + DataType::Timestamp(TimeUnit::Millisecond, tz), + DataType::Interval(IntervalUnit::MonthDayNano), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_millisecond_array, + as_interval_mdn_array, + milliseconds_add_array::, + sign, + TimestampMillisecondType, + IntervalMonthDayNanoType + )), + ( + DataType::Timestamp(TimeUnit::Microsecond, tz), + DataType::Interval(IntervalUnit::YearMonth), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_microsecond_array, + as_interval_ym_array, + microseconds_add_array::, + sign, + TimestampMicrosecondType, + IntervalYearMonthType + )), + ( + DataType::Timestamp(TimeUnit::Microsecond, tz), + DataType::Interval(IntervalUnit::DayTime), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_microsecond_array, + as_interval_dt_array, + microseconds_add_array::, + sign, + TimestampMicrosecondType, + IntervalDayTimeType + )), + ( + DataType::Timestamp(TimeUnit::Microsecond, tz), + DataType::Interval(IntervalUnit::MonthDayNano), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_microsecond_array, + as_interval_mdn_array, + microseconds_add_array::, + sign, + TimestampMicrosecondType, + IntervalMonthDayNanoType + )), + ( + DataType::Timestamp(TimeUnit::Nanosecond, tz), + DataType::Interval(IntervalUnit::YearMonth), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_nanosecond_array, + as_interval_ym_array, + nanoseconds_add_array::, + sign, + TimestampNanosecondType, + IntervalYearMonthType + )), + ( + DataType::Timestamp(TimeUnit::Nanosecond, tz), + DataType::Interval(IntervalUnit::DayTime), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_nanosecond_array, + as_interval_dt_array, + nanoseconds_add_array::, + sign, + TimestampNanosecondType, + IntervalDayTimeType + )), + ( + DataType::Timestamp(TimeUnit::Nanosecond, tz), + DataType::Interval(IntervalUnit::MonthDayNano), + ) => Ok(ts_interval_op!( + array_lhs, + array_rhs, + tz, + as_timestamp_nanosecond_array, + as_interval_mdn_array, + nanoseconds_add_array::, + sign, + TimestampNanosecondType, + IntervalMonthDayNanoType + )), + (_, _) => Err(DataFusionError::Execution(format!( + "Invalid array types for Timestamp Interval operation: {} {} {}", + array_lhs.data_type(), + sign, + array_rhs.data_type() + ))), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/expressions/datetime.rs b/datafusion/physical-expr/src/expressions/datetime.rs index 518a28268765..9246074841a4 100644 --- a/datafusion/physical-expr/src/expressions/datetime.rs +++ b/datafusion/physical-expr/src/expressions/datetime.rs @@ -17,17 +17,14 @@ use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; -use arrow::array::{Array, ArrayRef, PrimitiveArray}; -use arrow::compute::{binary, unary}; +use arrow::array::{Array, ArrayRef}; +use arrow::compute::unary; use arrow::datatypes::{ - ArrowNativeTypeOp, DataType, Date32Type, Date64Type, IntervalDayTimeType, - IntervalMonthDayNanoType, IntervalYearMonthType, Schema, TimeUnit, - TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, - TimestampSecondType, + DataType, Date32Type, Date64Type, Schema, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, }; use arrow::record_batch::RecordBatch; -use arrow_schema::IntervalUnit; -use chrono::NaiveDateTime; + use datafusion_common::cast::*; use datafusion_common::scalar::*; use datafusion_common::Result; @@ -38,6 +35,8 @@ use std::any::Any; use std::fmt::{Display, Formatter}; use std::sync::Arc; +use super::binary::{interval_array_op, ts_array_op, ts_interval_array_op}; + /// Perform DATE/TIME/TIMESTAMP +/ INTERVAL math #[derive(Debug)] pub struct DateTimeIntervalExpr { @@ -249,61 +248,6 @@ pub fn evaluate_array( Ok(ColumnarValue::Array(ret)) } -macro_rules! ts_sub_op { - ($lhs:ident, $rhs:ident, $lhs_tz:ident, $rhs_tz:ident, $coef:expr, $caster:expr, $op:expr, $ts_unit:expr, $mode:expr, $type_out:ty) => {{ - let prim_array_lhs = $caster(&$lhs)?; - let prim_array_rhs = $caster(&$rhs)?; - let ret: PrimitiveArray<$type_out> = - arrow::compute::try_binary(prim_array_lhs, prim_array_rhs, |ts1, ts2| { - let (parsed_lhs_tz, parsed_rhs_tz) = - (parse_timezones($lhs_tz)?, parse_timezones($rhs_tz)?); - let (naive_lhs, naive_rhs) = calculate_naives::<$mode>( - ts1.mul_wrapping($coef), - parsed_lhs_tz, - ts2.mul_wrapping($coef), - parsed_rhs_tz, - )?; - Ok($op($ts_unit(&naive_lhs), $ts_unit(&naive_rhs))) - })?; - Arc::new(ret) as ArrayRef - }}; -} -macro_rules! interval_op { - ($lhs:ident, $rhs:ident, $caster:expr, $op:expr, $sign:ident, $type_in:ty) => {{ - let prim_array_lhs = $caster(&$lhs)?; - let prim_array_rhs = $caster(&$rhs)?; - let ret = Arc::new(binary::<$type_in, $type_in, _, $type_in>( - prim_array_lhs, - prim_array_rhs, - |interval1, interval2| $op(interval1, interval2, $sign), - )?) as ArrayRef; - ret - }}; -} -macro_rules! interval_cross_op { - ($lhs:ident, $rhs:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $commute:ident, $type_in1:ty, $type_in2:ty) => {{ - let prim_array_lhs = $caster1(&$lhs)?; - let prim_array_rhs = $caster2(&$rhs)?; - let ret = Arc::new(binary::<$type_in1, $type_in2, _, IntervalMonthDayNanoType>( - prim_array_lhs, - prim_array_rhs, - |interval1, interval2| $op(interval1, interval2, $sign, $commute), - )?) as ArrayRef; - ret - }}; -} -macro_rules! ts_interval_op { - ($lhs:ident, $rhs:ident, $tz:ident, $caster1:expr, $caster2:expr, $op:expr, $sign:ident, $type_in1:ty, $type_in2:ty) => {{ - let prim_array_lhs = $caster1(&$lhs)?; - let prim_array_rhs = $caster2(&$rhs)?; - let ret: PrimitiveArray<$type_in1> = arrow::compute::try_binary( - prim_array_lhs, - prim_array_rhs, - |ts, interval| Ok($op(ts, interval as i128, $sign)?), - )?; - Arc::new(ret.with_timezone_opt($tz.clone())) as ArrayRef - }}; -} // This function evaluates temporal array operations, such as timestamp - timestamp, interval + interval, // timestamp + interval, and interval + timestamp. It takes two arrays as input and an integer sign representing // the operation (+1 for addition and -1 for subtraction). It returns a ColumnarValue as output, which can hold @@ -331,7 +275,7 @@ pub fn evaluate_temporal_arrays( ts_interval_array_op(array_rhs, sign, array_lhs)? } (_, _) => Err(DataFusionError::Execution(format!( - "Invalid array types for DateIntervalExpr: {:?} {} {:?}", + "Invalid array types for DateIntervalExpr: {} {} {}", array_lhs.data_type(), sign, array_rhs.data_type() @@ -340,396 +284,6 @@ pub fn evaluate_temporal_arrays( Ok(ColumnarValue::Array(ret)) } -/// Performs a timestamp subtraction operation on two arrays and returns the resulting array. -fn ts_array_op(array_lhs: &ArrayRef, array_rhs: &ArrayRef) -> Result { - match (array_lhs.data_type(), array_rhs.data_type()) { - ( - DataType::Timestamp(TimeUnit::Second, opt_tz_lhs), - DataType::Timestamp(TimeUnit::Second, opt_tz_rhs), - ) => Ok(ts_sub_op!( - array_lhs, - array_rhs, - opt_tz_lhs, - opt_tz_rhs, - 1000i64, - as_timestamp_second_array, - seconds_sub, - NaiveDateTime::timestamp, - MILLISECOND_MODE, - IntervalDayTimeType - )), - ( - DataType::Timestamp(TimeUnit::Millisecond, opt_tz_lhs), - DataType::Timestamp(TimeUnit::Millisecond, opt_tz_rhs), - ) => Ok(ts_sub_op!( - array_lhs, - array_rhs, - opt_tz_lhs, - opt_tz_rhs, - 1i64, - as_timestamp_millisecond_array, - milliseconds_sub, - NaiveDateTime::timestamp_millis, - MILLISECOND_MODE, - IntervalDayTimeType - )), - ( - DataType::Timestamp(TimeUnit::Microsecond, opt_tz_lhs), - DataType::Timestamp(TimeUnit::Microsecond, opt_tz_rhs), - ) => Ok(ts_sub_op!( - array_lhs, - array_rhs, - opt_tz_lhs, - opt_tz_rhs, - 1000i64, - as_timestamp_microsecond_array, - microseconds_sub, - NaiveDateTime::timestamp_micros, - NANOSECOND_MODE, - IntervalMonthDayNanoType - )), - ( - DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_lhs), - DataType::Timestamp(TimeUnit::Nanosecond, opt_tz_rhs), - ) => Ok(ts_sub_op!( - array_lhs, - array_rhs, - opt_tz_lhs, - opt_tz_rhs, - 1i64, - as_timestamp_nanosecond_array, - nanoseconds_sub, - NaiveDateTime::timestamp_nanos, - NANOSECOND_MODE, - IntervalMonthDayNanoType - )), - (_, _) => Err(DataFusionError::Execution(format!( - "Invalid array types for Timestamp subtraction: {:?} - {:?}", - array_lhs.data_type(), - array_rhs.data_type() - ))), - } -} -/// Performs an interval operation on two arrays and returns the resulting array. -/// The operation sign determines whether to perform addition or subtraction. -/// The data type and unit of the two input arrays must match the supported combinations. -fn interval_array_op( - array_lhs: &ArrayRef, - array_rhs: &ArrayRef, - sign: i32, -) -> Result { - match (array_lhs.data_type(), array_rhs.data_type()) { - ( - DataType::Interval(IntervalUnit::YearMonth), - DataType::Interval(IntervalUnit::YearMonth), - ) => Ok(interval_op!( - array_lhs, - array_rhs, - as_interval_ym_array, - op_ym, - sign, - IntervalYearMonthType - )), - ( - DataType::Interval(IntervalUnit::YearMonth), - DataType::Interval(IntervalUnit::DayTime), - ) => Ok(interval_cross_op!( - array_lhs, - array_rhs, - as_interval_ym_array, - as_interval_dt_array, - op_ym_dt, - sign, - false, - IntervalYearMonthType, - IntervalDayTimeType - )), - ( - DataType::Interval(IntervalUnit::YearMonth), - DataType::Interval(IntervalUnit::MonthDayNano), - ) => Ok(interval_cross_op!( - array_lhs, - array_rhs, - as_interval_ym_array, - as_interval_mdn_array, - op_ym_mdn, - sign, - false, - IntervalYearMonthType, - IntervalMonthDayNanoType - )), - ( - DataType::Interval(IntervalUnit::DayTime), - DataType::Interval(IntervalUnit::YearMonth), - ) => Ok(interval_cross_op!( - array_rhs, - array_lhs, - as_interval_ym_array, - as_interval_dt_array, - op_ym_dt, - sign, - true, - IntervalYearMonthType, - IntervalDayTimeType - )), - ( - DataType::Interval(IntervalUnit::DayTime), - DataType::Interval(IntervalUnit::DayTime), - ) => Ok(interval_op!( - array_lhs, - array_rhs, - as_interval_dt_array, - op_dt, - sign, - IntervalDayTimeType - )), - ( - DataType::Interval(IntervalUnit::DayTime), - DataType::Interval(IntervalUnit::MonthDayNano), - ) => Ok(interval_cross_op!( - array_lhs, - array_rhs, - as_interval_dt_array, - as_interval_mdn_array, - op_dt_mdn, - sign, - false, - IntervalDayTimeType, - IntervalMonthDayNanoType - )), - ( - DataType::Interval(IntervalUnit::MonthDayNano), - DataType::Interval(IntervalUnit::YearMonth), - ) => Ok(interval_cross_op!( - array_rhs, - array_lhs, - as_interval_ym_array, - as_interval_mdn_array, - op_ym_mdn, - sign, - true, - IntervalYearMonthType, - IntervalMonthDayNanoType - )), - ( - DataType::Interval(IntervalUnit::MonthDayNano), - DataType::Interval(IntervalUnit::DayTime), - ) => Ok(interval_cross_op!( - array_rhs, - array_lhs, - as_interval_dt_array, - as_interval_mdn_array, - op_dt_mdn, - sign, - true, - IntervalDayTimeType, - IntervalMonthDayNanoType - )), - ( - DataType::Interval(IntervalUnit::MonthDayNano), - DataType::Interval(IntervalUnit::MonthDayNano), - ) => Ok(interval_op!( - array_lhs, - array_rhs, - as_interval_mdn_array, - op_mdn, - sign, - IntervalMonthDayNanoType - )), - (_, _) => Err(DataFusionError::Execution(format!( - "Invalid array types for Interval operation: {:?} {} {:?}", - array_lhs.data_type(), - sign, - array_rhs.data_type() - ))), - } -} -/// Performs a timestamp/interval operation on two arrays and returns the resulting array. -/// The operation sign determines whether to perform addition or subtraction. -/// The data type and unit of the two input arrays must match the supported combinations. -fn ts_interval_array_op( - array_lhs: &ArrayRef, - sign: i32, - array_rhs: &ArrayRef, -) -> Result { - match (array_lhs.data_type(), array_rhs.data_type()) { - ( - DataType::Timestamp(TimeUnit::Second, tz), - DataType::Interval(IntervalUnit::YearMonth), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_second_array, - as_interval_ym_array, - seconds_add_array::, - sign, - TimestampSecondType, - IntervalYearMonthType - )), - ( - DataType::Timestamp(TimeUnit::Second, tz), - DataType::Interval(IntervalUnit::DayTime), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_second_array, - as_interval_dt_array, - seconds_add_array::, - sign, - TimestampSecondType, - IntervalDayTimeType - )), - ( - DataType::Timestamp(TimeUnit::Second, tz), - DataType::Interval(IntervalUnit::MonthDayNano), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_second_array, - as_interval_mdn_array, - seconds_add_array::, - sign, - TimestampSecondType, - IntervalMonthDayNanoType - )), - ( - DataType::Timestamp(TimeUnit::Millisecond, tz), - DataType::Interval(IntervalUnit::YearMonth), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_millisecond_array, - as_interval_ym_array, - milliseconds_add_array::, - sign, - TimestampMillisecondType, - IntervalYearMonthType - )), - ( - DataType::Timestamp(TimeUnit::Millisecond, tz), - DataType::Interval(IntervalUnit::DayTime), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_millisecond_array, - as_interval_dt_array, - milliseconds_add_array::, - sign, - TimestampMillisecondType, - IntervalDayTimeType - )), - ( - DataType::Timestamp(TimeUnit::Millisecond, tz), - DataType::Interval(IntervalUnit::MonthDayNano), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_millisecond_array, - as_interval_mdn_array, - milliseconds_add_array::, - sign, - TimestampMillisecondType, - IntervalMonthDayNanoType - )), - ( - DataType::Timestamp(TimeUnit::Microsecond, tz), - DataType::Interval(IntervalUnit::YearMonth), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_microsecond_array, - as_interval_ym_array, - microseconds_add_array::, - sign, - TimestampMicrosecondType, - IntervalYearMonthType - )), - ( - DataType::Timestamp(TimeUnit::Microsecond, tz), - DataType::Interval(IntervalUnit::DayTime), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_microsecond_array, - as_interval_dt_array, - microseconds_add_array::, - sign, - TimestampMicrosecondType, - IntervalDayTimeType - )), - ( - DataType::Timestamp(TimeUnit::Microsecond, tz), - DataType::Interval(IntervalUnit::MonthDayNano), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_microsecond_array, - as_interval_mdn_array, - microseconds_add_array::, - sign, - TimestampMicrosecondType, - IntervalMonthDayNanoType - )), - ( - DataType::Timestamp(TimeUnit::Nanosecond, tz), - DataType::Interval(IntervalUnit::YearMonth), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_nanosecond_array, - as_interval_ym_array, - nanoseconds_add_array::, - sign, - TimestampNanosecondType, - IntervalYearMonthType - )), - ( - DataType::Timestamp(TimeUnit::Nanosecond, tz), - DataType::Interval(IntervalUnit::DayTime), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_nanosecond_array, - as_interval_dt_array, - nanoseconds_add_array::, - sign, - TimestampNanosecondType, - IntervalDayTimeType - )), - ( - DataType::Timestamp(TimeUnit::Nanosecond, tz), - DataType::Interval(IntervalUnit::MonthDayNano), - ) => Ok(ts_interval_op!( - array_lhs, - array_rhs, - tz, - as_timestamp_nanosecond_array, - as_interval_mdn_array, - nanoseconds_add_array::, - sign, - TimestampNanosecondType, - IntervalMonthDayNanoType - )), - (_, _) => Err(DataFusionError::Execution(format!( - "Invalid array types for Timestamp Interval operation: {:?} {} {:?}", - array_lhs.data_type(), - sign, - array_rhs.data_type() - ))), - } -} - #[cfg(test)] mod tests { use super::*;