Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for pushdown projection in reading Avro (#827)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Feb 9, 2022
1 parent a026b66 commit 2c4dbb2
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 40 deletions.
1 change: 1 addition & 0 deletions benches/avro_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
),
avro_schema,
schema.fields,
None,
);

let mut rows = 0;
Expand Down
1 change: 1 addition & 0 deletions examples/avro_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ fn main() -> Result<()> {
read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
avro_schema,
schema.fields,
None,
);

for maybe_chunk in reader {
Expand Down
4 changes: 3 additions & 1 deletion examples/avro_read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ async fn main() -> Result<()> {

let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
let avro_schemas = Arc::new(avro_schemas);
let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

let blocks = block_stream(&mut reader, marker).await;

pin_mut!(blocks);
while let Some(mut block) = blocks.next().await.transpose()? {
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
let projection = projection.clone();
// the content here is CPU-bounded. It should run on a dedicated thread pool
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, &schema.fields, &avro_schemas)
deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
});
let batch = handle.await.unwrap()?;
assert!(!batch.is_empty());
Expand Down
169 changes: 143 additions & 26 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,6 @@ fn deserialize_value<'a>(
array.try_push_valid()?;
}
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
// https://avro.apache.org/docs/current/spec.html#Duration
// 12 bytes, months, days, millis in LE
let data = &block[..12];
block = &block[12..];

let value = months_days_ns::new(
i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64 * 1_000_000,
);

let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
.unwrap();
array.push(Some(value))
}
DataType::Struct(inner_fields) => {
let fields = match avro_field {
AvroSchema::Record(Record { fields, .. }) => fields,
Expand Down Expand Up @@ -227,6 +209,25 @@ fn deserialize_value<'a>(
.unwrap();
array.push(Some(value))
}
PrimitiveType::MonthDayNano => {
// https://avro.apache.org/docs/current/spec.html#Duration
// 12 bytes, months, days, millis in LE
let data = &block[..12];
block = &block[12..];

let value = months_days_ns::new(
i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64
* 1_000_000,
);

let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
.unwrap();
array.push(Some(value))
}
_ => unreachable!(),
},
PhysicalType::Utf8 => {
Expand Down Expand Up @@ -283,11 +284,109 @@ fn deserialize_value<'a>(
Ok(block)
}

/// Deserializes a [`Block`] into [`Chunk`].
/// Advances `block` past one serialized item of `field` without materializing it.
///
/// This is the projection counterpart of `deserialize_item`: when a column is
/// not selected, its bytes must still be consumed so that subsequent columns
/// are read from the correct offset. Returns the remainder of `block` after
/// the item.
///
/// # Errors
/// Errors when a zigzag-encoded length/variant cannot be decoded, or when a
/// byte length does not fit in `usize`.
fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> Result<&'a [u8]> {
    if field.is_nullable {
        // Nullable fields are encoded as a union; the zigzag variant index
        // selects the branch. If the branch selected is the null one, the
        // item occupies no further bytes and we can return immediately.
        let variant = util::zigzag_i64(&mut block)?;
        let is_null_first = is_union_null_first(avro_field);
        if is_null_first && variant == 0 || !is_null_first && variant != 0 {
            return Ok(block);
        }
    }
    match &field.data_type {
        DataType::List(inner) => {
            // Resolve the Avro schema of the list's items, unwrapping a
            // [null, array] / [array, null] union if present.
            let avro_inner = match avro_field {
                AvroSchema::Array(inner) => inner.as_ref(),
                AvroSchema::Union(u) => match &u.as_slice() {
                    &[AvroSchema::Array(inner), _] | &[_, AvroSchema::Array(inner)] => {
                        inner.as_ref()
                    }
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            };

            // Avro arrays are encoded as a sequence of blocks, each prefixed
            // by its item count; a count of 0 terminates the array.
            loop {
                // NOTE(review): the Avro spec also allows a *negative* block
                // count (signalling a byte-size prefix follows); casting a
                // negative i64 to usize here would loop astronomically —
                // confirm writers targeted by this reader never emit that form.
                let len = util::zigzag_i64(&mut block)? as usize;

                if len == 0 {
                    break;
                }

                // Skip each item in this block recursively.
                for _ in 0..len {
                    block = skip_item(inner, avro_inner, block)?;
                }
            }
        }
        DataType::Struct(inner_fields) => {
            // Resolve the record's field schemas, unwrapping a
            // [null, record] / [record, null] union if present.
            let fields = match avro_field {
                AvroSchema::Record(Record { fields, .. }) => fields,
                AvroSchema::Union(u) => match &u.as_slice() {
                    &[AvroSchema::Record(Record { fields, .. }), _]
                    | &[_, AvroSchema::Record(Record { fields, .. })] => fields,
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            };

            // A record is its fields serialized back-to-back, in order.
            for (field, avro_field) in inner_fields.iter().zip(fields.iter()) {
                block = skip_item(field, &avro_field.schema, block)?;
            }
        }
        _ => match field.data_type.to_physical_type() {
            PhysicalType::Boolean => {
                // Booleans occupy exactly one byte; the value is discarded.
                let _ = block[0] == 1;
                block = &block[1..];
            }
            PhysicalType::Primitive(primitive) => match primitive {
                // Avro ints and longs are both zigzag varints — decoding
                // (and discarding) the value is how we learn its width.
                PrimitiveType::Int32 => {
                    let _ = util::zigzag_i64(&mut block)?;
                }
                PrimitiveType::Int64 => {
                    let _ = util::zigzag_i64(&mut block)?;
                }
                // Floats and doubles are fixed-width little-endian.
                PrimitiveType::Float32 => {
                    block = &block[std::mem::size_of::<f32>()..];
                }
                PrimitiveType::Float64 => {
                    block = &block[std::mem::size_of::<f64>()..];
                }
                PrimitiveType::MonthDayNano => {
                    // Avro `duration` logical type: fixed 12 bytes
                    // (months, days, millis, each 4 bytes LE).
                    block = &block[12..];
                }
                _ => unreachable!(),
            },
            PhysicalType::Utf8 | PhysicalType::Binary => {
                // Strings/bytes are a zigzag length followed by that many bytes.
                let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
                    ArrowError::ExternalFormat(
                        "Avro format contains a non-usize number of bytes".to_string(),
                    )
                })?;
                block = &block[len..];
            }
            PhysicalType::FixedSizeBinary => {
                // The width comes from the Arrow type, not the stream.
                let len = if let DataType::FixedSizeBinary(len) = &field.data_type {
                    *len
                } else {
                    unreachable!()
                };

                block = &block[len..];
            }
            PhysicalType::Dictionary(_) => {
                // Avro enums are encoded as a single zigzag index.
                let _ = util::zigzag_i64(&mut block)? as i32;
            }
            // Remaining physical types are not yet supported by this reader.
            _ => todo!(),
        },
    }
    Ok(block)
}

/// Deserializes a [`Block`] into [`Chunk`], projected
pub fn deserialize(
block: &Block,
fields: &[Field],
avro_schemas: &[AvroSchema],
projection: &[bool],
) -> Result<Chunk<Arc<dyn Array>>> {
let rows = block.number_of_rows;
let mut block = block.data.as_ref();
Expand All @@ -296,21 +395,39 @@ pub fn deserialize(
let mut arrays: Vec<Box<dyn MutableArray>> = fields
.iter()
.zip(avro_schemas.iter())
.map(|(field, avro_schema)| {
let data_type = field.data_type().to_logical_type();
make_mutable(data_type, Some(avro_schema), rows)
.zip(projection.iter())
.map(|((field, avro_schema), projection)| {
if *projection {
make_mutable(&field.data_type, Some(avro_schema), rows)
} else {
// just something; we are not going to use it
make_mutable(&DataType::Int32, None, 0)
}
})
.collect::<Result<_>>()?;

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for ((array, field), avro_field) in arrays
let iter = arrays
.iter_mut()
.zip(fields.iter())
.zip(avro_schemas.iter())
{
block = deserialize_item(array.as_mut(), field.is_nullable, avro_field, block)?
.zip(projection.iter());

for (((array, field), avro_field), projection) in iter {
block = if *projection {
deserialize_item(array.as_mut(), field.is_nullable, avro_field, block)
} else {
skip_item(field, avro_field, block)
}?
}
}
Chunk::try_new(arrays.iter_mut().map(|array| array.as_arc()).collect())
Chunk::try_new(
arrays
.iter_mut()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.map(|array| array.as_arc())
.collect(),
)
}
17 changes: 13 additions & 4 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod schema;
mod util;

pub(super) use header::deserialize_header;
pub(super) use schema::convert_schema;
pub(super) use schema::infer_schema;

use crate::array::Array;
use crate::chunk::Chunk;
Expand All @@ -32,7 +32,7 @@ pub fn read_metadata<R: std::io::Read>(
reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
let (avro_schema, codec, marker) = util::read_schema(reader)?;
let schema = convert_schema(&avro_schema)?;
let schema = infer_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
Expand All @@ -48,15 +48,23 @@ pub struct Reader<R: Read> {
iter: Decompressor<R>,
avro_schemas: Vec<AvroSchema>,
fields: Vec<Field>,
projection: Vec<bool>,
}

impl<R: Read> Reader<R> {
/// Creates a new [`Reader`].
pub fn new(iter: Decompressor<R>, avro_schemas: Vec<AvroSchema>, fields: Vec<Field>) -> Self {
pub fn new(
iter: Decompressor<R>,
avro_schemas: Vec<AvroSchema>,
fields: Vec<Field>,
projection: Option<Vec<bool>>,
) -> Self {
let projection = projection.unwrap_or_else(|| fields.iter().map(|_| true).collect());
Self {
iter,
avro_schemas,
fields,
projection,
}
}

Expand All @@ -72,10 +80,11 @@ impl<R: Read> Iterator for Reader<R> {
fn next(&mut self) -> Option<Self::Item> {
let fields = &self.fields[..];
let avro_schemas = &self.avro_schemas;
let projection = &self.projection;

self.iter
.next()
.transpose()
.map(|maybe_block| deserialize(maybe_block?, fields, avro_schemas))
.map(|maybe_block| deserialize(maybe_block?, fields, avro_schemas, projection))
}
}
7 changes: 4 additions & 3 deletions src/io/avro/read/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ fn external_props(schema: &AvroSchema) -> Metadata {
props
}

/// Maps an [`AvroSchema`] into a [`Schema`].
pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
/// Infers a [`Schema`] from the root [`AvroSchema`].
/// This errors if the root [`AvroSchema`] is not of type `Record`.
pub fn infer_schema(schema: &AvroSchema) -> Result<Schema> {
if let AvroSchema::Record(Record { fields, .. }) = schema {
Ok(fields
.iter()
Expand All @@ -35,7 +36,7 @@ pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
.into())
} else {
Err(ArrowError::OutOfSpec(
"An avro Schema must be of type Record".to_string(),
"The root AvroSchema must be of type Record".to_string(),
))
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/io/avro/read_async/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use futures::AsyncReadExt;
use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};

use super::super::read::convert_schema;
use super::super::read::deserialize_header;
use super::super::read::infer_schema;
use super::super::Compression;
use super::super::{read_header, read_metadata};
use super::utils::zigzag_i64;
Expand All @@ -28,7 +28,7 @@ pub async fn read_metadata<R: AsyncRead + Unpin + Send>(
reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
let (avro_schema, codec, marker) = read_metadata_async(reader).await?;
let schema = convert_schema(&avro_schema)?;
let schema = infer_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
Expand Down
Loading

0 comments on commit 2c4dbb2

Please sign in to comment.