diff --git a/benches/write_json.rs b/benches/write_json.rs
index 11f550c90f5..8f2df8411bd 100644
--- a/benches/write_json.rs
+++ b/benches/write_json.rs
@@ -10,7 +10,7 @@ use arrow2::util::bench_util::*;
 
 fn write_batch(columns: &Chunk<Box<dyn Array>>) -> Result<()> {
     let mut writer = vec![];
-    let format = write::JsonArray::default();
+    let format = write::Format::Json;
 
     let batches = vec![Ok(columns.clone())].into_iter();
 
diff --git a/examples/json_write.rs b/examples/json_write.rs
index 07213fc3ddc..086d18022c3 100644
--- a/examples/json_write.rs
+++ b/examples/json_write.rs
@@ -10,7 +10,7 @@ use arrow2::{
 
 fn write_batches(path: &str, names: Vec<String>, batches: &[Chunk<Box<dyn Array>>]) -> Result<()> {
     let mut writer = File::create(path)?;
-    let format = write::JsonArray::default();
+    let format = write::Format::Json;
 
     let batches = batches.iter().cloned().map(Ok);
 
diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs
index fcf1c435148..b1ac14ee536 100644
--- a/src/io/json/write/mod.rs
+++ b/src/io/json/write/mod.rs
@@ -1,8 +1,8 @@
 //! APIs to write to JSON
 mod format;
 mod serialize;
+
 pub use fallible_streaming_iterator::*;
-pub use format::*;
 pub use serialize::serialize;
 
 use crate::{
@@ -10,10 +10,18 @@ use crate::{
     chunk::Chunk,
     error::{ArrowError, Result},
 };
+use format::*;
+
+/// The supported variations of JSON
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Format {
+    /// JSON
+    Json,
+    /// NDJSON (http://ndjson.org/)
+    NewlineDelimitedJson,
+}
 
-/// Writes blocks of JSON-encoded data into `writer`, ensuring that the written
-/// JSON has the expected `format`
-pub fn write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
+fn _write<W, F, I>(writer: &mut W, format: F, mut blocks: I) -> Result<()>
 where
     W: std::io::Write,
     F: JsonFormat,
@@ -30,28 +38,40 @@ where
     Ok(())
 }
 
+/// Writes blocks of JSON-encoded data into `writer` according to format [`Format`].
+/// # Implementation
+/// This is IO-bounded
+pub fn write<W, I>(writer: &mut W, format: Format, blocks: I) -> Result<()>
+where
+    W: std::io::Write,
+    I: FallibleStreamingIterator<Item = [u8], Error = ArrowError>,
+{
+    match format {
+        Format::Json => _write(writer, JsonArray::default(), blocks),
+        Format::NewlineDelimitedJson => _write(writer, LineDelimited::default(), blocks),
+    }
+}
+
 /// [`FallibleStreamingIterator`] that serializes a [`Chunk`] to bytes.
 /// Advancing it is CPU-bounded
-pub struct Serializer<F, A, I>
+pub struct Serializer<A, I>
 where
-    F: JsonFormat,
     A: AsRef<dyn Array>,
     I: Iterator<Item = Result<Chunk<A>>>,
 {
     batches: I,
     names: Vec<String>,
     buffer: Vec<u8>,
-    format: F,
+    format: Format,
 }
 
-impl<F, A, I> Serializer<F, A, I>
+impl<A, I> Serializer<A, I>
 where
-    F: JsonFormat,
     A: AsRef<dyn Array>,
     I: Iterator<Item = Result<Chunk<A>>>,
 {
     /// Creates a new [`Serializer`].
-    pub fn new(batches: I, names: Vec<String>, buffer: Vec<u8>, format: F) -> Self {
+    pub fn new(batches: I, names: Vec<String>, buffer: Vec<u8>, format: Format) -> Self {
         Self {
             batches,
             names,
@@ -61,9 +81,8 @@ where
     }
 }
 
-impl<F, A, I> FallibleStreamingIterator for Serializer<F, A, I>
+impl<A, I> FallibleStreamingIterator for Serializer<A, I>
 where
-    F: JsonFormat,
     A: AsRef<dyn Array>,
     I: Iterator<Item = Result<Chunk<A>>>,
 {
diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs
index 59bab0bf646..3507c0d5be3 100644
--- a/src/io/json/write/serialize.rs
+++ b/src/io/json/write/serialize.rs
@@ -14,7 +14,8 @@ use crate::temporal_conversions::{
 use crate::util::lexical_to_bytes_mut;
 use crate::{array::*, datatypes::DataType, types::NativeType};
 
-use super::{JsonArray, JsonFormat};
+use super::format::{JsonArray, JsonFormat, LineDelimited};
+use super::Format;
 
 fn boolean_serializer<'a>(
     array: &'a BooleanArray,
@@ -249,7 +250,7 @@ fn serialize_item(
 
 /// Serializes a (name, array) to a valid JSON to `buffer`
 /// This is CPU-bounded
-pub fn serialize<N, A, F>(names: &[N], columns: &Chunk<A>, format: F, buffer: &mut Vec<u8>)
+fn _serialize<N, A, F>(names: &[N], columns: &Chunk<A>, format: F, buffer: &mut Vec<u8>)
 where
     N: AsRef<str>,
     A: AsRef<dyn Array>,
@@ -278,3 +279,18 @@ where
         is_first_row = false;
     })
 }
+
+/// Serializes a (name, array) to a valid JSON to `buffer`
+/// This is CPU-bounded
+pub fn serialize<N, A>(names: &[N], columns: &Chunk<A>, format: Format, buffer: &mut Vec<u8>)
+where
+    N: AsRef<str>,
+    A: AsRef<dyn Array>,
+{
+    match format {
+        Format::Json => _serialize(names, columns, JsonArray::default(), buffer),
+        Format::NewlineDelimitedJson => {
+            _serialize(names, columns, LineDelimited::default(), buffer)
+        }
+    }
+}
diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs
index 03b61c3b5ad..ecd1e93530e 100644
--- a/tests/it/io/json/mod.rs
+++ b/tests/it/io/json/mod.rs
@@ -22,10 +22,10 @@ fn read_batch(data: String, fields: &[Field]) -> Result<Chunk<Box<dyn Array>>> {
     json_read::deserialize(rows, fields)
 }
 
-fn write_batch<A: AsRef<dyn Array>, F: json_write::JsonFormat>(
+fn write_batch<A: AsRef<dyn Array>>(
     batch: Chunk<A>,
     names: Vec<String>,
-    format: F,
+    format: json_write::Format,
 ) -> Result<Vec<u8>> {
     let batches = vec![Ok(batch)].into_iter();
 
@@ -46,7 +46,7 @@ fn round_trip(data: String) -> Result<()> {
     let buf = write_batch(
         columns.clone(),
         fields.iter().map(|x| x.name.clone()).collect(),
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     let new_chunk = read_batch(String::from_utf8(buf).unwrap(), &fields)?;
diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs
index 6145fa0ddbc..756646696a8 100644
--- a/tests/it/io/json/write.rs
+++ b/tests/it/io/json/write.rs
@@ -20,7 +20,7 @@ fn write_simple_rows() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string(), "c2".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -45,7 +45,7 @@ fn write_simple_rows_array() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string(), "c2".to_string()],
-        json_write::JsonArray::default(),
+        json_write::Format::Json,
     )?;
 
     assert_eq!(
@@ -88,7 +88,7 @@ fn write_nested_struct_with_validity() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string(), "c2".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -133,7 +133,7 @@ fn write_nested_structs() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string(), "c2".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -169,7 +169,7 @@ fn write_struct_with_list_field() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string(), "c2".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -213,7 +213,7 @@ fn write_nested_list() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string(), "c2".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -274,7 +274,7 @@ fn write_list_of_struct() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string(), "c2".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -296,7 +296,7 @@ fn write_escaped_utf8() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -315,7 +315,7 @@ fn write_quotation_marks_in_utf8() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -334,7 +334,7 @@ fn write_date32() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
     )?;
 
     assert_eq!(
@@ -356,7 +356,7 @@ fn write_timestamp() -> Result<()> {
     let buf = write_batch(
         batch,
         vec!["c1".to_string()],
-        json_write::LineDelimited::default(),
+        json_write::Format::NewlineDelimitedJson,
    )?;
 
     assert_eq!(
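For orientation, the sketch below shows how a caller would drive the reworked writer after this change, mirroring the updated `examples/json_write.rs` above. It is a minimal illustration rather than part of the patch; the output path, column name, and array contents are made up for the example.

```rust
use std::fs::File;

use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    error::Result,
    io::json::write,
};

fn main() -> Result<()> {
    // A single column of nullable i32 values; any Arrow array works here.
    let array = Int32Array::from(&[Some(1), None, Some(3)]);
    let columns = Chunk::new(vec![Box::new(array) as Box<dyn Array>]);

    let mut writer = File::create("example.json")?;
    // The output flavour is now picked via the `Format` enum instead of a
    // generic `JsonFormat` value.
    let format = write::Format::Json; // or write::Format::NewlineDelimitedJson

    // CPU-bounded: advancing the serializer encodes each chunk into its buffer.
    let blocks = write::Serializer::new(
        vec![Ok(columns)].into_iter(),
        vec!["c1".to_string()],
        vec![],
        format,
    );

    // IO-bounded: writes the serialized blocks according to `format`.
    write::write(&mut writer, format, blocks)?;
    Ok(())
}
```

Because `Format` derives `Copy`, the same value can be handed to both the `Serializer` and `write` without cloning.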