Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support MonthDayNano interval int multiply #156

Merged
merged 2 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions datafusion/core/src/sql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2615,21 +2615,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}

match interval_type.to_lowercase().as_str() {
"year" => Ok(align_interval_parts(interval_period * 12_f32, 0.0, 0.0)),
"quarter" => Ok(align_interval_parts(interval_period * 3_f32, 0.0, 0.0)),
"month" => Ok(align_interval_parts(interval_period, 0.0, 0.0)),
"week" | "weeks" => {
"years" | "year" | "y" => {
Ok(align_interval_parts(interval_period * 12_f32, 0.0, 0.0))
}
"quarter" | "qtr" => {
Ok(align_interval_parts(interval_period * 3_f32, 0.0, 0.0))
}
"months" | "month" | "mons" | "mon" => {
Ok(align_interval_parts(interval_period, 0.0, 0.0))
}
"weeks" | "week" | "w" => {
Ok(align_interval_parts(0.0, interval_period * 7_f32, 0.0))
}
"day" | "days" => Ok(align_interval_parts(0.0, interval_period, 0.0)),
"hour" | "hours" => {
"days" | "day" | "d" => {
Ok(align_interval_parts(0.0, interval_period, 0.0))
}
"hours" | "hour" | "h" => {
Ok((0, 0, interval_period * SECONDS_PER_HOUR * MILLIS_PER_SECOND))
}
"minutes" | "minute" => {
"minutes" | "minute" | "mins" | "min" | "m" => {
Ok((0, 0, interval_period * 60_f32 * MILLIS_PER_SECOND))
}
"seconds" | "second" => Ok((0, 0, interval_period * MILLIS_PER_SECOND)),
"milliseconds" | "millisecond" => Ok((0, 0, interval_period)),
"seconds" | "second" | "secs" | "sec" | "s" => {
Ok((0, 0, interval_period * MILLIS_PER_SECOND))
}
"milliseconds" | "millisecond" | "msecs" | "msec" | "ms" => {
Ok((0, 0, interval_period))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Invalid input syntax for type interval: {:?}",
value
Expand Down
22 changes: 22 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,28 @@ async fn test_interval_expressions() -> Result<()> {
"interval '1 year 1 day 1 hour 1 minute 1 second'",
"0 years 12 mons 1 days 1 hours 1 mins 1.000000000 secs"
);
test_expression!(
"interval '23 years 123 months 75 days 25 hours 73 mins 125 secs 1234 ms'",
"0 years 399 mons 75 days 26 hours 15 mins 6.234000000 secs"
);
test_expression!(
"11 * interval '0 years 2 months 75 days 25 hours 73 mins 125 secs 1234 ms'",
"0 years 22 mons 825 days 288 hours 46 mins 8.574000000 secs"
);
// TODO: f32 precision problem in `fn sql_interval_to_literal`
// test_expression!(
// "interval '1 mon 87654321 ms'",
// "0 years 1 mons 0 days 24 hours 20 mins 54.321000000 secs"
// );
test_expression!(
"12345 * interval '2 years 2 months 3 days 13 hours 3 mins 3 secs 7654321 ms'",
"0 years 320970 mons 37035 days 187360 hours 28 mins 47.745000000 secs"
);
// NOTE: only fractional output is incorrect, the calculations under the hood are correct
test_expression!(
"5 * interval '-1 month -1 day -1 millisecond'",
"0 years -5 mons -5 days 0 hours 0 mins 0.-05000000 secs"
);

Ok(())
}
Expand Down
21 changes: 21 additions & 0 deletions datafusion/expr/src/binary_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ pub fn coerce_types(
Operator::Like | Operator::NotLike | Operator::ILike | Operator::NotILike => {
like_coercion(lhs_type, rhs_type)
}
Operator::Plus | Operator::Minus
if is_interval(lhs_type) && is_interval(rhs_type) =>
{
interval_add_coercion(lhs_type, rhs_type)
}
// for math expressions, the final value of the coercion is also the return type
// because coercion favours higher information types
Operator::Plus | Operator::Minus => {
Expand Down Expand Up @@ -769,6 +774,22 @@ fn null_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
_ => None,
}
}
/// Coercion rules for `DataType::Interval <op> DataType::Interval`, where `<op>` is `+` or `-`.
///
/// Returns `None` unless both sides are `DataType::Interval`. When both are intervals:
/// * if either side is `MonthDayNano`, the result is `MonthDayNano`;
/// * a `DayTime` / `YearMonth` mix (in either order) is widened to `MonthDayNano`;
/// * two identical units keep that unit;
/// * any other combination yields `None`.
fn interval_add_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
    use arrow::datatypes::IntervalUnit::*;

    // Guard: anything that is not an interval pair is not handled by this rule.
    let (lhs_unit, rhs_unit) = match (lhs_type, rhs_type) {
        (DataType::Interval(lhs_unit), DataType::Interval(rhs_unit)) => (lhs_unit, rhs_unit),
        _ => return None,
    };

    let widened = match (lhs_unit, rhs_unit) {
        (MonthDayNano, _) | (_, MonthDayNano) => MonthDayNano,
        (DayTime, YearMonth) | (YearMonth, DayTime) => MonthDayNano,
        (same, other) if same == other => same.clone(),
        _ => return None,
    };

    Some(DataType::Interval(widened))
}

#[cfg(test)]
mod tests {
Expand Down
83 changes: 68 additions & 15 deletions datafusion/physical-expr/src/expressions/binary_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,18 @@ fn interval_multiply_int(
.collect::<Result<IntervalDayTimeArray>>()?;
Ok(Arc::new(result))
}
DataType::Interval(unit) => Err(DataFusionError::Execution(format!(
"multiplication of interval type {:?} is not supported",
unit
))),
DataType::Interval(IntervalUnit::MonthDayNano) => {
let intervals = intervals
.as_any()
.downcast_ref::<IntervalMonthDayNanoArray>()
.unwrap();
let result = intervals
.iter()
.zip(multipliers.iter())
.map(|(i, m)| scalar_interval_month_day_nano_time_mul_int(i, m))
.collect::<Result<IntervalMonthDayNanoArray>>()?;
Ok(Arc::new(result))
}
t => Err(DataFusionError::Execution(format!(
"multiplication expected Interval, got {}",
t
Expand Down Expand Up @@ -295,6 +303,55 @@ fn scalar_interval_day_time_mul_int(
Ok(Some(interval_product))
}

/// Multiplies a packed month/day/nanosecond interval by an integer.
///
/// The `i128` is treated as three fields: months in the top 32 bits, days in the
/// next 32 bits and nanoseconds in the low 64 bits. Each field is multiplied on its
/// own with overflow checking, and the products are packed back in the same layout.
///
/// Returns `Ok(None)` when either argument is `None`.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` when the multiplier does not fit in an `i32`,
/// or when the months, days or nanoseconds product overflows its field.
fn scalar_interval_month_day_nano_time_mul_int(
    interval: Option<i128>,
    multiplier: Option<i64>,
) -> Result<Option<i128>> {
    // A null on either side produces a null result.
    let (packed, factor) = match (interval, multiplier) {
        (Some(packed), Some(factor)) => (packed as u128, factor),
        _ => return Ok(None),
    };

    const NANOS_WIDTH: u32 = 64;
    const DAYS_WIDTH: u32 = 32;
    const DAYS_SHIFT: u32 = NANOS_WIDTH;
    const MONTHS_SHIFT: u32 = DAYS_SHIFT + DAYS_WIDTH;
    const DAYS_FIELD: u128 = 0x0000_0000_FFFF_FFFF_0000_0000_0000_0000;
    const NANOS_FIELD: u128 = 0x0000_0000_0000_0000_FFFF_FFFF_FFFF_FFFF;

    // Months and days are 32-bit fields, so the multiplier has to fit in 32 bits too.
    let factor = i32::try_from(factor).map_err(|err| {
        DataFusionError::Execution(format!(
            "unable to convert interval multiplier to Int32: {}",
            err
        ))
    })?;

    let overflow = |msg: &str| DataFusionError::Execution(msg.to_string());

    // The truncating casts keep exactly the bits of each field, sign included.
    let months_product = ((packed >> MONTHS_SHIFT) as i32)
        .checked_mul(factor)
        .ok_or_else(|| overflow("interval out of range (months)"))?;
    let days_product = ((packed >> DAYS_SHIFT) as i32)
        .checked_mul(factor)
        .ok_or_else(|| overflow("interval out of range (days)"))?;
    let nanos_product = (packed as i64)
        .checked_mul(i64::from(factor))
        .ok_or_else(|| overflow("interval out of range (nanos)"))?;

    // Signed -> u128 casts sign-extend; the shift and masks cut each product back
    // down to the width of its own field before the fields are OR-ed together.
    let months_field = (months_product as u128) << MONTHS_SHIFT;
    let days_field = ((days_product as u128) << DAYS_SHIFT) & DAYS_FIELD;
    let nanos_field = (nanos_product as u128) & NANOS_FIELD;

    Ok(Some((months_field | days_field | nanos_field) as i128))
}

fn timestamp_add_interval(
timestamps: Arc<dyn Array>,
intervals: Arc<dyn Array>,
Expand Down Expand Up @@ -467,28 +524,24 @@ fn scalar_timestamp_add_interval_month_day_nano(

let timestamp = timestamp_ns_to_datetime(timestamp);

// TODO: legacy code, check validity
let month = (interval >> (64 + 32)) & 0xFFFFFFFF;
let day = (interval >> 64) & 0xFFFFFFFF;
let nano = interval & 0xFFFFFFFFFFFFFFFF;
let negated = if negated { -1 } else { 1 };
let month = ((interval >> (64 + 32)) & 0xFFFFFFFF) as i32 * negated;
let day = ((interval >> 64) & 0xFFFFFFFF) as i32 * negated;
let nano = (interval & 0xFFFFFFFFFFFFFFFF) as i64 * negated as i64;

let result = if month > 0 && !negated || month < 0 && negated {
let result = if month >= 0 {
timestamp.checked_add_months(Months::new(month as u32))
} else {
timestamp.checked_sub_months(Months::new(month.abs() as u32))
};

let result = if day > 0 && !negated || day < 0 && negated {
let result = if day >= 0 {
result.and_then(|t| t.checked_add_days(Days::new(day as u64)))
} else {
result.and_then(|t| t.checked_sub_days(Days::new(day.abs() as u64)))
};

let result = result.and_then(|t| {
t.checked_add_signed(Duration::nanoseconds(
(nano as i64) * (if negated { -1 } else { 1 }),
))
});
let result = result.and_then(|t| t.checked_add_signed(Duration::nanoseconds(nano)));

let result = result.ok_or_else(|| {
DataFusionError::Execution(format!(
Expand Down
Loading