diff --git a/Cargo.toml b/Cargo.toml
index 0df68fc2cd3..783300ca679 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ csv = { version = "^1.1", optional = true }
 regex = { version = "^1.3", optional = true }
 lazy_static = { version = "^1.4", optional = true }
 streaming-iterator = { version = "0.1", optional = true }
+fallible-streaming-iterator = { version = "0.1", optional = true }
 
 serde = { version = "^1.0", features = ["rc"], optional = true }
 serde_derive = { version = "^1.0", optional = true }
@@ -119,7 +120,7 @@ io_parquet_compression = [
     "parquet2/lz4",
     "parquet2/brotli",
 ]
-io_avro = ["avro-rs", "streaming-iterator", "serde_json", "libflate"]
+io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
 # io_json: its dependencies + error handling
 # serde_derive: there is some derive around
 io_json_integration = ["io_json", "serde_derive", "hex"]
@@ -244,3 +245,7 @@ harness = false
 [[bench]]
 name = "iter_list"
 harness = false
+
+[[bench]]
+name = "avro_read"
+harness = false
diff --git a/benches/avro_read.rs b/benches/avro_read.rs
new file mode 100644
index 00000000000..23a9e9e1602
--- /dev/null
+++ b/benches/avro_read.rs
@@ -0,0 +1,83 @@
+use std::io::Cursor;
+use std::sync::Arc;
+
+use avro_rs::types::Record;
+use criterion::*;
+
+use arrow2::error::Result;
+use arrow2::io::avro::read;
+use avro_rs::*;
+use avro_rs::{Codec, Schema as AvroSchema};
+
+fn schema() -> AvroSchema {
+    let raw_schema = r#"
+    {
+        "type": "record",
+        "name": "test",
+        "fields": [
+            {"name": "a", "type": "string"}
+        ]
+    }
+"#;
+    AvroSchema::parse_str(raw_schema).unwrap()
+}
+
+fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
+    let avro = schema();
+    // a writer needs a schema and something to write to
+    let mut writer: Writer<Vec<u8>>;
+    if has_codec {
+        writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate);
+    } else {
+        writer = Writer::new(&avro, Vec::new());
+    }
+
+    (0..size).for_each(|_| {
+        let mut record = Record::new(writer.schema()).unwrap();
+        record.put("a", "foo");
+        writer.append(record).unwrap();
+    });
+
+    Ok(writer.into_inner().unwrap())
+}
+
+fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
+    let mut file = Cursor::new(buffer);
+
+    let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;
+
+    let reader = read::Reader::new(
+        read::Decompressor::new(
+            read::BlockStreamIterator::new(&mut file, file_marker),
+            codec,
+        ),
+        avro_schema,
+        Arc::new(schema),
+    );
+
+    let mut rows = 0;
+    for maybe_batch in reader {
+        let batch = maybe_batch?;
+        rows += batch.num_rows();
+    }
+    assert_eq!(rows, size);
+    Ok(())
+}
+
+fn add_benchmark(c: &mut Criterion) {
+    let mut group = c.benchmark_group("avro_read");
+
+    for log2_size in (10..=20).step_by(2) {
+        let size = 2usize.pow(log2_size);
+        let buffer = write(size, false).unwrap();
+
+        group.throughput(Throughput::Elements(size as u64));
+
+        group.bench_with_input(BenchmarkId::new("utf8", log2_size), &buffer, |b, buffer| {
+            b.iter(|| read_batch(buffer, size).unwrap())
+        });
+    }
+}
+
+criterion_group!(benches, add_benchmark);
+criterion_main!(benches);
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index 069c40393c2..a0669507e13 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -4,8 +4,8 @@ use std::io::Read;
 use std::sync::Arc;
 
 use avro_rs::{Codec, Schema as AvroSchema};
+use fallible_streaming_iterator::FallibleStreamingIterator;
 use libflate::deflate::Decoder;
-use streaming_iterator::StreamingIterator;
 
 mod deserialize;
 mod nested;
@@ -68,17 +68,19 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
     Ok(rows)
 }
 
-fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
+/// Decompresses an avro block.
+/// Returns whether the buffers were swapped.
+fn decompress_block(block: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
     match codec {
         Codec::Null => {
-            std::mem::swap(buf, decompress);
-            Ok(false)
+            std::mem::swap(block, decompress);
+            Ok(true)
         }
         Codec::Deflate => {
             decompress.clear();
-            let mut decoder = Decoder::new(&buf[..]);
+            let mut decoder = Decoder::new(&block[..]);
             decoder.read_to_end(decompress)?;
-            Ok(true)
+            Ok(false)
         }
     }
 }
@@ -106,13 +108,14 @@ impl<'a, R: Read> BlockStreamIterator<'a, R> {
     }
 }
 
-impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
+impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> {
+    type Error = ArrowError;
     type Item = (Vec<u8>, usize);
 
-    fn advance(&mut self) {
+    fn advance(&mut self) -> Result<()> {
         let (buf, rows) = &mut self.buf;
-        // todo: surface this error
-        *rows = read_block(self.reader, buf, self.file_marker).unwrap();
+        *rows = read_block(self.reader, buf, self.file_marker)?;
+        Ok(())
     }
 
     fn get(&self) -> Option<&Self::Item> {
@@ -144,17 +147,18 @@ impl<'a, R: Read> Decompressor<'a, R> {
     }
 }
 
-impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
+impl<'a, R: Read> FallibleStreamingIterator for Decompressor<'a, R> {
+    type Error = ArrowError;
     type Item = (Vec<u8>, usize);
 
-    fn advance(&mut self) {
+    fn advance(&mut self) -> Result<()> {
         if self.was_swapped {
             std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
         }
-        self.blocks.advance();
-        self.was_swapped =
-            decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
+        self.blocks.advance()?;
+        self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
         self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
+        Ok(())
     }
 
     fn get(&self) -> Option<&Self::Item> {
@@ -192,15 +196,12 @@ impl<'a, R: Read> Iterator for Reader<'a, R> {
     type Item = Result<RecordBatch>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if let Some((data, rows)) = self.iter.next() {
-            Some(deserialize::deserialize(
-                data,
-                *rows,
-                self.schema.clone(),
-                &self.avro_schemas,
-            ))
-        } else {
-            None
-        }
+        let schema = self.schema.clone();
+        let avro_schemas = &self.avro_schemas;
+
+        self.iter.next().transpose().map(|x| {
+            let (data, rows) = x?;
+            deserialize::deserialize(data, *rows, schema, avro_schemas)
+        })
     }
 }
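
Note for reviewers (not part of the patch): below is a minimal sketch of how a caller consumes the reworked API, using the same `read::read_metadata`, `read::BlockStreamIterator`, `read::Decompressor` and `read::Reader` types that the benchmark above exercises; the path "data.avro" is a placeholder, not a file from this PR. The point of the change is visible at the call site: with `FallibleStreamingIterator`, a truncated or corrupt block surfaces as an `Err` the caller can propagate with `?`, instead of panicking inside `advance`.

use std::fs::File;
use std::sync::Arc;

use arrow2::error::Result;
use arrow2::io::avro::read;

fn main() -> Result<()> {
    // placeholder input path, not shipped with this PR
    let mut file = File::open("data.avro")?;

    // read the avro header: schemas, compression codec and sync marker
    let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;

    let reader = read::Reader::new(
        read::Decompressor::new(read::BlockStreamIterator::new(&mut file, file_marker), codec),
        avro_schema,
        Arc::new(schema),
    );

    for maybe_batch in reader {
        // I/O and decompression errors now propagate here
        // instead of unwinding inside the iterator
        let batch = maybe_batch?;
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}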