Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to deserialize JSON (!= NDJSON) #758

Merged
merged 1 commit into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 11 additions & 25 deletions examples/json_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,23 @@ use std::io::BufReader;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::error::{ArrowError, Result};
use arrow2::io::json::read;

fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
fn read_path(path: &str) -> Result<Arc<dyn Array>> {
// Example of reading a JSON file.
let mut reader = BufReader::new(File::open(path)?);
let reader = BufReader::new(File::open(path)?);
let data = serde_json::from_reader(reader)?;

let fields = read::infer_and_reset(&mut reader, None)?;

let fields = if let Some(projection) = projection {
fields
.into_iter()
.filter(|field| projection.contains(&field.name.as_ref()))
.collect()
let values = if let serde_json::Value::Array(values) = data {
Ok(values)
} else {
fields
};

// at most 1024 rows. This container can be re-used across batches.
let mut rows = vec![String::default(); 1024];
Err(ArrowError::InvalidArgumentError("".to_string()))
}?;

// Reads up to 1024 rows.
// this is IO-intensive and performs minimal CPU work. In particular,
// no deserialization is performed.
let read = read::read_rows(&mut reader, &mut rows)?;
let rows = &rows[..read];
let data_type = read::infer_rows(&values)?;

// deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
// and can be performed on a different thread pool via a channel.
read::deserialize(rows, &fields)
Ok(read::deserialize_json(&values, data_type))
}

fn main() -> Result<()> {
Expand All @@ -42,7 +28,7 @@ fn main() -> Result<()> {

let file_path = &args[1];

let batch = read_path(file_path, None)?;
let batch = read_path(file_path)?;
println!("{:#?}", batch);
Ok(())
}
48 changes: 48 additions & 0 deletions examples/ndjson_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::{ArrowError, Result};
use arrow2::io::json::read;

/// Reads an NDJSON file at `path` into a single [`Chunk`], inferring the
/// schema from the file's contents.
///
/// When `projection` is `Some`, only fields whose names appear in it are kept.
/// At most 1024 rows are read.
fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
    // Example of reading a NDJSON file.
    let mut reader = BufReader::new(File::open(path)?);

    // Infer the fields from the data, rewinding the reader back to the start.
    let inferred = read::infer_and_reset(&mut reader, None)?;

    // Apply the projection, when one was requested.
    let fields = match projection {
        Some(names) => inferred
            .into_iter()
            .filter(|field| names.contains(&field.name.as_ref()))
            .collect(),
        None => inferred,
    };

    // Row buffer capping a batch at 1024 rows; re-usable across batches.
    let mut rows = vec![String::default(); 1024];

    // Reads up to 1024 rows.
    // This is IO-intensive and performs minimal CPU work. In particular,
    // no deserialization is performed.
    let read = read::read_rows(&mut reader, &mut rows)?;

    // Deserialize the filled rows into a `Chunk`. This is CPU-intensive, has
    // no IO, and can be performed on a different thread pool via a channel.
    read::deserialize(&rows[..read], &fields)
}

fn main() -> Result<()> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];

let batch = read_path(file_path, None)?;
println!("{:#?}", batch);
Ok(())
}
2 changes: 1 addition & 1 deletion guide/src/io/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This crate offers optional features that enable interoperability with different
* Arrow (`io_ipc`)
* CSV (`io_csv`)
* Parquet (`io_parquet`)
* Json (`io_json`)
* JSON and NDJSON (`io_json`)
* Avro (`io_avro` and `io_avro_async`)

In this section you can find a guide and examples for each one of them.
10 changes: 8 additions & 2 deletions guide/src/io/json_read.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
# JSON read

When compiled with feature `io_json`, you can use this crate to read JSON files.
When compiled with feature `io_json`, you can use this crate to read NDJSON files:

```rust
{{#include ../../../examples/json_read.rs}}
{{#include ../../../examples/ndjson_read.rs}}
```

Note how deserialization can be performed on a separate thread pool to avoid
blocking the runtime (see also [here](https://ryhl.io/blog/async-what-is-blocking/)).

This crate also supports reading generic (non-NDJSON) JSON; because the whole document must be parsed as a single value, the file cannot be read in chunks.

```rust
{{#include ../../../examples/json_read.rs}}
```
7 changes: 7 additions & 0 deletions src/io/json/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,10 @@ pub fn deserialize<A: AsRef<str>>(
let (_, columns, _) = deserialize_struct(&rows, data_type).into_data();
Ok(Chunk::new(columns))
}

/// Deserializes a slice of [`Value`] into an [`Array`] of logical type `data_type`.
///
/// Use this to convert an already-parsed JSON document (e.g. produced by
/// `serde_json`) into Arrow; `data_type` is typically obtained from `infer_rows`.
/// NOTE(review): handling of rows that do not conform to `data_type` is
/// delegated to `_deserialize` — confirm its behavior before relying on it.
pub fn deserialize_json(rows: &[Value], data_type: DataType) -> Arc<dyn Array> {
    _deserialize(rows, data_type)
}
32 changes: 22 additions & 10 deletions src/io/json/read/infer_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,21 @@ fn infer_value(value: &Value) -> Result<DataType> {
})
}

fn infer_array(array: &[Value]) -> Result<DataType> {
let types = array.iter().map(|a| {
/// Infers a [`DataType`] from a list of JSON values
pub fn infer_rows(rows: &[Value]) -> Result<DataType> {
let types = rows.iter().map(|a| {
Ok(match a {
Value::Null => None,
Value::Number(n) => Some(infer_number(n)),
Value::Bool(_) => Some(DataType::Boolean),
Value::String(_) => Some(DataType::Utf8),
Value::Array(array) => Some(infer_array(array)?),
Value::Object(_) => {
return Err(ArrowError::NotYetImplemented(
"Nested structs not yet supported".to_string(),
))
Value::Object(inner) => {
let fields = inner
.iter()
.map(|(key, value)| infer_value(value).map(|dt| Field::new(key, dt, true)))
.collect::<Result<Vec<_>>>()?;
Some(DataType::Struct(fields))
}
})
});
Expand All @@ -126,17 +129,26 @@ fn infer_array(array: &[Value]) -> Result<DataType> {
.flatten()
.collect::<Result<HashSet<_>>>()?;

// if a record contains only nulls, it is not
// added to values
Ok(if !types.is_empty() {
let types = types.into_iter().collect::<Vec<_>>();
let dt = coerce_data_type(&types);
DataType::List(Box::new(Field::new(ITEM_NAME, dt, true)))
coerce_data_type(&types)
} else {
DataType::Null
})
}

/// Infers the [`DataType`] of a JSON array by inferring its items' type and
/// wrapping it in a `List`.
///
/// An array containing only nulls stays `Null` (it is not promoted to
/// `List<Null>`), mirroring how all-null records are skipped during inference.
fn infer_array(values: &[Value]) -> Result<DataType> {
    infer_rows(values).map(|inner| match inner {
        DataType::Null => DataType::Null,
        dt => DataType::List(Box::new(Field::new(ITEM_NAME, dt, true))),
    })
}

fn infer_number(n: &serde_json::Number) -> DataType {
if n.is_f64() {
DataType::Float64
Expand Down
2 changes: 1 addition & 1 deletion src/io/json/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod iterator;

use crate::error::{ArrowError, Result};

pub use deserialize::deserialize;
pub use deserialize::{deserialize, deserialize_json};
pub use infer_schema::*;

/// Reads rows from `reader` into `rows`. Returns the number of read items.
Expand Down
38 changes: 38 additions & 0 deletions tests/it/io/json/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io::Cursor;

use arrow2::array::*;
use arrow2::datatypes::*;
use arrow2::error::ArrowError;
use arrow2::error::Result;
use arrow2::io::json::read;

Expand Down Expand Up @@ -171,3 +172,40 @@ fn infer_nested_struct() -> Result<()> {
assert_eq!(result, fields);
Ok(())
}

#[test]
fn read_json() -> Result<()> {
    // A JSON document (not NDJSON): one top-level array of objects.
    let data = r#"[
        {
            "a": 1
        },
        {
            "a": 2
        },
        {
            "a": 3
        }
    ]"#;

    let data = serde_json::from_slice(data.as_bytes())?;

    // `infer_rows`/`deserialize_json` operate on a slice of values, so the
    // top-level JSON value must be an array. The error message was previously
    // empty, which made failures undiagnosable.
    let values = if let serde_json::Value::Array(values) = data {
        Ok(values)
    } else {
        Err(ArrowError::InvalidArgumentError(
            "expected the top-level JSON value to be an array".to_string(),
        ))
    }?;

    let data_type = read::infer_rows(&values)?;

    let result = read::deserialize_json(&values, data_type);

    // Three objects, each with an Int64-compatible field "a"
    // => Struct<a: Int64> with values [1, 2, 3].
    let expected = StructArray::from_data(
        DataType::Struct(vec![Field::new("a", DataType::Int64, true)]),
        vec![Arc::new(Int64Array::from_slice([1, 2, 3])) as _],
        None,
    );

    assert_eq!(expected, result.as_ref());

    Ok(())
}