Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed casting of utf8 <> Timestamp with and without timezone (#376)
Browse files Browse the repository at this point in the history
* Fixed casting of timestamp.

* More casts.
  • Loading branch information
jorgecarleitao authored Sep 5, 2021
1 parent 4f8d793 commit 877739b
Show file tree
Hide file tree
Showing 9 changed files with 472 additions and 350 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ bench = false

[dependencies]
num-traits = "0.2"
chrono = "^0.4"
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono-tz = { version = "0.5", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }
# We need to Hash values before sending them to an hasher. This
Expand Down Expand Up @@ -85,6 +86,8 @@ full = [
"merge_sort",
"ahash",
"compute",
# parses timezones used in timestamp conversions
"chrono-tz",
]
merge_sort = ["itertools"]
io_csv = ["csv", "lazy_static", "regex", "lexical-core", "streaming-iterator"]
Expand Down
38 changes: 30 additions & 8 deletions src/compute/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ mod binary_to;
mod boolean_to;
mod dictionary_to;
mod primitive_to;
mod timestamps;
mod utf8_to;

pub use binary_to::*;
pub use boolean_to::*;
pub use dictionary_to::*;
pub use primitive_to::*;
pub use timestamps::*;
pub use utf8_to::*;

/// options defining how Cast kernels behave
Expand Down Expand Up @@ -126,14 +124,16 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {

(Utf8, Date32) => true,
(Utf8, Date64) => true,
(Utf8, Timestamp(TimeUnit::Nanosecond, None)) => true,
(Utf8, Timestamp(TimeUnit::Nanosecond, _)) => true,
(Utf8, LargeUtf8) => true,
(Utf8, _) => is_numeric(to_type),
(LargeUtf8, Date32) => true,
(LargeUtf8, Date64) => true,
(LargeUtf8, Timestamp(TimeUnit::Nanosecond, None)) => true,
(LargeUtf8, Timestamp(TimeUnit::Nanosecond, _)) => true,
(LargeUtf8, Utf8) => true,
(LargeUtf8, _) => is_numeric(to_type),
(Timestamp(_, _), Utf8) => true,
(Timestamp(_, _), LargeUtf8) => true,
(_, Utf8) => is_numeric(from_type) || from_type == &Binary,
(_, LargeUtf8) => is_numeric(from_type) || from_type == &Binary,

Expand Down Expand Up @@ -487,7 +487,10 @@ fn cast_with_options(
LargeUtf8 => Ok(Box::new(utf8_to_large_utf8(
array.as_any().downcast_ref().unwrap(),
))),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_timestamp_ns_dyn::<i32>(array),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_naive_timestamp_ns_dyn::<i32>(array),
Timestamp(TimeUnit::Nanosecond, Some(tz)) => {
utf8_to_timestamp_ns_dyn::<i32>(array, tz.clone())
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand All @@ -508,7 +511,10 @@ fn cast_with_options(
Date64 => utf8_to_date64_dyn::<i64>(array),
Utf8 => utf8_large_to_utf8(array.as_any().downcast_ref().unwrap())
.map(|x| Box::new(x) as Box<dyn Array>),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_timestamp_ns_dyn::<i64>(array),
Timestamp(TimeUnit::Nanosecond, None) => utf8_to_naive_timestamp_ns_dyn::<i64>(array),
Timestamp(TimeUnit::Nanosecond, Some(tz)) => {
utf8_to_timestamp_ns_dyn::<i64>(array, tz.clone())
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -537,6 +543,14 @@ fn cast_with_options(
let array = Utf8Array::<i32>::from_trusted_len_iter(iter);
Ok(Box::new(array))
}
Timestamp(from_unit, Some(tz)) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(timestamp_to_utf8::<i32>(from, *from_unit, tz)?))
}
Timestamp(from_unit, None) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(naive_timestamp_to_utf8::<i32>(from, *from_unit)))
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -565,6 +579,14 @@ fn cast_with_options(
let array = Utf8Array::<i64>::from_trusted_len_iter(iter);
Ok(Box::new(array))
}
Timestamp(from_unit, Some(tz)) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(timestamp_to_utf8::<i64>(from, *from_unit, tz)?))
}
Timestamp(from_unit, None) => {
let from = array.as_any().downcast_ref().unwrap();
Ok(Box::new(naive_timestamp_to_utf8::<i64>(from, *from_unit)))
}
_ => Err(ArrowError::NotYetImplemented(format!(
"Casting from {:?} to {:?} not supported",
from_type, to_type,
Expand Down Expand Up @@ -793,8 +815,8 @@ fn cast_with_options(
}
(Timestamp(_, _), Int64) => primitive_to_same_primitive_dyn::<i64>(array, to_type),
(Int64, Timestamp(_, _)) => primitive_to_same_primitive_dyn::<i64>(array, to_type),
(Timestamp(from_unit, tz1), Timestamp(to_unit, tz2)) if tz1 == tz2 => {
primitive_dyn!(array, timestamp_to_timestamp, *from_unit, *to_unit, tz2)
(Timestamp(from_unit, _), Timestamp(to_unit, tz)) => {
primitive_dyn!(array, timestamp_to_timestamp, *from_unit, *to_unit, tz)
}
(Timestamp(from_unit, _), Date32) => primitive_dyn!(array, timestamp_to_date32, *from_unit),
(Timestamp(from_unit, _), Date64) => primitive_dyn!(array, timestamp_to_date64, *from_unit),
Expand Down
142 changes: 142 additions & 0 deletions src/compute/cast/primitive_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
bitmap::Bitmap,
compute::arity::unary,
datatypes::{DataType, TimeUnit},
error::ArrowError,
temporal_conversions::*,
types::NativeType,
};
Expand Down Expand Up @@ -268,3 +269,144 @@ pub fn timestamp_to_timestamp(
unary(from, |x| (x * (to_size / from_size)), to_type)
}
}

fn timestamp_to_utf8_impl<O: Offset, T: chrono::TimeZone>(
from: &PrimitiveArray<i64>,
time_unit: TimeUnit,
timezone: T,
) -> Utf8Array<O>
where
T::Offset: std::fmt::Display,
{
match time_unit {
TimeUnit::Nanosecond => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_ns_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Microsecond => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_us_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Millisecond => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_ms_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Second => {
let iter = from.iter().map(|x| {
x.map(|x| {
let datetime = timestamp_s_to_datetime(*x);
let offset = timezone.offset_from_utc_datetime(&datetime);
chrono::DateTime::<T>::from_utc(datetime, offset).to_rfc3339()
})
});
Utf8Array::from_trusted_len_iter(iter)
}
}
}

/// Formats `from` using a timezone obtained from `parse_offset_tz`.
///
/// Only compiled when the `chrono-tz` feature is enabled.
///
/// # Errors
///
/// Returns [`ArrowError::InvalidArgumentError`] when `parse_offset_tz` yields
/// no timezone for `timezone_str`.
#[cfg(feature = "chrono-tz")]
fn chrono_tz_timestamp_to_utf8<O: Offset>(
    from: &PrimitiveArray<i64>,
    time_unit: TimeUnit,
    timezone_str: &str,
) -> Result<Utf8Array<O>> {
    parse_offset_tz(timezone_str)
        .map(|tz| timestamp_to_utf8_impl::<O, chrono_tz::Tz>(from, time_unit, tz))
        .ok_or_else(|| {
            ArrowError::InvalidArgumentError(format!(
                "timezone \"{}\" cannot be parsed",
                timezone_str
            ))
        })
}

/// Stand-in compiled when the `chrono-tz` feature is disabled.
///
/// The array and time unit are ignored.
///
/// # Errors
///
/// Always returns [`ArrowError::InvalidArgumentError`], naming the timezone
/// string and stating that the `chrono-tz` feature is not active.
#[cfg(not(feature = "chrono-tz"))]
fn chrono_tz_timestamp_to_utf8<O: Offset>(
    _: &PrimitiveArray<i64>,
    _: TimeUnit,
    timezone_str: &str,
) -> Result<Utf8Array<O>> {
    let message = format!(
        "timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
        timezone_str
    );
    Err(ArrowError::InvalidArgumentError(message))
}

/// Returns a [`Utf8Array`] where every element is the utf8 representation of the timestamp in the rfc3339 format.
///
/// `timezone_str` is first handed to `parse_offset`. If that succeeds, the
/// resulting [`chrono::FixedOffset`] is used for formatting. Otherwise the
/// string is passed on to `chrono_tz_timestamp_to_utf8`.
///
/// # Errors
///
/// Returns whatever error `chrono_tz_timestamp_to_utf8` reports when
/// `parse_offset` has rejected `timezone_str`.
pub fn timestamp_to_utf8<O: Offset>(
    from: &PrimitiveArray<i64>,
    time_unit: TimeUnit,
    timezone_str: &str,
) -> Result<Utf8Array<O>> {
    match parse_offset(timezone_str) {
        Ok(offset) => Ok(timestamp_to_utf8_impl::<O, chrono::FixedOffset>(
            from, time_unit, offset,
        )),
        // The `parse_offset` error is dropped on purpose: the string gets a
        // second chance in the `chrono-tz` path.
        Err(_) => chrono_tz_timestamp_to_utf8(from, time_unit, timezone_str),
    }
}

/// Returns a [`Utf8Array`] where every element is the utf8 representation of
/// the timezone-less timestamp.
///
/// Each value is converted with the `timestamp_*_to_datetime` helper matching
/// `time_unit` and rendered via `to_string()`, i.e. the datetime's `Display`
/// output. This is *not* RFC 3339: no UTC offset is attached to the value
/// (compare [`timestamp_to_utf8`], which does produce RFC 3339). Entries that
/// iterate as `None` are forwarded as `None`.
pub fn naive_timestamp_to_utf8<O: Offset>(
    from: &PrimitiveArray<i64>,
    time_unit: TimeUnit,
) -> Utf8Array<O> {
    // The four helpers share one signature, so the arms coerce to a common
    // function pointer; only the interpretation of the integer differs by unit.
    let to_datetime = match time_unit {
        TimeUnit::Nanosecond => timestamp_ns_to_datetime,
        TimeUnit::Microsecond => timestamp_us_to_datetime,
        TimeUnit::Millisecond => timestamp_ms_to_datetime,
        TimeUnit::Second => timestamp_s_to_datetime,
    };

    let formatted = from
        .iter()
        .map(|value| value.map(|&timestamp| to_datetime(timestamp).to_string()));
    Utf8Array::from_trusted_len_iter(formatted)
}
Loading

0 comments on commit 877739b

Please sign in to comment.