Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Surfaced avro errors in reading. (#558)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Oct 30, 2021
1 parent 31d57b8 commit eca8237
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 26 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ csv = { version = "^1.1", optional = true }
regex = { version = "^1.3", optional = true }
lazy_static = { version = "^1.4", optional = true }
streaming-iterator = { version = "0.1", optional = true }
fallible-streaming-iterator = { version = "0.1", optional = true }

serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
Expand Down Expand Up @@ -119,7 +120,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-rs", "streaming-iterator", "serde_json", "libflate"]
io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"]
# io_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["io_json", "serde_derive", "hex"]
Expand Down Expand Up @@ -244,3 +245,7 @@ harness = false
[[bench]]
name = "iter_list"
harness = false

[[bench]]
name = "avro_read"
harness = false
83 changes: 83 additions & 0 deletions benches/avro_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::io::Cursor;
use std::sync::Arc;

use avro_rs::types::Record;
use criterion::*;

use arrow2::error::Result;
use arrow2::io::avro::read;
use avro_rs::*;
use avro_rs::{Codec, Schema as AvroSchema};

fn schema() -> AvroSchema {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "a", "type": "string"}
]
}
"#;
AvroSchema::parse_str(raw_schema).unwrap()
}

fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
let avro = schema();
// a writer needs a schema and something to write to
let mut writer: Writer<Vec<u8>>;
if has_codec {
writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate);
} else {
writer = Writer::new(&avro, Vec::new());
}

(0..size).for_each(|_| {
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", "foo");
writer.append(record).unwrap();
});

Ok(writer.into_inner().unwrap())
}

fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
let mut file = Cursor::new(buffer);

let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;

let reader = read::Reader::new(
read::Decompressor::new(
read::BlockStreamIterator::new(&mut file, file_marker),
codec,
),
avro_schema,
Arc::new(schema),
);

let mut rows = 0;
for maybe_batch in reader {
let batch = maybe_batch?;
rows += batch.num_rows();
}
assert_eq!(rows, size);
Ok(())
}

fn add_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("avro_read");

for log2_size in (10..=20).step_by(2) {
let size = 2usize.pow(log2_size);
let buffer = write(size, false).unwrap();

group.throughput(Throughput::Elements(size as u64));

group.bench_with_input(BenchmarkId::new("utf8", log2_size), &buffer, |b, buffer| {
b.iter(|| read_batch(buffer, size).unwrap())
});
}
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
51 changes: 26 additions & 25 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use std::io::Read;
use std::sync::Arc;

use avro_rs::{Codec, Schema as AvroSchema};
use fallible_streaming_iterator::FallibleStreamingIterator;
use libflate::deflate::Decoder;
use streaming_iterator::StreamingIterator;

mod deserialize;
mod nested;
Expand Down Expand Up @@ -68,17 +68,19 @@ fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16])
Ok(rows)
}

fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
/// Decompresses an avro block.
/// Returns whether the buffers where swapped.
fn decompress_block(block: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
match codec {
Codec::Null => {
std::mem::swap(buf, decompress);
Ok(false)
std::mem::swap(block, decompress);
Ok(true)
}
Codec::Deflate => {
decompress.clear();
let mut decoder = Decoder::new(&buf[..]);
let mut decoder = Decoder::new(&block[..]);
decoder.read_to_end(decompress)?;
Ok(true)
Ok(false)
}
}
}
Expand Down Expand Up @@ -106,13 +108,14 @@ impl<'a, R: Read> BlockStreamIterator<'a, R> {
}
}

impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);

fn advance(&mut self) {
fn advance(&mut self) -> Result<()> {
let (buf, rows) = &mut self.buf;
// todo: surface this error
*rows = read_block(self.reader, buf, self.file_marker).unwrap();
*rows = read_block(self.reader, buf, self.file_marker)?;
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
Expand Down Expand Up @@ -144,17 +147,18 @@ impl<'a, R: Read> Decompressor<'a, R> {
}
}

impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
impl<'a, R: Read> FallibleStreamingIterator for Decompressor<'a, R> {
type Error = ArrowError;
type Item = (Vec<u8>, usize);

fn advance(&mut self) {
fn advance(&mut self) -> Result<()> {
if self.was_swapped {
std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
}
self.blocks.advance();
self.was_swapped =
decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
self.blocks.advance()?;
self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?;
self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
Ok(())
}

fn get(&self) -> Option<&Self::Item> {
Expand Down Expand Up @@ -192,15 +196,12 @@ impl<'a, R: Read> Iterator for Reader<'a, R> {
type Item = Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
if let Some((data, rows)) = self.iter.next() {
Some(deserialize::deserialize(
data,
*rows,
self.schema.clone(),
&self.avro_schemas,
))
} else {
None
}
let schema = self.schema.clone();
let avro_schemas = &self.avro_schemas;

self.iter.next().transpose().map(|x| {
let (data, rows) = x?;
deserialize::deserialize(data, *rows, schema, avro_schemas)
})
}
}

0 comments on commit eca8237

Please sign in to comment.