Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for large* write to Avro
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 8, 2022
1 parent d8093bf commit 566cf48
Showing 1 changed file with 66 additions and 42 deletions.
108 changes: 66 additions & 42 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,56 @@ use super::util;
/// (i.e. a column -> row transposition of types known at run-time)
pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>;

fn utf8_required<O: Offset>(array: &Utf8Array<O>) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.values_iter(),
|x, buf| {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x.as_bytes());
},
vec![],
))
}

fn utf8_optional<O: Offset>(array: &Utf8Array<O>) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x.as_bytes());
}
},
vec![],
))
}

fn binary_required<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.values_iter(),
|x, buf| {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x);
},
vec![],
))
}

fn binary_optional<O: Offset>(array: &BinaryArray<O>) -> BoxSerializer {
Box::new(BufStreamingIterator::new(
array.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x);
}
},
vec![],
))
}

/// Creates a [`StreamingIterator`] trait object that presents items from `array`
/// encoded according to `schema`.
/// # Panic
Expand Down Expand Up @@ -46,54 +96,28 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
))
}
(PhysicalType::Utf8, AvroSchema::Union(_)) => {
let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x.as_bytes());
}
},
vec![],
))
utf8_optional::<i32>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::LargeUtf8, AvroSchema::Union(_)) => {
utf8_optional::<i64>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::Utf8, AvroSchema::String(_)) => {
let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.values_iter(),
|x, buf| {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x.as_bytes());
},
vec![],
))
utf8_required::<i32>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::LargeUtf8, AvroSchema::String(_)) => {
utf8_required::<i64>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::Binary, AvroSchema::Union(_)) => {
let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x);
}
},
vec![],
))
binary_optional::<i32>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::LargeBinary, AvroSchema::Union(_)) => {
binary_optional::<i64>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::Binary, AvroSchema::Bytes(_)) => {
let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.values_iter(),
|x, buf| {
util::zigzag_encode(x.len() as i64, buf).unwrap();
buf.extend_from_slice(x);
},
vec![],
))
binary_required::<i32>(array.as_any().downcast_ref().unwrap())
}
(PhysicalType::LargeBinary, AvroSchema::Bytes(_)) => {
binary_required::<i64>(array.as_any().downcast_ref().unwrap())
}

(PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => {
Expand Down

0 comments on commit 566cf48

Please sign in to comment.