Skip to content

Commit

Permalink
Updates for chrono changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Omega359 committed Mar 9, 2024
1 parent 8abb99a commit 15c50eb
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ arrow-string = { version = "50.0.0", default-features = false }
async-trait = "0.1.73"
bigdecimal = "=0.4.1"
bytes = "1.4"
chrono = { version = "=0.4.34", default-features = false }
chrono = { version = "0.4.34", default-features = false }
ctor = "0.2.0"
dashmap = "5.4.0"
datafusion = { path = "datafusion/core", version = "36.0.0", default-features = false }
Expand Down
28 changes: 14 additions & 14 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use arrow_array::types::{
TimestampSecondType,
};
use arrow_array::{ArrayRef, PrimitiveArray};
use chrono::{DateTime, Datelike, Duration, Months, NaiveDateTime, Utc};
use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};

use datafusion_common::cast::as_primitive_array;
use datafusion_common::{exec_err, not_impl_err, plan_err, Result, ScalarValue};
Expand Down Expand Up @@ -226,8 +226,7 @@ fn date_bin_months_interval(stride_months: i64, source: i64, origin: i64) -> i64
fn to_utc_date_time(nanos: i64) -> DateTime<Utc> {
let secs = nanos / 1_000_000_000;
let nsec = (nanos % 1_000_000_000) as u32;
let date = NaiveDateTime::from_timestamp_opt(secs, nsec).unwrap();
DateTime::<Utc>::from_naive_utc_and_offset(date, Utc)
DateTime::from_timestamp(secs, nsec).unwrap()
}

// Supported intervals:
Expand All @@ -244,8 +243,9 @@ fn date_bin_impl(
let stride = match stride {
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(v))) => {
let (days, ms) = IntervalDayTimeType::to_parts(*v);
let nanos = (Duration::days(days as i64) + Duration::milliseconds(ms as i64))
.num_nanoseconds();
let nanos = (TimeDelta::try_days(days as i64).unwrap()
+ TimeDelta::try_milliseconds(ms as i64).unwrap())
.num_nanoseconds();

match nanos {
Some(v) => Interval::Nanoseconds(v),
Expand All @@ -266,8 +266,9 @@ fn date_bin_impl(
Interval::Months(months as i64)
}
} else {
let nanos = (Duration::days(days as i64) + Duration::nanoseconds(nanos))
.num_nanoseconds();
let nanos = (TimeDelta::try_days(days as i64).unwrap()
+ Duration::nanoseconds(nanos))
.num_nanoseconds();
match nanos {
Some(v) => Interval::Nanoseconds(v),
_ => return exec_err!("DATE_BIN stride argument is too large"),
Expand Down Expand Up @@ -423,6 +424,7 @@ mod tests {
use arrow::datatypes::{DataType, TimeUnit};
use arrow_array::types::TimestampNanosecondType;
use arrow_array::{IntervalDayTimeArray, TimestampNanosecondArray};
use chrono::TimeDelta;

use datafusion_common::ScalarValue;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl};
Expand Down Expand Up @@ -705,44 +707,42 @@ mod tests {

#[test]
fn test_date_bin_single() {
use chrono::Duration;

let cases = vec![
(
(
Duration::minutes(15),
TimeDelta::try_minutes(15).unwrap(),
"2004-04-09T02:03:04.123456789Z",
"2001-01-01T00:00:00",
),
"2004-04-09T02:00:00Z",
),
(
(
Duration::minutes(15),
TimeDelta::try_minutes(15).unwrap(),
"2004-04-09T02:03:04.123456789Z",
"2001-01-01T00:02:30",
),
"2004-04-09T02:02:30Z",
),
(
(
Duration::minutes(15),
TimeDelta::try_minutes(15).unwrap(),
"2004-04-09T02:03:04.123456789Z",
"2005-01-01T00:02:30",
),
"2004-04-09T02:02:30Z",
),
(
(
Duration::hours(1),
TimeDelta::try_hours(1).unwrap(),
"2004-04-09T02:03:04.123456789Z",
"2001-01-01T00:00:00",
),
"2004-04-09T02:00:00Z",
),
(
(
Duration::seconds(10),
TimeDelta::try_seconds(10).unwrap(),
"2004-04-09T02:03:11.123456789Z",
"2001-01-01T00:00:00",
),
Expand Down
12 changes: 7 additions & 5 deletions datafusion/functions/src/datetime/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use arrow_array::types::{
};
use arrow_array::{Array, PrimitiveArray};
use chrono::{
DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, Timelike,
DateTime, Datelike, Duration, LocalResult, NaiveDateTime, Offset, TimeDelta, Timelike,
};

use datafusion_common::cast::as_primitive_array;
Expand Down Expand Up @@ -229,7 +229,9 @@ where
.and_then(|d| d.with_second(0))
.and_then(|d| d.with_minute(0))
.and_then(|d| d.with_hour(0))
.map(|d| d - Duration::seconds(60 * 60 * 24 * d.weekday() as i64)),
.map(|d| {
d - TimeDelta::try_seconds(60 * 60 * 24 * d.weekday() as i64).unwrap()
}),
"month" => value
.and_then(|d| d.with_nanosecond(0))
.and_then(|d| d.with_second(0))
Expand Down Expand Up @@ -280,10 +282,10 @@ fn _date_trunc_coarse_with_tz(
// To account for this adjust the time by a few hours, convert to local time,
// and then adjust the time back.
truncated
.sub(Duration::hours(3))
.sub(TimeDelta::try_hours(3).unwrap())
.and_local_timezone(value.timezone())
.single()
.map(|v| v.add(Duration::hours(3)))
.map(|v| v.add(TimeDelta::try_hours(3).unwrap()))
}
LocalResult::Single(datetime) => Some(datetime),
LocalResult::Ambiguous(datetime1, datetime2) => {
Expand Down Expand Up @@ -311,7 +313,7 @@ fn _date_trunc_coarse_without_tz(
value: Option<NaiveDateTime>,
) -> Result<Option<i64>> {
let value = _date_trunc_coarse::<NaiveDateTime>(granularity, value)?;
Ok(value.and_then(|value| value.timestamp_nanos_opt()))
Ok(value.and_then(|value| value.and_utc().timestamp_nanos_opt()))
}

/// Truncates the single `value`, expressed in nanoseconds since the
Expand Down

0 comments on commit 15c50eb

Please sign in to comment.