diff --git a/arrow/src/json/reader.rs b/arrow/src/json/reader.rs
index 94b3ba0605e3..ad953e49b68e 100644
--- a/arrow/src/json/reader.rs
+++ b/arrow/src/json/reader.rs
@@ -38,7 +38,13 @@
 //!
 //! let file = File::open("test/data/basic.json").unwrap();
 //!
-//! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
+//! let mut json = json::Reader::new(
+//!     BufReader::new(file),
+//!     Arc::new(schema),
+//!     1024,
+//!     Default::default()
+//! );
+//!
 //! let batch = json.next().unwrap().unwrap();
 //! ```
 
@@ -55,6 +61,7 @@ use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
 use crate::util::bit_util;
+use crate::util::reader_parser::Parser;
 use crate::{array::*, buffer::Buffer};
 
 #[derive(Debug, Clone)]
@@ -563,7 +570,7 @@ where
 /// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
 /// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
 /// let batch_size = 1024;
-/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
+/// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
 ///
 /// // seek back to start so that the original file is usable again
 /// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -576,31 +583,35 @@ where
 pub struct Decoder {
     /// Explicit schema for the JSON file
     schema: SchemaRef,
-    /// Optional projection for which columns to load (case-sensitive names)
-    projection: Option<Vec<String>>,
     /// Batch size (number of records to load each time)
     batch_size: usize,
+    /// Options for the JSON decoder, such as the projection and format strings
+    doptions: DecoderOptions,
+}
+
+#[derive(Default, Debug)]
+pub struct DecoderOptions {
+    /// Optional projection for which columns to load (case-sensitive names)
+    projection: Option<Vec<String>>,
+    /// Optional HashMap mapping each column name to its format string
+    format_strings: Option<HashMap<String, String>>,
 }
 
 impl Decoder {
     /// Create a new JSON decoder from any value that implements the `Iterator<Item = Result<Value>>`
     /// trait.
-    pub fn new(
-        schema: SchemaRef,
-        batch_size: usize,
-        projection: Option<Vec<String>>,
-    ) -> Self {
+    pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
         Self {
             schema,
-            projection,
             batch_size,
+            doptions,
         }
     }
 
     /// Returns the schema of the reader, useful for getting the schema without reading
     /// record batches
     pub fn schema(&self) -> SchemaRef {
-        match &self.projection {
+        match &self.doptions.projection {
             Some(projection) => {
                 let fields = self.schema.fields();
                 let projected_fields: Vec<Field> = fields
@@ -645,7 +656,7 @@ impl Decoder {
         }
 
         let rows = &rows[..];
-        let projection = self.projection.clone().unwrap_or_default();
+        let projection = self.doptions.projection.clone().unwrap_or_default();
         let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);
 
         let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -913,7 +924,7 @@ impl Decoder {
     }
 
     #[allow(clippy::unnecessary_wraps)]
-    fn build_primitive_array<T: ArrowPrimitiveType>(
+    fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
         &self,
         rows: &[Value],
         col_name: &str,
@@ -922,20 +933,30 @@ impl Decoder {
         T: ArrowNumericType,
         T::Native: num::NumCast,
     {
+        let format_string = self
+            .doptions
+            .format_strings
+            .as_ref()
+            .and_then(|fmts| fmts.get(col_name));
         Ok(Arc::new(
             rows.iter()
                 .map(|row| {
-                    row.get(&col_name)
-                        .and_then(|value| {
-                            if value.is_i64() {
-                                value.as_i64().map(num::cast::cast)
-                            } else if value.is_u64() {
-                                value.as_u64().map(num::cast::cast)
-                            } else {
-                                value.as_f64().map(num::cast::cast)
-                            }
-                        })
-                        .flatten()
+                    row.get(&col_name).and_then(|value| {
+                        if value.is_i64() {
+                            value.as_i64().and_then(num::cast::cast)
+                        } else if value.is_u64() {
+                            value.as_u64().and_then(num::cast::cast)
+                        } else if value.is_string() {
+                            match format_string {
+                                Some(fmt) => {
+                                    T::parse_formatted(value.as_str().unwrap(), fmt)
+                                }
+                                None => T::parse(value.as_str().unwrap()),
+                            }
+                        } else {
+                            value.as_f64().and_then(num::cast::cast)
+                        }
+                    })
                 })
                 .collect::<PrimitiveArray<T>>(),
         ))
@@ -1539,9 +1560,9 @@ impl<R: Read> Reader<R> {
         reader: R,
         schema: SchemaRef,
         batch_size: usize,
-        projection: Option<Vec<String>>,
+        doptions: DecoderOptions,
     ) -> Self {
-        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+        Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
     }
 
     /// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1551,11 +1572,11 @@ impl<R: Read> Reader<R> {
         reader: BufReader<R>,
         schema: SchemaRef,
         batch_size: usize,
-        projection: Option<Vec<String>>,
+        doptions: DecoderOptions,
     ) -> Self {
         Self {
             reader,
-            decoder: Decoder::new(schema, batch_size, projection),
+            decoder: Decoder::new(schema, batch_size, doptions),
         }
     }
 
@@ -1591,6 +1612,8 @@ pub struct ReaderBuilder {
     batch_size: usize,
     /// Optional projection for which columns to load (zero-based column indices)
     projection: Option<Vec<String>>,
+    /// Optional HashMap mapping column names to format strings
+    format_strings: Option<HashMap<String, String>>,
 }
 
 impl Default for ReaderBuilder {
@@ -1600,6 +1623,7 @@ impl Default for ReaderBuilder {
             max_records: None,
             batch_size: 1024,
             projection: None,
+            format_strings: None,
         }
     }
 }
@@ -1658,6 +1682,15 @@ impl ReaderBuilder {
         self
     }
 
+    /// Set the decoder's format strings (column name -> format string)
+    pub fn with_format_strings(
+        mut self,
+        format_strings: HashMap<String, String>,
+    ) -> Self {
+        self.format_strings = Some(format_strings);
+        self
+    }
+
     /// Create a new `Reader` from the `ReaderBuilder`
     pub fn build<R>(self, source: R) -> Result<Reader<R>>
     where
@@ -1678,7 +1711,10 @@ impl ReaderBuilder {
             buf_reader,
             schema,
             self.batch_size,
-            self.projection,
+            DecoderOptions {
+                projection: self.projection,
+                format_strings: self.format_strings,
+            },
         ))
     }
 }
@@ -1711,7 +1747,7 @@ mod tests {
            .unwrap();
        let batch = reader.next().unwrap().unwrap();

-        assert_eq!(4, batch.num_columns());
+        assert_eq!(5, batch.num_columns());
         assert_eq!(12, batch.num_rows());

         let schema = reader.schema();
@@ -1833,7 +1869,7 @@ mod tests {
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema.clone()),
             1024,
-            None,
+            Default::default(),
         );
         let reader_schema = reader.schema();
         assert_eq!(reader_schema, Arc::new(schema));
@@ -1870,6 +1906,41 @@ mod tests {
         assert_eq!(-3.5, bb.value(1));
     }

+    #[test]
+    fn test_json_format_strings_for_date() {
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
+        let e = schema.column_with_name("e").unwrap();
+        assert_eq!(&DataType::Date32, e.1.data_type());
+        let mut fmts = HashMap::new();
+        let date_format = "%Y-%m-%d".to_string();
+        fmts.insert("e".to_string(), date_format.clone());
+
+        let mut reader: Reader<File> = Reader::new(
+            File::open("test/data/basic.json").unwrap(),
+            schema.clone(),
+            1024,
+            DecoderOptions {
+                format_strings: Some(fmts),
+                ..Default::default()
+            },
+        );
+        let reader_schema = reader.schema();
+        assert_eq!(reader_schema, schema);
+        let batch = reader.next().unwrap().unwrap();
+
+        let ee = batch
+            .column(e.0)
+            .as_any()
+            .downcast_ref::<Date32Array>()
+            .unwrap();
+        let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
+        assert_eq!(dt, ee.value(0));
+        let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
+        assert_eq!(dt, ee.value(1));
+        assert!(!ee.is_valid(2));
+    }
+
     #[test]
     fn test_json_basic_schema_projection() {
         // We test implicit and explicit projection:
@@ -1885,7 +1956,10 @@ mod tests {
             File::open("test/data/basic.json").unwrap(),
             Arc::new(schema),
             1024,
-            Some(vec!["a".to_string(), "c".to_string()]),
+            DecoderOptions {
+                projection: Some(vec!["a".to_string(), "c".to_string()]),
+                ..Default::default()
+            },
         );
         let reader_schema = reader.schema();
         let expected_schema = Arc::new(Schema::new(vec![
@@ -2052,7 +2126,8 @@ mod tests {
         file.seek(SeekFrom::Start(0)).unwrap();

         let reader = BufReader::new(GzDecoder::new(&file));
-        let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
+        let mut reader =
+            Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
         let batch_gz = reader.next().unwrap().unwrap();

         for batch in vec![batch, batch_gz] {
@@ -3081,6 +3156,37 @@ mod tests {
         assert_eq!(5, aa.value(7));
     }

+    #[test]
+    fn test_time_from_string() {
+        parse_string_column::<Time64NanosecondType>(4);
+        parse_string_column::<Time64MicrosecondType>(4);
+        parse_string_column::<Time32MillisecondType>(4);
+        parse_string_column::<Time32SecondType>(4);
+    }
+
+    fn parse_string_column<T>(value: T::Native)
+    where
+        T: ArrowPrimitiveType,
+    {
+        let schema = Schema::new(vec![Field::new("d", T::DATA_TYPE, true)]);
+
+        let builder = ReaderBuilder::new()
+            .with_schema(Arc::new(schema))
+            .with_batch_size(64);
+        let mut reader: Reader<File> = builder
+            .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
+            .unwrap();
+
+        let batch = reader.next().unwrap().unwrap();
+        let dd = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<PrimitiveArray<T>>()
+            .unwrap();
+        assert_eq!(value, dd.value(1));
+        assert!(!dd.is_valid(2));
+    }
+
     #[test]
     fn test_json_read_nested_list() {
         let schema = Schema::new(vec![Field::new(
@@ -3093,7 +3199,7 @@
             true,
         )]);

-        let decoder = Decoder::new(Arc::new(schema), 1024, None);
+        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
         let batch = decoder
             .next_batch(
                 &mut vec![
@@ -3128,7 +3234,7 @@
             true,
         )]);

-        let decoder = Decoder::new(Arc::new(schema), 1024, None);
+        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
         let batch = decoder
             .next_batch(
                 // NOTE: total struct element count needs to be greater than
@@ -3157,7 +3263,7 @@
     #[test]
     fn test_json_read_binary_structs() {
         let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
-        let decoder = Decoder::new(Arc::new(schema), 1024, None);
+        let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
         let batch = decoder
             .next_batch(
                 &mut vec![
@@ -3200,7 +3306,7 @@
         let mut sum_a = 0;
         for batch in reader {
             let batch = batch.unwrap();
-            assert_eq!(4, batch.num_columns());
+            assert_eq!(5, batch.num_columns());
             sum_num_rows += batch.num_rows();
             num_batches += 1;
             let batch_schema = batch.schema();
diff --git a/arrow/src/util/reader_parser.rs b/arrow/src/util/reader_parser.rs
index 591a3aedf78d..6b6f24f82a43 100644
--- a/arrow/src/util/reader_parser.rs
+++ b/arrow/src/util/reader_parser.rs
@@ -60,27 +60,39 @@ impl Parser for Int8Type {}

 impl Parser for TimestampNanosecondType {
     fn parse(string: &str) -> Option<i64> {
-        match Self::DATA_TYPE {
-            DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-                string_to_timestamp_nanos(string).ok()
-            }
-            _ => None,
-        }
+        string_to_timestamp_nanos(string).ok()
     }
 }

 impl Parser for TimestampMicrosecondType {
     fn parse(string: &str) -> Option<i64> {
-        match Self::DATA_TYPE {
-            DataType::Timestamp(TimeUnit::Microsecond, None) => {
-                let nanos = string_to_timestamp_nanos(string).ok();
-                nanos.map(|x| x / 1000)
-            }
-            _ => None,
-        }
+        let nanos = string_to_timestamp_nanos(string).ok();
+        nanos.map(|x| x / 1000)
+    }
+}
+
+impl Parser for TimestampMillisecondType {
+    fn parse(string: &str) -> Option<i64> {
+        let nanos = string_to_timestamp_nanos(string).ok();
+        nanos.map(|x| x / 1_000_000)
+    }
+}
+
+impl Parser for TimestampSecondType {
+    fn parse(string: &str) -> Option<i64> {
+        let nanos = string_to_timestamp_nanos(string).ok();
+        nanos.map(|x| x / 1_000_000_000)
     }
 }

+impl Parser for Time64NanosecondType {}
+
+impl Parser for Time64MicrosecondType {}
+
+impl Parser for Time32MillisecondType {}
+
+impl Parser for Time32SecondType {}
+
 /// Number of days between 0001-01-01 and 1970-01-01
 const EPOCH_DAYS_FROM_CE: i32 = 719_163;
diff --git a/arrow/test/data/basic.json b/arrow/test/data/basic.json
index dafd2dd2e420..556c39c46be9 100644
--- a/arrow/test/data/basic.json
+++ b/arrow/test/data/basic.json
@@ -1,6 +1,6 @@
-{"a":1, "b":2.0, "c":false, "d":"4"}
-{"a":-10, "b":-3.5, "c":true, "d":"4"}
-{"a":2, "b":0.6, "c":false, "d":"text"}
+{"a":1, "b":2.0, "c":false, "d":"4", "e":"1970-1-2"}
+{"a":-10, "b":-3.5, "c":true, "d":"4", "e": "1969-12-31"}
+{"a":2, "b":0.6, "c":false, "d":"text", "e": "1970-01-02 11:11:11"}
 {"a":1, "b":2.0, "c":false, "d":"4"}
 {"a":7, "b":-3.5, "c":true, "d":"4"}
 {"a":1, "b":0.6, "c":false, "d":"text"}
@@ -9,4 +9,4 @@
 {"a":1, "b":0.6, "c":false, "d":"text"}
 {"a":1, "b":2.0, "c":false, "d":"4"}
 {"a":1, "b":-3.5, "c":true, "d":"4"}
-{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
\ No newline at end of file
+{"a":100000000000000, "b":0.6, "c":false, "d":"text"}
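
Taken together, the patch threads the new `DecoderOptions` through `Decoder`, `Reader`, and `ReaderBuilder`. Because the `DecoderOptions` fields are private outside the module, external callers reach `format_strings` through `ReaderBuilder::with_format_strings`. A minimal usage sketch follows; it is not part of the patch and assumes the crate's usual `arrow::json` and `arrow::datatypes` re-exports:

```rust
use std::collections::HashMap;
use std::fs::File;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::Result;
use arrow::json::ReaderBuilder;

fn read_formatted_dates() -> Result<()> {
    // Column "e" arrives as strings such as "1970-1-2" (see basic.json above).
    let schema = Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));

    // Map column name -> chrono-style format string. Columns without an entry
    // fall back to the default `Parser::parse` behaviour.
    let mut format_strings = HashMap::new();
    format_strings.insert("e".to_string(), "%Y-%m-%d".to_string());

    let mut reader = ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(1024)
        .with_format_strings(format_strings)
        .build(File::open("test/data/basic.json")?)?;

    // Values that fail to parse under the given format come back as nulls,
    // which is what test_json_format_strings_for_date asserts for row 3.
    let batch = reader.next()?.unwrap();
    assert_eq!(1, batch.num_columns());
    Ok(())
}
```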
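The reader_parser.rs side is uniform: each timestamp impl parses the string once to nanoseconds since the epoch and integer-divides down to its own unit, while the empty `Time32`/`Time64` impls keep the trait's default numeric parsing (which is why `parse_string_column` expects 4 from the string "4"). A small sketch of that scaling, assuming `string_to_timestamp_nanos` is the public helper in `arrow::compute::kernels::cast_utils` that the CSV reader already used:

```rust
use arrow::compute::kernels::cast_utils::string_to_timestamp_nanos;

fn timestamp_unit_scaling() {
    // 1 day plus 11:11:11 after the epoch = 126_671 seconds.
    let nanos = string_to_timestamp_nanos("1970-01-02T11:11:11Z").unwrap();
    assert_eq!(nanos, 126_671_000_000_000);

    // The same divisions the new Parser impls apply per unit.
    assert_eq!(nanos / 1_000, 126_671_000_000); // TimestampMicrosecondType
    assert_eq!(nanos / 1_000_000, 126_671_000); // TimestampMillisecondType
    assert_eq!(nanos / 1_000_000_000, 126_671); // TimestampSecondType
}
```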