Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added projection push down support to avro
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 9, 2022
1 parent 40ca3df commit 83d1c0c
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 33 deletions.
1 change: 1 addition & 0 deletions examples/avro_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ fn main() -> Result<()> {
read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
avro_schema,
schema.fields,
None,
);

for maybe_chunk in reader {
Expand Down
4 changes: 3 additions & 1 deletion examples/avro_read_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ async fn main() -> Result<()> {

let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
let avro_schemas = Arc::new(avro_schemas);
let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

let blocks = block_stream(&mut reader, marker).await;

pin_mut!(blocks);
while let Some(mut block) = blocks.next().await.transpose()? {
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
let projection = projection.clone();
// the content here is CPU-bounded. It should run on a dedicated thread pool
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, &schema.fields, &avro_schemas)
deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
});
let batch = handle.await.unwrap()?;
assert!(!batch.is_empty());
Expand Down
169 changes: 143 additions & 26 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,24 +137,6 @@ fn deserialize_value<'a>(
array.try_push_valid()?;
}
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
// https://avro.apache.org/docs/current/spec.html#Duration
// 12 bytes, months, days, millis in LE
let data = &block[..12];
block = &block[12..];

let value = months_days_ns::new(
i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64 * 1_000_000,
);

let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
.unwrap();
array.push(Some(value))
}
DataType::Struct(inner_fields) => {
let fields = match avro_field {
AvroSchema::Record(Record { fields, .. }) => fields,
Expand Down Expand Up @@ -227,6 +209,25 @@ fn deserialize_value<'a>(
.unwrap();
array.push(Some(value))
}
PrimitiveType::MonthDayNano => {
// https://avro.apache.org/docs/current/spec.html#Duration
// 12 bytes, months, days, millis in LE
let data = &block[..12];
block = &block[12..];

let value = months_days_ns::new(
i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64
* 1_000_000,
);

let array = array
.as_mut_any()
.downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
.unwrap();
array.push(Some(value))
}
_ => unreachable!(),
},
PhysicalType::Utf8 => {
Expand Down Expand Up @@ -283,11 +284,109 @@ fn deserialize_value<'a>(
Ok(block)
}

/// Skips a single avro-encoded item of `field` at the head of `block`, returning the
/// remaining bytes. Counterpart of `deserialize_item` for columns excluded from the
/// projection: the value must still be consumed to advance the cursor, but no array
/// is materialized.
fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> Result<&'a [u8]> {
    if field.is_nullable {
        // nullable fields are encoded as a union; the zigzag-encoded variant index
        // precedes the value
        let variant = util::zigzag_i64(&mut block)?;
        let is_null_first = is_union_null_first(avro_field);
        if is_null_first && variant == 0 || !is_null_first && variant != 0 {
            // the value is null — nothing further to consume
            return Ok(block);
        }
    }
    match &field.data_type {
        DataType::List(inner) => {
            // resolve the avro schema of the list's items (possibly behind a union)
            let avro_inner = match avro_field {
                AvroSchema::Array(inner) => inner.as_ref(),
                AvroSchema::Union(u) => match &u.as_slice() {
                    &[AvroSchema::Array(inner), _] | &[_, AvroSchema::Array(inner)] => {
                        inner.as_ref()
                    }
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            };

            // arrays are encoded as a sequence of blocks, each prefixed by its item
            // count; a count of 0 terminates the array
            // NOTE(review): a negative block count (avro's "size-prefixed" block form)
            // is not handled here — presumably matching `deserialize_value`; confirm
            loop {
                let len = util::zigzag_i64(&mut block)? as usize;

                if len == 0 {
                    break;
                }

                for _ in 0..len {
                    block = skip_item(inner, avro_inner, block)?;
                }
            }
        }
        DataType::Struct(inner_fields) => {
            // resolve the avro record fields (possibly behind a union)
            let fields = match avro_field {
                AvroSchema::Record(Record { fields, .. }) => fields,
                AvroSchema::Union(u) => match &u.as_slice() {
                    &[AvroSchema::Record(Record { fields, .. }), _]
                    | &[_, AvroSchema::Record(Record { fields, .. })] => fields,
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            };

            // a record is its fields encoded back-to-back, in schema order
            for (field, avro_field) in inner_fields.iter().zip(fields.iter()) {
                block = skip_item(field, &avro_field.schema, block)?;
            }
        }
        _ => match field.data_type.to_physical_type() {
            PhysicalType::Boolean => {
                // a boolean is a single byte
                let _ = block[0] == 1;
                block = &block[1..];
            }
            PhysicalType::Primitive(primitive) => match primitive {
                PrimitiveType::Int32 => {
                    // int: zigzag varint
                    let _ = util::zigzag_i64(&mut block)?;
                }
                PrimitiveType::Int64 => {
                    // long: zigzag varint
                    let _ = util::zigzag_i64(&mut block)?;
                }
                PrimitiveType::Float32 => {
                    // float: 4 bytes LE
                    block = &block[std::mem::size_of::<f32>()..];
                }
                PrimitiveType::Float64 => {
                    // double: 8 bytes LE
                    block = &block[std::mem::size_of::<f64>()..];
                }
                PrimitiveType::MonthDayNano => {
                    // avro duration logical type: fixed 12 bytes (months, days, millis)
                    block = &block[12..];
                }
                _ => unreachable!(),
            },
            PhysicalType::Utf8 | PhysicalType::Binary => {
                // string/bytes: zigzag length prefix followed by the payload
                let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
                    ArrowError::ExternalFormat(
                        "Avro format contains a non-usize number of bytes".to_string(),
                    )
                })?;
                block = &block[len..];
            }
            PhysicalType::FixedSizeBinary => {
                // fixed: exactly `len` bytes, no prefix
                let len = if let DataType::FixedSizeBinary(len) = &field.data_type {
                    *len
                } else {
                    unreachable!()
                };

                block = &block[len..];
            }
            PhysicalType::Dictionary(_) => {
                // enum: zigzag-encoded index into the symbol table
                let _ = util::zigzag_i64(&mut block)? as i32;
            }
            _ => todo!(),
        },
    }
    Ok(block)
}

/// Deserializes a [`Block`] into [`Chunk`], projected
pub fn deserialize(
block: &Block,
fields: &[Field],
avro_schemas: &[AvroSchema],
projection: &[bool],
) -> Result<Chunk<Arc<dyn Array>>> {
let rows = block.number_of_rows;
let mut block = block.data.as_ref();
Expand All @@ -296,21 +395,39 @@ pub fn deserialize(
let mut arrays: Vec<Box<dyn MutableArray>> = fields
.iter()
.zip(avro_schemas.iter())
.map(|(field, avro_schema)| {
let data_type = field.data_type().to_logical_type();
make_mutable(data_type, Some(avro_schema), rows)
.zip(projection.iter())
.map(|((field, avro_schema), projection)| {
if *projection {
make_mutable(&field.data_type, Some(avro_schema), rows)
} else {
// just something; we are not going to use it
make_mutable(&DataType::Int32, None, 0)
}
})
.collect::<Result<_>>()?;

// this is _the_ expensive transpose (rows -> columns)
for _ in 0..rows {
for ((array, field), avro_field) in arrays
let iter = arrays
.iter_mut()
.zip(fields.iter())
.zip(avro_schemas.iter())
{
block = deserialize_item(array.as_mut(), field.is_nullable, avro_field, block)?
.zip(projection.iter());

for (((array, field), avro_field), projection) in iter {
block = if *projection {
deserialize_item(array.as_mut(), field.is_nullable, avro_field, block)
} else {
skip_item(field, avro_field, block)
}?
}
}
Chunk::try_new(arrays.iter_mut().map(|array| array.as_arc()).collect())
Chunk::try_new(
arrays
.iter_mut()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.map(|array| array.as_arc())
.collect(),
)
}
13 changes: 11 additions & 2 deletions src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,23 @@ pub struct Reader<R: Read> {
iter: Decompressor<R>,
avro_schemas: Vec<AvroSchema>,
fields: Vec<Field>,
projection: Vec<bool>,
}

impl<R: Read> Reader<R> {
/// Creates a new [`Reader`].
pub fn new(iter: Decompressor<R>, avro_schemas: Vec<AvroSchema>, fields: Vec<Field>) -> Self {
pub fn new(
iter: Decompressor<R>,
avro_schemas: Vec<AvroSchema>,
fields: Vec<Field>,
projection: Option<Vec<bool>>,
) -> Self {
let projection = projection.unwrap_or_else(|| fields.iter().map(|_| true).collect());
Self {
iter,
avro_schemas,
fields,
projection,
}
}

Expand All @@ -72,10 +80,11 @@ impl<R: Read> Iterator for Reader<R> {
fn next(&mut self) -> Option<Self::Item> {
let fields = &self.fields[..];
let avro_schemas = &self.avro_schemas;
let projection = &self.projection;

self.iter
.next()
.transpose()
.map(|maybe_block| deserialize(maybe_block?, fields, avro_schemas))
.map(|maybe_block| deserialize(maybe_block?, fields, avro_schemas, projection))
}
}
53 changes: 51 additions & 2 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
Ok(writer.into_inner().unwrap())
}

pub(super) fn read_avro(mut avro: &[u8]) -> Result<(Chunk<Arc<dyn Array>>, Schema)> {
pub(super) fn read_avro(
mut avro: &[u8],
projection: Option<Vec<bool>>,
) -> Result<(Chunk<Arc<dyn Array>>, Schema)> {
let file = &mut avro;

let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
Expand All @@ -183,8 +186,21 @@ pub(super) fn read_avro(mut avro: &[u8]) -> Result<(Chunk<Arc<dyn Array>>, Schem
read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
avro_schema,
schema.fields.clone(),
projection.clone(),
);

let schema = if let Some(projection) = projection {
let fields = schema
.fields
.into_iter()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.collect::<Vec<_>>();
Schema::from(fields)
} else {
schema
};

reader.next().unwrap().map(|x| (x, schema))
}

Expand All @@ -193,7 +209,7 @@ fn test(codec: Codec) -> Result<()> {
let expected = data();
let (_, expected_schema) = schema();

let (result, schema) = read_avro(&avro)?;
let (result, schema) = read_avro(&avro, None)?;

assert_eq!(schema, expected_schema);
assert_eq!(result, expected);
Expand All @@ -214,3 +230,36 @@ fn read_deflate() -> Result<()> {
fn read_snappy() -> Result<()> {
test(Codec::Snappy)
}

/// Writes the fixture, reads it back with `projection`, and checks that the
/// resulting chunk and schema match the manually-projected expectations.
fn test_projected(projection: Vec<bool>) -> Result<()> {
    let avro = write_avro(Codec::Null).unwrap();

    // apply the projection to the expected columns
    let expected = Chunk::new(
        data()
            .into_arrays()
            .into_iter()
            .zip(projection.iter())
            .filter(|(_, keep)| **keep)
            .map(|(array, _)| array)
            .collect(),
    );

    // apply the projection to the expected schema
    let (_, expected_schema) = schema();
    let expected_fields: Vec<_> = expected_schema
        .fields
        .into_iter()
        .zip(projection.iter())
        .filter(|(_, keep)| **keep)
        .map(|(field, _)| field)
        .collect();
    let expected_schema = Schema::from(expected_fields);

    let (result, schema) = read_avro(&avro, Some(projection))?;

    assert_eq!(schema, expected_schema);
    assert_eq!(result, expected);
    Ok(())
}

#[test]
fn read_projected() -> Result<()> {
    // keep only the first of the 11 columns
    let mut projection = vec![false; 11];
    projection[0] = true;
    test_projected(projection)
}
2 changes: 1 addition & 1 deletion tests/it/io/avro/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ fn roundtrip(compression: Option<write::Compression>) -> Result<()> {

let data = write_avro(&expected, &expected_schema, compression)?;

let (result, read_schema) = read_avro(&data)?;
let (result, read_schema) = read_avro(&data, None)?;

assert_eq!(expected_schema, read_schema);
for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
Expand Down
2 changes: 1 addition & 1 deletion tests/it/io/avro/write_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn roundtrip(compression: Option<write::Compression>) -> Result<()> {

let data = write_avro(&expected, &expected_schema, compression).await?;

let (result, read_schema) = read_avro(&data)?;
let (result, read_schema) = read_avro(&data, None)?;

assert_eq!(expected_schema, read_schema);
for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
Expand Down

0 comments on commit 83d1c0c

Please sign in to comment.