From f838e6d7cc26c822bf1f2162354c2c99374dc853 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Sat, 11 Mar 2023 16:35:33 +0000 Subject: [PATCH 1/2] Add timezone support to JSON reader --- arrow-array/src/array/primitive_array.rs | 2 +- arrow-array/src/types.rs | 26 +++---- arrow-cast/src/cast.rs | 2 +- arrow-json/src/raw/mod.rs | 73 +++++++++++++---- arrow-json/src/raw/timestamp_array.rs | 99 ++++++++++++++++++++++++ 5 files changed, 169 insertions(+), 33 deletions(-) create mode 100644 arrow-json/src/raw/timestamp_array.rs diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index 408f0c4ae96a..d792f6819ae5 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -1049,7 +1049,7 @@ impl PrimitiveArray { self.data .clone() .into_builder() - .data_type(DataType::Timestamp(T::get_time_unit(), timezone)) + .data_type(DataType::Timestamp(T::UNIT, timezone)) .build_unchecked() }; PrimitiveArray::from(array_data) diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index f9ca050dc0e7..48eee4f5c3dc 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -287,30 +287,28 @@ impl ArrowTemporalType for DurationMicrosecondType {} impl ArrowTemporalType for DurationNanosecondType {} /// A timestamp type allows us to create array builders that take a timestamp. -pub trait ArrowTimestampType: ArrowTemporalType { +pub trait ArrowTimestampType: ArrowTemporalType { + /// The [`TimeUnit`] of this timestamp. + const UNIT: TimeUnit; + /// Returns the `TimeUnit` of this timestamp. - fn get_time_unit() -> TimeUnit; + #[deprecated(note = "Use Self::UNIT")] + fn get_time_unit() -> TimeUnit { + Self::UNIT + } } impl ArrowTimestampType for TimestampSecondType { - fn get_time_unit() -> TimeUnit { - TimeUnit::Second - } + const UNIT: TimeUnit = TimeUnit::Second; } impl ArrowTimestampType for TimestampMillisecondType { - fn get_time_unit() -> TimeUnit { - TimeUnit::Millisecond - } + const UNIT: TimeUnit = TimeUnit::Millisecond; } impl ArrowTimestampType for TimestampMicrosecondType { - fn get_time_unit() -> TimeUnit { - TimeUnit::Microsecond - } + const UNIT: TimeUnit = TimeUnit::Microsecond; } impl ArrowTimestampType for TimestampNanosecondType { - fn get_time_unit() -> TimeUnit { - TimeUnit::Nanosecond - } + const UNIT: TimeUnit = TimeUnit::Nanosecond; } impl IntervalYearMonthType { diff --git a/arrow-cast/src/cast.rs b/arrow-cast/src/cast.rs index 0a4b88ec89f6..1bd5027406b9 100644 --- a/arrow-cast/src/cast.rs +++ b/arrow-cast/src/cast.rs @@ -2630,7 +2630,7 @@ fn cast_string_to_timestamp< .downcast_ref::>() .unwrap(); - let scale_factor = match TimestampType::get_time_unit() { + let scale_factor = match TimestampType::UNIT { TimeUnit::Second => 1_000_000_000, TimeUnit::Millisecond => 1_000_000, TimeUnit::Microsecond => 1_000, diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs index 1ab879d203fb..57bec9ee49c0 100644 --- a/arrow-json/src/raw/mod.rs +++ b/arrow-json/src/raw/mod.rs @@ -27,10 +27,13 @@ use crate::raw::primitive_array::PrimitiveArrayDecoder; use crate::raw::string_array::StringArrayDecoder; use crate::raw::struct_array::StructArrayDecoder; use crate::raw::tape::{Tape, TapeDecoder, TapeElement}; +use crate::raw::timestamp_array::TimestampArrayDecoder; +use arrow_array::timezone::Tz; use arrow_array::types::*; use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader}; use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit}; +use chrono::Utc; use std::io::BufRead; mod boolean_array; @@ -41,6 +44,7 @@ mod primitive_array; mod string_array; mod struct_array; mod tape; +mod timestamp_array; /// A builder for [`RawReader`] and [`RawDecoder`] pub struct RawReaderBuilder { @@ -293,10 +297,34 @@ fn make_decoder( data_type => (primitive_decoder, data_type), DataType::Float32 => primitive_decoder!(Float32Type, data_type), DataType::Float64 => primitive_decoder!(Float64Type, data_type), - DataType::Timestamp(TimeUnit::Second, None) => primitive_decoder!(TimestampSecondType, data_type), - DataType::Timestamp(TimeUnit::Millisecond, None) => primitive_decoder!(TimestampMillisecondType, data_type), - DataType::Timestamp(TimeUnit::Microsecond, None) => primitive_decoder!(TimestampMicrosecondType, data_type), - DataType::Timestamp(TimeUnit::Nanosecond, None) => primitive_decoder!(TimestampNanosecondType, data_type), + DataType::Timestamp(TimeUnit::Second, None) => { + Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + }, + DataType::Timestamp(TimeUnit::Millisecond, None) => { + Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + }, + DataType::Timestamp(TimeUnit::Microsecond, None) => { + Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + }, + DataType::Timestamp(TimeUnit::Nanosecond, None) => { + Ok(Box::new(TimestampArrayDecoder::::new(data_type, Utc))) + }, + DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => { + let tz: Tz = tz.parse()?; + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + }, + DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => { + let tz: Tz = tz.parse()?; + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + }, + DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => { + let tz: Tz = tz.parse()?; + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + }, + DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => { + let tz: Tz = tz.parse()?; + Ok(Box::new(TimestampArrayDecoder::::new(data_type, tz))) + }, DataType::Date32 => primitive_decoder!(Date32Type, data_type), DataType::Date64 => primitive_decoder!(Date64Type, data_type), DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type), @@ -809,29 +837,27 @@ mod tests { fn test_timestamp() { let buf = r#" - {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30} - {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456} + {"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"} + {"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456} - {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123"} - {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00"} - {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z"} - {"c": "1997-01-31T14:26:56.123-05:00"} + {"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"} + {"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"} + {"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"} + {"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"} "#; + let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".to_string())); let schema = Arc::new(Schema::new(vec![ Field::new("a", T::DATA_TYPE, true), Field::new("b", T::DATA_TYPE, true), Field::new("c", T::DATA_TYPE, true), + Field::new("d", with_timezone, true), ])); let batches = do_read(buf, 1024, true, schema); assert_eq!(batches.len(), 1); - let unit = match T::DATA_TYPE { - DataType::Timestamp(unit, _) => unit, - _ => unreachable!(), - }; - let unit_in_nanos = match unit { + let unit_in_nanos: i64 = match T::UNIT { TimeUnit::Second => 1_000_000_000, TimeUnit::Millisecond => 1_000_000, TimeUnit::Microsecond => 1_000, @@ -859,7 +885,6 @@ mod tests { 1234, 0 ] - .map(T::Native::usize_as) ); let col3 = as_primitive_array::(batches[0].column(2)); @@ -874,7 +899,21 @@ mod tests { 854702816123000000 / unit_in_nanos, 854738816123000000 / unit_in_nanos ] - .map(T::Native::usize_as) + ); + + let col4 = as_primitive_array::(batches[0].column(3)); + + assert_eq!(col4.null_count(), 0); + assert_eq!( + col4.values(), + &[ + 854674016123000000 / unit_in_nanos, + 123, + 854702816123000000 / unit_in_nanos, + 854720816123000000 / unit_in_nanos, + 854674016000000000 / unit_in_nanos, + 854640000000000000 / unit_in_nanos + ] ); } diff --git a/arrow-json/src/raw/timestamp_array.rs b/arrow-json/src/raw/timestamp_array.rs new file mode 100644 index 000000000000..ec34bfe6c1ea --- /dev/null +++ b/arrow-json/src/raw/timestamp_array.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use chrono::TimeZone; +use num::NumCast; +use std::marker::PhantomData; + +use arrow_array::builder::PrimitiveBuilder; +use arrow_array::types::ArrowTimestampType; +use arrow_array::Array; +use arrow_cast::parse::string_to_datetime; +use arrow_data::ArrayData; +use arrow_schema::{ArrowError, DataType, TimeUnit}; + +use crate::raw::tape::{Tape, TapeElement}; +use crate::raw::{tape_error, ArrayDecoder}; + +/// A specialized [`PrimitiveArrayDecoder`] for +pub struct TimestampArrayDecoder { + data_type: DataType, + timezone: Tz, + // Invariant and Send + phantom: PhantomData P>, +} + +impl TimestampArrayDecoder { + pub fn new(data_type: DataType, timezone: Tz) -> Self { + Self { + data_type, + timezone, + phantom: Default::default(), + } + } +} + +impl ArrayDecoder for TimestampArrayDecoder +where + P: ArrowTimestampType, + Tz: TimeZone + Send, +{ + fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result { + let mut builder = PrimitiveBuilder::

::with_capacity(pos.len()) + .with_data_type(self.data_type.clone()); + + for p in pos { + match tape.get(*p) { + TapeElement::Null => builder.append_null(), + TapeElement::String(idx) => { + let s = tape.get_string(idx); + let date = string_to_datetime(&self.timezone, s).map_err(|e| { + ArrowError::JsonError(format!( + "failed to parse \"{s}\" as {}: {}", + self.data_type, e + )) + })?; + + let value = match P::UNIT { + TimeUnit::Second => date.timestamp(), + TimeUnit::Millisecond => date.timestamp_millis(), + TimeUnit::Microsecond => date.timestamp_micros(), + TimeUnit::Nanosecond => date.timestamp_nanos(), + }; + builder.append_value(value) + } + TapeElement::Number(idx) => { + let s = tape.get_string(idx); + let value = lexical_core::parse::(s.as_bytes()) + .ok() + .and_then(NumCast::from) + .ok_or_else(|| { + ArrowError::JsonError(format!( + "failed to parse {s} as {}", + self.data_type + )) + })?; + + builder.append_value(value) + } + d => return Err(tape_error(d, "primitive")), + } + } + + Ok(builder.finish().into_data()) + } +} From ba389391853fe2e277fb00e65a5be7312396df4a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Mar 2023 10:20:13 +0000 Subject: [PATCH 2/2] Fix doc --- arrow-json/src/raw/timestamp_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-json/src/raw/timestamp_array.rs b/arrow-json/src/raw/timestamp_array.rs index ec34bfe6c1ea..07feaa974ee4 100644 --- a/arrow-json/src/raw/timestamp_array.rs +++ b/arrow-json/src/raw/timestamp_array.rs @@ -29,7 +29,7 @@ use arrow_schema::{ArrowError, DataType, TimeUnit}; use crate::raw::tape::{Tape, TapeElement}; use crate::raw::{tape_error, ArrayDecoder}; -/// A specialized [`PrimitiveArrayDecoder`] for +/// A specialized [`ArrayDecoder`] for timestamps pub struct TimestampArrayDecoder { data_type: DataType, timezone: Tz,