Allow json reader/decoder to work with format_strings for each field (#1451)

* implement parsers for the remaining types used by the json decoder

* added format strings (hashmap) to json reader

The format_strings map's keys are column names; the corresponding value is
used to parse date64/date32 columns from JSON when the value read is a
string (see the usage sketch below).

add tests for the formatted parsers for date{32,64} types in the json reader

* add DecoderOptions struct for holding options for decoder

so that future extensions to the decoder can be added to this struct
without breaking the API.

* Fixup some comments

* added tests for string parsing in the json reader for time{32,64} types

Co-authored-by: Andrew Lamb <[email protected]>
sum12 and alamb authored Apr 12, 2022
1 parent b12f7cd commit 68038f5
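
For orientation, here is a minimal usage sketch of the new API via `ReaderBuilder::with_format_strings`. The Date32 column "e", the "%Y-%m-%d" format, and the test/data/basic.json sample file are borrowed from the test added in this diff; the surrounding scaffolding is illustrative only:

    use std::collections::HashMap;
    use std::fs::File;
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::json::ReaderBuilder;

    fn main() {
        // Column "e" holds Date32 values encoded as JSON strings such as "1970-01-02".
        let schema = Schema::new(vec![Field::new("e", DataType::Date32, false)]);

        // Map column name -> format string, used when the JSON value is a string.
        let mut format_strings = HashMap::new();
        format_strings.insert("e".to_string(), "%Y-%m-%d".to_string());

        let mut reader = ReaderBuilder::new()
            .with_schema(Arc::new(schema))
            .with_batch_size(1024)
            .with_format_strings(format_strings)
            .build(File::open("test/data/basic.json").unwrap())
            .unwrap();

        // As in the doc example below, next() yields a Result wrapping an optional batch.
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(1, batch.num_columns());
    }

Passing a DecoderOptions value directly to Reader::new or Decoder::new works the same way; the builder simply fills in that struct, which is what lets later options be added without breaking these signatures.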
Showing 3 changed files with 171 additions and 53 deletions.
178 changes: 142 additions & 36 deletions arrow/src/json/reader.rs
@@ -38,7 +38,13 @@
//!
//! let file = File::open("test/data/basic.json").unwrap();
//!
- //! let mut json = json::Reader::new(BufReader::new(file), Arc::new(schema), 1024, None);
+ //! let mut json = json::Reader::new(
+ //! BufReader::new(file),
+ //! Arc::new(schema),
+ //! 1024,
+ //! Default::default()
+ //! );
//!
//! let batch = json.next().unwrap().unwrap();
//! ```

@@ -55,6 +61,7 @@ use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;
use crate::util::bit_util;
+ use crate::util::reader_parser::Parser;
use crate::{array::*, buffer::Buffer};

#[derive(Debug, Clone)]
@@ -563,7 +570,7 @@ where
/// BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
/// let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
/// let batch_size = 1024;
- /// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, None);
+ /// let decoder = Decoder::new(Arc::new(inferred_schema), batch_size, Default::default());
///
/// // seek back to start so that the original file is usable again
/// reader.seek(SeekFrom::Start(0)).unwrap();
@@ -576,31 +583,35 @@ where
pub struct Decoder {
/// Explicit schema for the JSON file
schema: SchemaRef,
- /// Optional projection for which columns to load (case-sensitive names)
- projection: Option<Vec<String>>,
/// Batch size (number of records to load each time)
batch_size: usize,
+ /// Options for the JSON decoder
+ doptions: DecoderOptions,
}

+ #[derive(Default, Debug)]
+ pub struct DecoderOptions {
+ /// Optional projection for which columns to load (case-sensitive names)
+ projection: Option<Vec<String>>,
+ /// Optional HashMap mapping column names to format strings
+ format_strings: Option<HashMap<String, String>>,
+ }

impl Decoder {
/// Create a new JSON decoder from any value that implements the `Iterator<Item=Result<Value>>`
/// trait.
- pub fn new(
- schema: SchemaRef,
- batch_size: usize,
- projection: Option<Vec<String>>,
- ) -> Self {
+ pub fn new(schema: SchemaRef, batch_size: usize, doptions: DecoderOptions) -> Self {
Self {
schema,
- projection,
batch_size,
+ doptions,
}
}

/// Returns the schema of the reader, useful for getting the schema without reading
/// record batches
pub fn schema(&self) -> SchemaRef {
- match &self.projection {
+ match &self.doptions.projection {
Some(projection) => {
let fields = self.schema.fields();
let projected_fields: Vec<Field> = fields
@@ -645,7 +656,7 @@ impl Decoder {
}

let rows = &rows[..];
- let projection = self.projection.clone().unwrap_or_default();
+ let projection = self.doptions.projection.clone().unwrap_or_default();
let arrays = self.build_struct_array(rows, self.schema.fields(), &projection);

let projected_fields: Vec<Field> = if projection.is_empty() {
@@ -913,7 +924,7 @@ impl Decoder {
}

#[allow(clippy::unnecessary_wraps)]
- fn build_primitive_array<T: ArrowPrimitiveType>(
+ fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
&self,
rows: &[Value],
col_name: &str,
@@ -922,20 +933,30 @@
T: ArrowNumericType,
T::Native: num::NumCast,
{
+ let format_string = self
+ .doptions
+ .format_strings
+ .as_ref()
+ .and_then(|fmts| fmts.get(col_name));
Ok(Arc::new(
rows.iter()
.map(|row| {
- row.get(&col_name)
- .and_then(|value| {
- if value.is_i64() {
- value.as_i64().map(num::cast::cast)
- } else if value.is_u64() {
- value.as_u64().map(num::cast::cast)
- } else {
- value.as_f64().map(num::cast::cast)
- }
- })
- .flatten()
+ row.get(&col_name).and_then(|value| {
+ if value.is_i64() {
+ value.as_i64().and_then(num::cast::cast)
+ } else if value.is_u64() {
+ value.as_u64().and_then(num::cast::cast)
+ } else if value.is_string() {
+ match format_string {
+ Some(fmt) => {
+ T::parse_formatted(value.as_str().unwrap(), fmt)
+ }
+ None => T::parse(value.as_str().unwrap()),
+ }
+ } else {
+ value.as_f64().and_then(num::cast::cast)
+ }
+ })
})
.collect::<PrimitiveArray<T>>(),
))
Expand Down Expand Up @@ -1539,9 +1560,9 @@ impl<R: Read> Reader<R> {
reader: R,
schema: SchemaRef,
batch_size: usize,
- projection: Option<Vec<String>>,
+ doptions: DecoderOptions,
) -> Self {
- Self::from_buf_reader(BufReader::new(reader), schema, batch_size, projection)
+ Self::from_buf_reader(BufReader::new(reader), schema, batch_size, doptions)
}

/// Create a new JSON Reader from a `BufReader<R: Read>`
@@ -1551,11 +1572,11 @@ impl<R: Read> Reader<R> {
reader: BufReader<R>,
schema: SchemaRef,
batch_size: usize,
- projection: Option<Vec<String>>,
+ doptions: DecoderOptions,
) -> Self {
Self {
reader,
- decoder: Decoder::new(schema, batch_size, projection),
+ decoder: Decoder::new(schema, batch_size, doptions),
}
}

@@ -1591,6 +1612,8 @@ pub struct ReaderBuilder {
batch_size: usize,
/// Optional projection for which columns to load (zero-based column indices)
projection: Option<Vec<String>>,
+ /// Optional HashMap mapping column names to format strings
+ format_strings: Option<HashMap<String, String>>,
}

impl Default for ReaderBuilder {
@@ -1600,6 +1623,7 @@ impl Default for ReaderBuilder {
max_records: None,
batch_size: 1024,
projection: None,
+ format_strings: None,
}
}
}
@@ -1658,6 +1682,15 @@ impl ReaderBuilder {
self
}

+ /// Set the decoder's format strings parameter
+ pub fn with_format_strings(
+ mut self,
+ format_strings: HashMap<String, String>,
+ ) -> Self {
+ self.format_strings = Some(format_strings);
+ self
+ }

/// Create a new `Reader` from the `ReaderBuilder`
pub fn build<R>(self, source: R) -> Result<Reader<R>>
where
@@ -1678,7 +1711,10 @@ impl ReaderBuilder {
buf_reader,
schema,
self.batch_size,
- self.projection,
+ DecoderOptions {
+ projection: self.projection,
+ format_strings: self.format_strings,
+ },
))
}
}
@@ -1711,7 +1747,7 @@ mod tests {
.unwrap();
let batch = reader.next().unwrap().unwrap();

- assert_eq!(4, batch.num_columns());
+ assert_eq!(5, batch.num_columns());
assert_eq!(12, batch.num_rows());

let schema = reader.schema();
@@ -1833,7 +1869,7 @@ mod tests {
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
1024,
- None,
+ Default::default(),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, Arc::new(schema));
Expand Down Expand Up @@ -1870,6 +1906,41 @@ mod tests {
assert_eq!(-3.5, bb.value(1));
}

+ #[test]
+ fn test_json_format_strings_for_date() {
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, false)]));
+ let e = schema.column_with_name("e").unwrap();
+ assert_eq!(&DataType::Date32, e.1.data_type());
+ let mut fmts = HashMap::new();
+ let date_format = "%Y-%m-%d".to_string();
+ fmts.insert("e".to_string(), date_format.clone());
+
+ let mut reader: Reader<File> = Reader::new(
+ File::open("test/data/basic.json").unwrap(),
+ schema.clone(),
+ 1024,
+ DecoderOptions {
+ format_strings: Some(fmts),
+ ..Default::default()
+ },
+ );
+ let reader_schema = reader.schema();
+ assert_eq!(reader_schema, schema);
+ let batch = reader.next().unwrap().unwrap();
+
+ let ee = batch
+ .column(e.0)
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .unwrap();
+ let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
+ assert_eq!(dt, ee.value(0));
+ let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
+ assert_eq!(dt, ee.value(1));
+ assert!(!ee.is_valid(2));
+ }

#[test]
fn test_json_basic_schema_projection() {
// We test implicit and explicit projection:
@@ -1885,7 +1956,10 @@ mod tests {
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
1024,
Some(vec!["a".to_string(), "c".to_string()]),
DecoderOptions {
projection: Some(vec!["a".to_string(), "c".to_string()]),
..Default::default()
},
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
@@ -2052,7 +2126,8 @@ mod tests {
file.seek(SeekFrom::Start(0)).unwrap();

let reader = BufReader::new(GzDecoder::new(&file));
- let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), 64, None);
+ let mut reader =
+ Reader::from_buf_reader(reader, Arc::new(schema), 64, Default::default());
let batch_gz = reader.next().unwrap().unwrap();

for batch in vec![batch, batch_gz] {
@@ -3081,6 +3156,37 @@ mod tests {
assert_eq!(5, aa.value(7));
}

+ #[test]
+ fn test_time_from_string() {
+ parse_string_column::<Time64NanosecondType>(4);
+ parse_string_column::<Time64MicrosecondType>(4);
+ parse_string_column::<Time32MillisecondType>(4);
+ parse_string_column::<Time32SecondType>(4);
+ }
+
+ fn parse_string_column<T>(value: T::Native)
+ where
+ T: ArrowPrimitiveType,
+ {
+ let schema = Schema::new(vec![Field::new("d", T::DATA_TYPE, true)]);
+
+ let builder = ReaderBuilder::new()
+ .with_schema(Arc::new(schema))
+ .with_batch_size(64);
+ let mut reader: Reader<File> = builder
+ .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
+ .unwrap();
+
+ let batch = reader.next().unwrap().unwrap();
+ let dd = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<PrimitiveArray<T>>()
+ .unwrap();
+ assert_eq!(value, dd.value(1));
+ assert!(!dd.is_valid(2));
+ }

#[test]
fn test_json_read_nested_list() {
let schema = Schema::new(vec![Field::new(
@@ -3093,7 +3199,7 @@ mod tests {
true,
)]);

- let decoder = Decoder::new(Arc::new(schema), 1024, None);
+ let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
&mut vec![
@@ -3128,7 +3234,7 @@ mod tests {
true,
)]);

- let decoder = Decoder::new(Arc::new(schema), 1024, None);
+ let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
// NOTE: total struct element count needs to be greater than
Expand Down Expand Up @@ -3157,7 +3263,7 @@ mod tests {
#[test]
fn test_json_read_binary_structs() {
let schema = Schema::new(vec![Field::new("c1", DataType::Binary, true)]);
- let decoder = Decoder::new(Arc::new(schema), 1024, None);
+ let decoder = Decoder::new(Arc::new(schema), 1024, Default::default());
let batch = decoder
.next_batch(
&mut vec![
Expand Down Expand Up @@ -3200,7 +3306,7 @@ mod tests {
let mut sum_a = 0;
for batch in reader {
let batch = batch.unwrap();
- assert_eq!(4, batch.num_columns());
+ assert_eq!(5, batch.num_columns());
sum_num_rows += batch.num_rows();
num_batches += 1;
let batch_schema = batch.schema();