Skip to content

Commit

Permalink
Add timezone support to JSON reader (#3845)
Browse files Browse the repository at this point in the history
* Add timezone support to JSON reader

* Fix doc
  • Loading branch information
tustvold authored Mar 17, 2023
1 parent 0df2188 commit eacb135
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 33 deletions.
2 changes: 1 addition & 1 deletion arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,7 @@ impl<T: ArrowTimestampType> PrimitiveArray<T> {
self.data
.clone()
.into_builder()
.data_type(DataType::Timestamp(T::get_time_unit(), timezone))
.data_type(DataType::Timestamp(T::UNIT, timezone))
.build_unchecked()
};
PrimitiveArray::from(array_data)
Expand Down
26 changes: 12 additions & 14 deletions arrow-array/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,30 +287,28 @@ impl ArrowTemporalType for DurationMicrosecondType {}
impl ArrowTemporalType for DurationNanosecondType {}

/// A timestamp type allows us to create array builders that take a timestamp.
pub trait ArrowTimestampType: ArrowTemporalType {
pub trait ArrowTimestampType: ArrowTemporalType<Native = i64> {
/// The [`TimeUnit`] of this timestamp.
const UNIT: TimeUnit;

/// Returns the `TimeUnit` of this timestamp.
fn get_time_unit() -> TimeUnit;
#[deprecated(note = "Use Self::UNIT")]
fn get_time_unit() -> TimeUnit {
Self::UNIT
}
}

impl ArrowTimestampType for TimestampSecondType {
fn get_time_unit() -> TimeUnit {
TimeUnit::Second
}
const UNIT: TimeUnit = TimeUnit::Second;
}
impl ArrowTimestampType for TimestampMillisecondType {
fn get_time_unit() -> TimeUnit {
TimeUnit::Millisecond
}
const UNIT: TimeUnit = TimeUnit::Millisecond;
}
impl ArrowTimestampType for TimestampMicrosecondType {
fn get_time_unit() -> TimeUnit {
TimeUnit::Microsecond
}
const UNIT: TimeUnit = TimeUnit::Microsecond;
}
impl ArrowTimestampType for TimestampNanosecondType {
fn get_time_unit() -> TimeUnit {
TimeUnit::Nanosecond
}
const UNIT: TimeUnit = TimeUnit::Nanosecond;
}

impl IntervalYearMonthType {
Expand Down
2 changes: 1 addition & 1 deletion arrow-cast/src/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2630,7 +2630,7 @@ fn cast_string_to_timestamp<
.downcast_ref::<GenericStringArray<Offset>>()
.unwrap();

let scale_factor = match TimestampType::get_time_unit() {
let scale_factor = match TimestampType::UNIT {
TimeUnit::Second => 1_000_000_000,
TimeUnit::Millisecond => 1_000_000,
TimeUnit::Microsecond => 1_000,
Expand Down
73 changes: 56 additions & 17 deletions arrow-json/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ use crate::raw::primitive_array::PrimitiveArrayDecoder;
use crate::raw::string_array::StringArrayDecoder;
use crate::raw::struct_array::StructArrayDecoder;
use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
use crate::raw::timestamp_array::TimestampArrayDecoder;
use arrow_array::timezone::Tz;
use arrow_array::types::*;
use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
use chrono::Utc;
use std::io::BufRead;

mod boolean_array;
Expand All @@ -41,6 +44,7 @@ mod primitive_array;
mod string_array;
mod struct_array;
mod tape;
mod timestamp_array;

/// A builder for [`RawReader`] and [`RawDecoder`]
pub struct RawReaderBuilder {
Expand Down Expand Up @@ -293,10 +297,34 @@ fn make_decoder(
data_type => (primitive_decoder, data_type),
DataType::Float32 => primitive_decoder!(Float32Type, data_type),
DataType::Float64 => primitive_decoder!(Float64Type, data_type),
DataType::Timestamp(TimeUnit::Second, None) => primitive_decoder!(TimestampSecondType, data_type),
DataType::Timestamp(TimeUnit::Millisecond, None) => primitive_decoder!(TimestampMillisecondType, data_type),
DataType::Timestamp(TimeUnit::Microsecond, None) => primitive_decoder!(TimestampMicrosecondType, data_type),
DataType::Timestamp(TimeUnit::Nanosecond, None) => primitive_decoder!(TimestampNanosecondType, data_type),
DataType::Timestamp(TimeUnit::Second, None) => {
Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, Utc)))
},
DataType::Timestamp(TimeUnit::Millisecond, None) => {
Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, Utc)))
},
DataType::Timestamp(TimeUnit::Microsecond, None) => {
Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, Utc)))
},
DataType::Timestamp(TimeUnit::Nanosecond, None) => {
Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, Utc)))
},
DataType::Timestamp(TimeUnit::Second, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
Ok(Box::new(TimestampArrayDecoder::<TimestampSecondType, _>::new(data_type, tz)))
},
DataType::Timestamp(TimeUnit::Millisecond, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
Ok(Box::new(TimestampArrayDecoder::<TimestampMillisecondType, _>::new(data_type, tz)))
},
DataType::Timestamp(TimeUnit::Microsecond, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
Ok(Box::new(TimestampArrayDecoder::<TimestampMicrosecondType, _>::new(data_type, tz)))
},
DataType::Timestamp(TimeUnit::Nanosecond, Some(ref tz)) => {
let tz: Tz = tz.parse()?;
Ok(Box::new(TimestampArrayDecoder::<TimestampNanosecondType, _>::new(data_type, tz)))
},
DataType::Date32 => primitive_decoder!(Date32Type, data_type),
DataType::Date64 => primitive_decoder!(Date64Type, data_type),
DataType::Time32(TimeUnit::Second) => primitive_decoder!(Time32SecondType, data_type),
Expand Down Expand Up @@ -809,29 +837,27 @@ mod tests {

fn test_timestamp<T: ArrowTimestampType>() {
let buf = r#"
{"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30}
{"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456}
{"a": 1, "b": "2020-09-08T13:42:29.190855+00:00", "c": 38.30, "d": "1997-01-31T09:26:56.123"}
{"a": 2, "b": "2020-09-08T13:42:29.190855Z", "c": 123.456, "d": 123.456}
{"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123"}
{"b": 40, "c": "2020-09-08T13:42:29.190855+00:00"}
{"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z"}
{"c": "1997-01-31T14:26:56.123-05:00"}
{"b": 1337, "b": "2020-09-08T13:42:29Z", "c": "1997-01-31T09:26:56.123", "d": "1997-01-31T09:26:56.123Z"}
{"b": 40, "c": "2020-09-08T13:42:29.190855+00:00", "d": "1997-01-31 09:26:56.123-05:00"}
{"b": 1234, "a": null, "c": "1997-01-31 09:26:56.123Z", "d": "1997-01-31 092656"}
{"c": "1997-01-31T14:26:56.123-05:00", "d": "1997-01-31"}
"#;

let with_timezone = DataType::Timestamp(T::UNIT, Some("+08:00".to_string()));
let schema = Arc::new(Schema::new(vec![
Field::new("a", T::DATA_TYPE, true),
Field::new("b", T::DATA_TYPE, true),
Field::new("c", T::DATA_TYPE, true),
Field::new("d", with_timezone, true),
]));

let batches = do_read(buf, 1024, true, schema);
assert_eq!(batches.len(), 1);

let unit = match T::DATA_TYPE {
DataType::Timestamp(unit, _) => unit,
_ => unreachable!(),
};
let unit_in_nanos = match unit {
let unit_in_nanos: i64 = match T::UNIT {
TimeUnit::Second => 1_000_000_000,
TimeUnit::Millisecond => 1_000_000,
TimeUnit::Microsecond => 1_000,
Expand Down Expand Up @@ -859,7 +885,6 @@ mod tests {
1234,
0
]
.map(T::Native::usize_as)
);

let col3 = as_primitive_array::<T>(batches[0].column(2));
Expand All @@ -874,7 +899,21 @@ mod tests {
854702816123000000 / unit_in_nanos,
854738816123000000 / unit_in_nanos
]
.map(T::Native::usize_as)
);

let col4 = as_primitive_array::<T>(batches[0].column(3));

assert_eq!(col4.null_count(), 0);
assert_eq!(
col4.values(),
&[
854674016123000000 / unit_in_nanos,
123,
854702816123000000 / unit_in_nanos,
854720816123000000 / unit_in_nanos,
854674016000000000 / unit_in_nanos,
854640000000000000 / unit_in_nanos
]
);
}

Expand Down
99 changes: 99 additions & 0 deletions arrow-json/src/raw/timestamp_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use chrono::TimeZone;
use num::NumCast;
use std::marker::PhantomData;

use arrow_array::builder::PrimitiveBuilder;
use arrow_array::types::ArrowTimestampType;
use arrow_array::Array;
use arrow_cast::parse::string_to_datetime;
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, TimeUnit};

use crate::raw::tape::{Tape, TapeElement};
use crate::raw::{tape_error, ArrayDecoder};

/// An [`ArrayDecoder`] dedicated to timestamp columns.
///
/// `P` is the timestamp type of the array being produced and `Tz` is the
/// time zone value handed to the string parser when a JSON string is decoded.
pub struct TimestampArrayDecoder<P, Tz>
where
    P: ArrowTimestampType,
    Tz: TimeZone,
{
    /// Data type applied to the builder that produces the output array.
    data_type: DataType,
    /// Time zone passed to `string_to_datetime` for string values.
    timezone: Tz,
    // `fn(P) -> P` keeps the marker invariant in `P` and `Send`
    // regardless of whether `P` itself is `Send`.
    phantom: PhantomData<fn(P) -> P>,
}

impl<P: ArrowTimestampType, Tz: TimeZone> TimestampArrayDecoder<P, Tz> {
pub fn new(data_type: DataType, timezone: Tz) -> Self {
Self {
data_type,
timezone,
phantom: Default::default(),
}
}
}

impl<P, Tz> ArrayDecoder for TimestampArrayDecoder<P, Tz>
where
    P: ArrowTimestampType,
    Tz: TimeZone + Send,
{
    /// Decodes the tape elements at the positions in `pos` into a timestamp
    /// array of type `P`.
    ///
    /// Each element is handled according to its JSON kind:
    ///
    /// * `null` appends a null.
    /// * A string is parsed with `string_to_datetime` using this decoder's
    ///   time zone and converted to a count of `P::UNIT` since the epoch.
    /// * A number is taken as a raw count of `P::UNIT`. Integers are parsed
    ///   exactly; anything else is parsed as `f64` and its fractional part is
    ///   dropped.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::JsonError`] if a string cannot be parsed as a
    /// timestamp, if a parsed timestamp does not fit in an `i64` number of
    /// nanoseconds, or if a number cannot be represented as an `i64`. Any
    /// other tape element yields the error produced by `tape_error`.
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
        let mut builder = PrimitiveBuilder::<P>::with_capacity(pos.len())
            .with_data_type(self.data_type.clone());

        for p in pos {
            match tape.get(*p) {
                TapeElement::Null => builder.append_null(),
                TapeElement::String(idx) => {
                    let s = tape.get_string(idx);
                    let date = string_to_datetime(&self.timezone, s).map_err(|e| {
                        ArrowError::JsonError(format!(
                            "failed to parse \"{s}\" as {}: {}",
                            self.data_type, e
                        ))
                    })?;

                    let value = match P::UNIT {
                        TimeUnit::Second => date.timestamp(),
                        TimeUnit::Millisecond => date.timestamp_millis(),
                        TimeUnit::Microsecond => date.timestamp_micros(),
                        TimeUnit::Nanosecond => {
                            // An i64 nanosecond count only spans roughly the
                            // years 1677-2262. Do the arithmetic in i128 and
                            // report out-of-range input as an error rather
                            // than overflowing on user-supplied data.
                            let nanos = i128::from(date.timestamp()) * 1_000_000_000
                                + i128::from(date.timestamp_subsec_nanos());
                            <i64 as std::convert::TryFrom<i128>>::try_from(nanos).map_err(
                                |_| {
                                    ArrowError::JsonError(format!(
                                        "failed to parse \"{s}\" as {}: out of range for nanosecond precision",
                                        self.data_type
                                    ))
                                },
                            )?
                        }
                    };
                    builder.append_value(value)
                }
                TapeElement::Number(idx) => {
                    let s = tape.get_string(idx);
                    let bytes = s.as_bytes();
                    // Try an exact integer parse first: going through f64
                    // silently rounds integers above 2^53, which includes
                    // every present-day nanosecond timestamp. Fall back to
                    // f64 for fractional or exponent notation.
                    let value = lexical_core::parse::<i64>(bytes)
                        .ok()
                        .or_else(|| {
                            lexical_core::parse::<f64>(bytes)
                                .ok()
                                .and_then(NumCast::from)
                        })
                        .ok_or_else(|| {
                            ArrowError::JsonError(format!(
                                "failed to parse {s} as {}",
                                self.data_type
                            ))
                        })?;

                    builder.append_value(value)
                }
                d => return Err(tape_error(d, "primitive")),
            }
        }

        Ok(builder.finish().into_data())
    }
}

0 comments on commit eacb135

Please sign in to comment.