Added Avro metadata write

jorgecarleitao committed Dec 19, 2021
1 parent 3685ae8 commit 81de974
Showing 16 changed files with 550 additions and 31 deletions.
54 changes: 54 additions & 0 deletions examples/avro_write.rs
@@ -0,0 +1,54 @@
use std::{fs::File, sync::Arc};

use arrow2::{
    array::{Array, Int32Array},
    datatypes::{Field, Schema},
    error::Result,
    io::avro::write,
    record_batch::RecordBatch,
};

fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    let path = &args[1];

    let array = Int32Array::from(&[
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::new(vec![field]);

    let avro_schema = write::to_avro_schema(&schema)?;

    let mut file = File::create(path)?;

    let compression = None;

    write::write_metadata(&mut file, &avro_schema, compression)?;

    let serializer = write::new_serializer(&array, avro_schema.fields()[0]);
    let mut block = write::Block::new(array.len(), vec![]);

    write::serialize(&mut vec![serializer], &mut block)?;

    let mut compressed_block = write::CompressedBlock::default();

    if let Some(compression) = compression {
        write::compress(&block, &mut compressed_block, compression)?;
    } else {
        compressed_block.number_of_rows = block.number_of_rows;
        std::mem::swap(&mut compressed_block.data, &mut block.data);
    }

    write::write_block(&mut file, &compressed_block)?;

    Ok(())
}
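One way to sanity-check the file this example produces is to read it back with the avro_rs crate, which the crate's Avro reader already builds on. A minimal sketch, assuming the example was run with a path such as /tmp/data.avro:

use std::fs::File;

use avro_rs::Reader;

fn verify(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    // Reader::new parses the header (schema, codec, sync marker) written by
    // write_metadata and then iterates over the file's blocks.
    let reader = Reader::new(file)?;
    for value in reader {
        // each item is an avro_rs::types::Value
        println!("{:?}", value?);
    }
    Ok(())
}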
10 changes: 10 additions & 0 deletions src/io/avro/mod.rs
@@ -5,6 +5,7 @@ pub mod read;
 #[cfg(feature = "io_avro_async")]
 #[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
 pub mod read_async;
+pub mod write;
 
 use crate::error::ArrowError;
 
@@ -14,6 +15,15 @@ impl From<avro_rs::Error> for ArrowError {
     }
 }
 
+/// Valid compressions
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub enum Compression {
+    /// Deflate
+    Deflate,
+    /// Snappy
+    Snappy,
+}
+
 // macros that can operate in sync and async code.
 macro_rules! avro_decode {
     ($reader:ident $($_await:tt)*) => {
2 changes: 1 addition & 1 deletion src/io/avro/read/block.rs
@@ -24,7 +24,7 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
     Ok((rows as usize, bytes as usize))
 }
 
-/// Reads a block from the file into `buf`.
+/// Reads a block from the `reader` into `buf`.
 /// # Panic
 /// Panics iff the block marker does not equal the file's marker
 fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
9 changes: 1 addition & 8 deletions src/io/avro/read/mod.rs
@@ -24,14 +24,7 @@ use crate::datatypes::Schema;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
 
-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
-}
+use super::Compression;
 
 /// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
 #[allow(clippy::type_complexity)]
43 changes: 25 additions & 18 deletions src/io/avro/read/schema.rs
@@ -76,31 +76,38 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
     props
 }
 
+/// Maps an Avro Schema into a [`Schema`].
 pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
-    let mut schema_fields = vec![];
-    match schema {
-        AvroSchema::Record { fields, .. } => {
-            for field in fields {
-                schema_fields.push(schema_to_field(
-                    &field.schema,
-                    Some(&field.name),
-                    false,
-                    Some(&external_props(&field.schema)),
-                )?)
-            }
+    let fields = match schema {
+        AvroSchema::Record { fields, .. } => fields,
+        other => {
+            return Err(ArrowError::OutOfSpec(format!(
+                "An avro Schema must be of type Record - it is of type {:?}",
+                other
+            )))
         }
-        schema => schema_fields.push(schema_to_field(schema, Some(""), false, None)?),
-    }
+    };
+
+    let fields = fields
+        .iter()
+        .map(|field| {
+            schema_to_field(
+                &field.schema,
+                Some(&field.name),
+                false,
+                Some(external_props(&field.schema)),
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;
 
-    let schema = Schema::new(schema_fields);
-    Ok(schema)
+    Ok(Schema::new(fields))
 }
 
 fn schema_to_field(
     schema: &AvroSchema,
     name: Option<&str>,
     mut nullable: bool,
-    props: Option<&BTreeMap<String, String>>,
+    props: Option<BTreeMap<String, String>>,
 ) -> Result<Field> {
     let data_type = match schema {
         AvroSchema::Null => DataType::Null,
@@ -160,7 +167,7 @@ fn schema_to_field(
                 &field.schema,
                 Some(&format!("{}.{}", name.fullname(None), field.name)),
                 false,
-                Some(&props),
+                Some(props),
             )
         })
         .collect();
@@ -189,6 +196,6 @@ fn schema_to_field(
     let name = name.unwrap_or_default();
 
     let mut field = Field::new(name, data_type, nullable);
-    field.set_metadata(props.cloned());
+    field.set_metadata(props);
     Ok(field)
 }
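With this refactor, convert_schema rejects any Avro schema whose top level is not a Record, where it previously wrapped such a schema in a single unnamed field. A minimal sketch of the new behaviour (the parse_str input is only illustrative):

use avro_rs::Schema as AvroSchema;

let avro = AvroSchema::parse_str(r#""int""#).unwrap(); // a bare primitive, not a record
assert!(convert_schema(&avro).is_err());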
2 changes: 1 addition & 1 deletion src/io/avro/read_async/metadata.rs
@@ -10,7 +10,7 @@ use crate::error::{ArrowError, Result};
 
 use super::super::read::convert_schema;
 use super::super::read::deserialize_header;
-use super::super::read::Compression;
+use super::super::Compression;
 use super::super::{read_header, read_metadata};
 use super::utils::zigzag_i64;
70 changes: 70 additions & 0 deletions src/io/avro/write/block.rs
@@ -0,0 +1,70 @@
use std::io::Write;

use crate::{error::Result, io::avro::Compression};

use super::{util::zigzag_encode, SYNC_NUMBER};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CompressedBlock {
    /// The number of rows
    pub number_of_rows: usize,
    /// The compressed data
    pub data: Vec<u8>,
}

impl CompressedBlock {
    /// Creates a new CompressedBlock
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Block {
    /// The number of rows
    pub number_of_rows: usize,
    /// The uncompressed data
    pub data: Vec<u8>,
}

impl Block {
    /// Creates a new Block
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// Writes a [`CompressedBlock`] to `writer`
pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
    // write the number of rows and then the size of the block's data
    zigzag_encode(compressed_block.number_of_rows as i64, writer)?;
    zigzag_encode(compressed_block.data.len() as i64, writer)?;

    writer.write_all(&compressed_block.data)?;

    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}

/// Compresses a [`Block`] into a [`CompressedBlock`].
pub fn compress(
    block: &Block,
    compressed_block: &mut CompressedBlock,
    compression: Compression,
) -> Result<()> {
    match compression {
        Compression::Deflate => todo!(),
        Compression::Snappy => todo!(),
    }
    compressed_block.number_of_rows = block.number_of_rows;
    Ok(())
}
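Both codec arms are left as todo!() in this commit. For orientation, a sketch of what the Deflate arm could later look like, assuming the flate2 crate; note that Avro's "deflate" codec is raw DEFLATE (RFC 1951, no zlib header), and that the "snappy" codec additionally requires a 4-byte big-endian CRC32 of the uncompressed data to be appended. This sketch is not part of the commit:

use std::io::Write;

use flate2::write::DeflateEncoder;

fn deflate(block: &Block, compressed_block: &mut CompressedBlock) -> Result<()> {
    compressed_block.data.clear();
    // raw DEFLATE, as the Avro spec requires for the "deflate" codec
    let mut encoder =
        DeflateEncoder::new(&mut compressed_block.data, flate2::Compression::default());
    encoder.write_all(&block.data)?;
    encoder.finish()?;
    compressed_block.number_of_rows = block.number_of_rows;
    Ok(())
}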
30 changes: 30 additions & 0 deletions src/io/avro/write/header.rs
@@ -0,0 +1,30 @@
use std::collections::HashMap;

use avro_rs::Schema;
use serde_json;

use crate::error::{ArrowError, Result};

use super::Compression;

/// Serializes a [`Schema`] and an optional [`Compression`] into an avro header.
pub(crate) fn serialize_header(
    schema: &Schema,
    compression: Option<Compression>,
) -> Result<HashMap<String, Vec<u8>>> {
    let schema =
        serde_json::to_string(schema).map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;

    let mut header = HashMap::<String, Vec<u8>>::default();

    header.insert("avro.schema".to_string(), schema.into_bytes());
    if let Some(compression) = compression {
        let value = match compression {
            Compression::Snappy => b"snappy".to_vec(),
            Compression::Deflate => b"deflate".to_vec(),
        };
        header.insert("avro.codec".to_string(), value);
    };

    Ok(header)
}
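A sketch of how the resulting header behaves, usable from inside the crate since serialize_header is pub(crate); the exact avro.schema bytes depend on what serde_json emits for the given Schema:

let header = serialize_header(&schema, Some(Compression::Snappy))?;
assert_eq!(header.get("avro.codec"), Some(&b"snappy".to_vec()));
assert!(header.contains_key("avro.schema"));

// With no compression the "avro.codec" entry is simply absent,
// which Avro readers treat as the "null" codec.
let header = serialize_header(&schema, None)?;
assert!(!header.contains_key("avro.codec"));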
66 changes: 66 additions & 0 deletions src/io/avro/write/mod.rs
@@ -0,0 +1,66 @@
//! APIs to write to Avro format.
use std::io::Write;

use crate::error::Result;

use super::Compression;

mod header;
use header::serialize_header;
mod schema;
pub use schema::{to_avro_schema, AvroSchema};
mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod util;

const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];

/// Writes Avro's metadata to `writer`.
pub fn write_metadata<W: std::io::Write>(
    writer: &mut W,
    schema: &AvroSchema,
    compression: Option<Compression>,
) -> Result<()> {
    // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
    let avro_magic = [b'O', b'b', b'j', 1u8];
    writer.write_all(&avro_magic)?;

    // * file metadata, including the schema.
    let header = serialize_header(&schema.0, compression)?;

    util::zigzag_encode(header.len() as i64, writer)?;
    for (name, item) in header {
        util::write_binary(name.as_bytes(), writer)?;
        util::write_binary(&item, writer)?;
    }
    writer.write_all(&[0])?;

    // The 16-byte, randomly-generated sync marker for this file.
    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}

/// Consumes a set of [`BoxSerializer`] into a [`Block`].
/// # Panics
/// Panics iff the number of items in any of the serializers is not equal to the number of rows
/// declared in the `block`.
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
    let Block {
        data,
        number_of_rows,
    } = block;

    data.clear(); // restart it

    // _the_ transpose (columns -> rows)
    for _ in 0..*number_of_rows {
        for serializer in &mut *serializers {
            let item_data = serializer.next().unwrap();
            data.write_all(item_data)?;
        }
    }
    Ok(())
}
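The util module referenced above is not shown on this page. Its two helpers follow Avro's standard long encoding, so a plausible sketch looks like the following; the actual file may differ:

use std::io::Write;

use crate::error::Result;

/// Zigzag-encodes `n` and writes it as an Avro variable-length integer.
pub fn zigzag_encode<W: Write>(n: i64, writer: &mut W) -> Result<()> {
    // zigzag moves the sign into the least-significant bit
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    // varint: 7 bits per byte, high bit flags a continuation
    loop {
        if z < 0x80 {
            writer.write_all(&[z as u8])?;
            return Ok(());
        }
        writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?;
        z >>= 7;
    }
}

/// Writes a zigzag length prefix followed by the bytes (Avro's `bytes` encoding).
pub fn write_binary<W: Write>(bytes: &[u8], writer: &mut W) -> Result<()> {
    zigzag_encode(bytes.len() as i64, writer)?;
    writer.write_all(bytes)?;
    Ok(())
}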
