diff --git a/examples/avro_write.rs b/examples/avro_write.rs
new file mode 100644
index 00000000000..c83e3d85b3d
--- /dev/null
+++ b/examples/avro_write.rs
@@ -0,0 +1,54 @@
+use std::{fs::File, sync::Arc};
+
+use arrow2::{
+    array::{Array, Int32Array},
+    datatypes::{Field, Schema},
+    error::Result,
+    io::avro::write,
+    record_batch::RecordBatch,
+};
+
+fn main() -> Result<()> {
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let path = &args[1];
+
+    let array = Int32Array::from(&[
+        Some(0),
+        Some(1),
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+        Some(6),
+    ]);
+    let field = Field::new("c1", array.data_type().clone(), true);
+    let schema = Schema::new(vec![field]);
+
+    let avro_schema = write::to_avro_schema(&schema)?;
+
+    let mut file = File::create(path)?;
+
+    let compression = None;
+
+    write::write_metadata(&mut file, &avro_schema, compression)?;
+
+    let serializer = write::new_serializer(&array, avro_schema.fields()[0]);
+    let mut block = write::Block::new(array.len(), vec![]);
+
+    write::serialize(&mut vec![serializer], &mut block)?;
+
+    let mut compressed_block = write::CompressedBlock::default();
+
+    if let Some(compression) = compression {
+        write::compress(&block, &mut compressed_block, compression)?;
+    } else {
+        compressed_block.number_of_rows = block.number_of_rows;
+        std::mem::swap(&mut compressed_block.data, &mut block.data);
+    }
+
+    write::write_block(&mut file, &compressed_block)?;
+
+    Ok(())
+}
diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs
index 2cf3c216d1b..dc1e921e364 100644
--- a/src/io/avro/mod.rs
+++ b/src/io/avro/mod.rs
@@ -5,6 +5,7 @@ pub mod read;
 #[cfg(feature = "io_avro_async")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
 pub mod read_async;
+pub mod write;
 
 use crate::error::ArrowError;
 
@@ -14,6 +15,15 @@ impl From<avro_rs::Error> for ArrowError {
     }
 }
 
+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}
+
 // macros that can operate in sync and async code.
 macro_rules! avro_decode {
     ($reader:ident $($_await:tt)*) => {
diff --git a/src/io/avro/read/block.rs b/src/io/avro/read/block.rs
index 69bc0e3ffb6..40f30ac3d06 100644
--- a/src/io/avro/read/block.rs
+++ b/src/io/avro/read/block.rs
@@ -24,7 +24,7 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
     Ok((rows as usize, bytes as usize))
 }
 
-/// Reads a block from the file into `buf`.
+/// Reads a block from the `reader` into `buf`.
 /// # Panic
 /// Panics iff the block marker does not equal to the file's marker
 fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index f702b913f70..5ddfc8e5c9b 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -24,14 +24,7 @@ use crate::datatypes::Schema;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
 
-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
-}
+use super::Compression;
 
 /// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
 #[allow(clippy::type_complexity)]
diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs
index 6304eb0892a..63212fe4de5 100644
--- a/src/io/avro/read/schema.rs
+++ b/src/io/avro/read/schema.rs
@@ -76,31 +76,38 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
     props
 }
 
+/// Maps an Avro Schema into a [`Schema`].
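+///
+/// For example (illustrative only): an Avro record with a single `"int"` field
+/// named `c1` maps to an Arrow [`Schema`] with one `Int32` field `c1`; a
+/// top-level schema that is not a record is rejected as out-of-spec.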
 pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
-    let mut schema_fields = vec![];
-    match schema {
-        AvroSchema::Record { fields, .. } => {
-            for field in fields {
-                schema_fields.push(schema_to_field(
-                    &field.schema,
-                    Some(&field.name),
-                    false,
-                    Some(&external_props(&field.schema)),
-                )?)
-            }
+    let fields = match schema {
+        AvroSchema::Record { fields, .. } => fields,
+        other => {
+            return Err(ArrowError::OutOfSpec(format!(
+                "An avro Schema must be of type Record - it is of type {:?}",
+                other
+            )))
         }
-        schema => schema_fields.push(schema_to_field(schema, Some(""), false, None)?),
-    }
+    };
+
+    let fields = fields
+        .iter()
+        .map(|field| {
+            schema_to_field(
+                &field.schema,
+                Some(&field.name),
+                false,
+                Some(external_props(&field.schema)),
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;
 
-    let schema = Schema::new(schema_fields);
-    Ok(schema)
+    Ok(Schema::new(fields))
 }
 
 fn schema_to_field(
     schema: &AvroSchema,
     name: Option<&str>,
     mut nullable: bool,
-    props: Option<&BTreeMap<String, String>>,
+    props: Option<BTreeMap<String, String>>,
 ) -> Result<Field> {
     let data_type = match schema {
         AvroSchema::Null => DataType::Null,
@@ -160,7 +167,7 @@ fn schema_to_field(
                         &field.schema,
                         Some(&format!("{}.{}", name.fullname(None), field.name)),
                         false,
-                        Some(&props),
+                        Some(props),
                     )
                 })
                 .collect();
@@ -189,6 +196,6 @@ fn schema_to_field(
     let name = name.unwrap_or_default();
 
     let mut field = Field::new(name, data_type, nullable);
-    field.set_metadata(props.cloned());
+    field.set_metadata(props);
     Ok(field)
 }
diff --git a/src/io/avro/read_async/metadata.rs b/src/io/avro/read_async/metadata.rs
index d3931236793..2c41573d640 100644
--- a/src/io/avro/read_async/metadata.rs
+++ b/src/io/avro/read_async/metadata.rs
@@ -10,7 +10,7 @@ use crate::error::{ArrowError, Result};
 
 use super::super::read::convert_schema;
 use super::super::read::deserialize_header;
-use super::super::read::Compression;
+use super::super::Compression;
 use super::super::{read_header, read_metadata};
 use super::utils::zigzag_i64;
diff --git a/src/io/avro/write/block.rs b/src/io/avro/write/block.rs
new file mode 100644
index 00000000000..9251ca24448
--- /dev/null
+++ b/src/io/avro/write/block.rs
@@ -0,0 +1,70 @@
+use std::io::Write;
+
+use crate::{error::Result, io::avro::Compression};
+
+use super::{util::zigzag_encode, SYNC_NUMBER};
+
+/// A compressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct CompressedBlock {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The compressed data
+    pub data: Vec<u8>,
+}
+
+impl CompressedBlock {
+    /// Creates a new CompressedBlock
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// An uncompressed Avro block.
+#[derive(Debug, Clone, Default, PartialEq)]
+pub struct Block {
+    /// The number of rows
+    pub number_of_rows: usize,
+    /// The uncompressed data
+    pub data: Vec<u8>,
+}
+
+impl Block {
+    /// Creates a new Block
+    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
+        Self {
+            number_of_rows,
+            data,
+        }
+    }
+}
+
+/// Writes a [`CompressedBlock`] to `writer`.
+pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
+    // write the number of rows and the size of the data
+    zigzag_encode(compressed_block.number_of_rows as i64, writer)?;
+    zigzag_encode(compressed_block.data.len() as i64, writer)?;
+
+    writer.write_all(&compressed_block.data)?;
+
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
+
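+// Layout note: per the Avro specification, a file data block is the row count
+// followed by the byte size of the serialized data (both zigzag-encoded longs),
+// then the data itself, then the file's 16-byte sync marker; `write_block`
+// above emits exactly that sequence.
+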
+/// Compresses a [`Block`] to a [`CompressedBlock`].
+pub fn compress(
+    block: &Block,
+    compressed_block: &mut CompressedBlock,
+    compression: Compression,
+) -> Result<()> {
+    match compression {
+        Compression::Deflate => todo!(),
+        Compression::Snappy => todo!(),
+    }
+    compressed_block.number_of_rows = block.number_of_rows;
+    Ok(())
+}
diff --git a/src/io/avro/write/header.rs b/src/io/avro/write/header.rs
new file mode 100644
index 00000000000..9227906ab97
--- /dev/null
+++ b/src/io/avro/write/header.rs
@@ -0,0 +1,30 @@
+use std::collections::HashMap;
+
+use avro_rs::Schema;
+use serde_json;
+
+use crate::error::{ArrowError, Result};
+
+use super::Compression;
+
+/// Serializes a [`Schema`] and an optional [`Compression`] into an avro header.
+pub(crate) fn serialize_header(
+    schema: &Schema,
+    compression: Option<Compression>,
+) -> Result<HashMap<String, Vec<u8>>> {
+    let schema =
+        serde_json::to_string(schema).map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;
+
+    let mut header = HashMap::<String, Vec<u8>>::default();
+
+    header.insert("avro.schema".to_string(), schema.into_bytes());
+    if let Some(compression) = compression {
+        let value = match compression {
+            Compression::Snappy => b"snappy".to_vec(),
+            Compression::Deflate => b"deflate".to_vec(),
+        };
+        header.insert("avro.codec".to_string(), value);
+    };
+
+    Ok(header)
+}
diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs
new file mode 100644
index 00000000000..2ec525840ec
--- /dev/null
+++ b/src/io/avro/write/mod.rs
@@ -0,0 +1,66 @@
+//! APIs to write to Avro format.
+use std::io::Write;
+
+use crate::error::Result;
+
+use super::Compression;
+
+mod header;
+use header::serialize_header;
+mod schema;
+pub use schema::{to_avro_schema, AvroSchema};
+mod serialize;
+pub use serialize::{can_serialize, new_serializer, BoxSerializer};
+mod block;
+pub use block::*;
+mod util;
+
+const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
+
+/// Writes Avro's metadata to `writer`.
+pub fn write_metadata<W: Write>(
+    writer: &mut W,
+    schema: &AvroSchema,
+    compression: Option<Compression>,
+) -> Result<()> {
+    // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
+    let avro_magic = [b'O', b'b', b'j', 1u8];
+    writer.write_all(&avro_magic)?;
+
+    // * file metadata, including the schema.
+    let header = serialize_header(&schema.0, compression)?;
+
+    util::zigzag_encode(header.len() as i64, writer)?;
+    for (name, item) in header {
+        util::write_binary(name.as_bytes(), writer)?;
+        util::write_binary(&item, writer)?;
+    }
+    writer.write_all(&[0])?;
+
+    // The 16-byte, randomly-generated sync marker for this file.
+    writer.write_all(&SYNC_NUMBER)?;
+
+    Ok(())
+}
+
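+// Writing a file end-to-end: call `write_metadata` once (magic, header map and
+// sync marker), then, per batch of rows: `serialize` one serializer per column
+// into a `Block`, turn it into a `CompressedBlock` (via `compress`, or a plain
+// copy when no codec is used) and append it with `write_block`.
+// `examples/avro_write.rs` walks through exactly these steps.
+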
+/// Consumes a set of [`BoxSerializer`]s into a [`Block`].
+/// # Panics
+/// Panics iff any of the serializers yields fewer items than the number of rows
+/// declared in the `block`.
+pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
+    let Block {
+        data,
+        number_of_rows,
+    } = block;
+
+    data.clear(); // restart it
+
+    // _the_ transpose (columns -> rows)
+    for _ in 0..*number_of_rows {
+        for serializer in &mut *serializers {
+            let item_data = serializer.next().unwrap();
+            data.write_all(item_data)?;
+        }
+    }
+    Ok(())
+}
diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs
new file mode 100644
index 00000000000..675de218916
--- /dev/null
+++ b/src/io/avro/write/schema.rs
@@ -0,0 +1,98 @@
+use avro_rs::schema::Name;
+use avro_rs::schema::RecordField;
+use avro_rs::schema::RecordFieldOrder;
+use avro_rs::Schema as _AvroSchema;
+
+use crate::datatypes::*;
+use crate::error::{ArrowError, Result};
+
+/// An Avro schema
+pub struct AvroSchema(pub _AvroSchema);
+
+impl AvroSchema {
+    /// Returns the fields of this Avro schema.
+    pub fn fields(&self) -> Vec<&_AvroSchema> {
+        if let _AvroSchema::Record { fields, .. } = &self.0 {
+            fields.iter().map(|x| &x.schema).collect()
+        } else {
+            unreachable!()
+        }
+    }
+}
+
+impl From<_AvroSchema> for AvroSchema {
+    fn from(schema: _AvroSchema) -> Self {
+        Self(schema)
+    }
+}
+
+impl From<AvroSchema> for _AvroSchema {
+    fn from(schema: AvroSchema) -> Self {
+        schema.0
+    }
+}
+
+/// Converts a [`Schema`] to an Avro record [`AvroSchema`].
+pub fn to_avro_schema(schema: &Schema) -> Result<AvroSchema> {
+    let fields = schema
+        .fields
+        .iter()
+        .enumerate()
+        .map(|(index, field)| field_to_field(field, index))
+        .collect::<Result<Vec<_>>>()?;
+
+    Ok(_AvroSchema::Record {
+        fields,
+        lookup: Default::default(),
+        name: Name {
+            name: "".to_string(),
+            namespace: None,
+            aliases: None,
+        },
+        doc: None,
+    }
+    .into())
+}
+
+fn field_to_field(field: &Field, index: usize) -> Result<RecordField> {
+    let schema = type_to_schema(field.data_type())?;
+
+    Ok(RecordField {
+        name: field.name().to_string(),
+        doc: None,
+        default: None,
+        schema,
+        order: RecordFieldOrder::Ignore,
+        position: index,
+    })
+}
+
+fn type_to_schema(data_type: &DataType) -> Result<_AvroSchema> {
+    Ok(match data_type.to_logical_type() {
+        DataType::Null => _AvroSchema::Null,
+        DataType::Boolean => _AvroSchema::Boolean,
+        DataType::Int32 => _AvroSchema::Int,
+        DataType::Int64 => _AvroSchema::Long,
+        DataType::Float32 => _AvroSchema::Float,
+        DataType::Float64 => _AvroSchema::Double,
+        DataType::Binary => _AvroSchema::Bytes,
+        DataType::Utf8 => _AvroSchema::String,
+        DataType::List(inner) => _AvroSchema::Array(Box::new(type_to_schema(inner.data_type())?)),
+        DataType::Date32 => _AvroSchema::Date,
+        DataType::Time32(TimeUnit::Millisecond) => _AvroSchema::TimeMillis,
+        DataType::Time64(TimeUnit::Microsecond) => _AvroSchema::TimeMicros,
+        DataType::Timestamp(TimeUnit::Millisecond, None) => _AvroSchema::TimestampMillis,
+        DataType::Timestamp(TimeUnit::Microsecond, None) => _AvroSchema::TimestampMicros,
+        DataType::Interval(IntervalUnit::MonthDayNano) => _AvroSchema::Duration,
+        DataType::FixedSizeBinary(size) => _AvroSchema::Fixed {
+            size: *size,
+            name: Name::new(""),
+        },
+        other => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "write {:?} to avro",
+                other
+            )))
+        }
+    })
+}
diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs
new file mode 100644
index 00000000000..f6684a7c95b
--- /dev/null
+++ b/src/io/avro/write/serialize.rs
@@ -0,0 +1,163 @@
+use avro_rs::Schema as AvroSchema;
+
+use crate::{array::*, datatypes::DataType};
+
+use super::super::super::iterator::*;
+use super::util;
+
+/// A type alias for a boxed [`StreamingIterator`], used to write arrays into avro rows
+/// (i.e. a column -> row transposition of types known at run-time)
+pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send + Sync>;
+
+/// Creates a [`StreamingIterator`] trait object that presents items from `array`
+/// encoded according to `schema`.
+/// # Panic
+/// This function panics iff the `data_type` is not supported (use [`can_serialize`] to check)
+/// # Implementation
+/// This function performs minimal CPU work: it dynamically dispatches based on the schema
+/// and arrow type.
+pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSerializer<'a> {
+    let data_type = array.data_type().to_logical_type();
+
+    match (data_type, schema) {
+        (DataType::Boolean, AvroSchema::Boolean) => {
+            let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    buf.push(x as u8);
+                },
+                vec![],
+            ))
+        }
+        (DataType::Boolean, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        buf.push(x as u8);
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Utf8, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(x.len() as i64, buf).unwrap();
+                        buf.extend_from_slice(x.as_bytes());
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Utf8, AvroSchema::String) => {
+            let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.len() as i64, buf).unwrap();
+                    buf.extend_from_slice(x.as_bytes());
+                },
+                vec![],
+            ))
+        }
+        (DataType::Binary, AvroSchema::Union(_)) => {
+            let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(x.len() as i64, buf).unwrap();
+                        buf.extend_from_slice(x);
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Binary, AvroSchema::Bytes) => {
+            let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values_iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.len() as i64, buf).unwrap();
+                    buf.extend_from_slice(x);
+                },
+                vec![],
+            ))
+        }
+
+        (DataType::Int32, AvroSchema::Union(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(*x as i64, buf).unwrap();
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Int32, AvroSchema::Int) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i32>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    util::zigzag_encode(*x as i64, buf).unwrap();
+                },
+                vec![],
+            ))
+        }
+        (DataType::Int64, AvroSchema::Union(_)) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.iter(),
+                |x, buf| {
+                    util::zigzag_encode(x.is_some() as i64, buf).unwrap();
+                    if let Some(x) = x {
+                        util::zigzag_encode(*x, buf).unwrap();
+                    }
+                },
+                vec![],
+            ))
+        }
+        (DataType::Int64, AvroSchema::Long) => {
+            let values = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<i64>>()
+                .unwrap();
+            Box::new(BufStreamingIterator::new(
+                values.values().iter(),
+                |x, buf| {
+                    util::zigzag_encode(*x, buf).unwrap();
+                },
+                vec![],
+            ))
+        }
+        _ => todo!(),
+    }
+}
+
+/// Whether [`new_serializer`] supports `data_type`.
+pub fn can_serialize(data_type: &DataType) -> bool {
+    use DataType::*;
+    matches!(data_type, Boolean | Int32 | Int64 | Utf8 | Binary)
+}
diff --git a/src/io/avro/write/util.rs b/src/io/avro/write/util.rs
new file mode 100644
index 00000000000..52d13c026e9
--- /dev/null
+++ b/src/io/avro/write/util.rs
@@ -0,0 +1,26 @@
+use crate::error::Result;
+
+#[inline]
+pub fn zigzag_encode<W: std::io::Write>(n: i64, writer: &mut W) -> Result<()> {
+    _zigzag_encode(((n << 1) ^ (n >> 63)) as u64, writer)
+}
+
+#[inline]
+fn _zigzag_encode<W: std::io::Write>(mut z: u64, writer: &mut W) -> Result<()> {
+    loop {
+        if z <= 0x7F {
+            writer.write_all(&[(z & 0x7F) as u8])?;
+            break;
+        } else {
+            writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?;
+            z >>= 7;
+        }
+    }
+    Ok(())
+}
+
+pub(crate) fn write_binary<W: std::io::Write>(bytes: &[u8], writer: &mut W) -> Result<()> {
+    zigzag_encode(bytes.len() as i64, writer)?;
+    writer.write_all(bytes)?;
+    Ok(())
+}
diff --git a/src/io/csv/write/mod.rs b/src/io/csv/write/mod.rs
index e74bc150c16..5aa44c91d31 100644
--- a/src/io/csv/write/mod.rs
+++ b/src/io/csv/write/mod.rs
@@ -1,8 +1,7 @@
 //! APIs to write to CSV
-mod iterator;
 mod serialize;
 
-use iterator::StreamingIterator;
+use super::super::iterator::StreamingIterator;
 
 use std::io::Write;
diff --git a/src/io/csv/write/serialize.rs b/src/io/csv/write/serialize.rs
index 959c1f56b1b..b612869bdfa 100644
--- a/src/io/csv/write/serialize.rs
+++ b/src/io/csv/write/serialize.rs
@@ -10,7 +10,7 @@ use crate::{
     error::Result,
 };
 
-use super::iterator::{BufStreamingIterator, StreamingIterator};
+use super::super::super::iterator::{BufStreamingIterator, StreamingIterator};
 use crate::array::{DictionaryArray, DictionaryKey, Offset};
 
 use std::any::Any;
diff --git a/src/io/csv/write/iterator.rs b/src/io/iterator.rs
similarity index 100%
rename from src/io/csv/write/iterator.rs
rename to src/io/iterator.rs
diff --git a/src/io/mod.rs b/src/io/mod.rs
index a8d68d66951..e406de9847c 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -42,3 +42,6 @@ pub mod avro;
 #[cfg(feature = "io_print")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_print")))]
 pub mod print;
+
+#[cfg(any(feature = "io_csv_write", feature = "io_avro"))]
+mod iterator;