Added Avro metadata write
jorgecarleitao committed Dec 20, 2021
1 parent dc1784d commit 00988b2
Showing 16 changed files with 497 additions and 23 deletions.
54 changes: 54 additions & 0 deletions examples/avro_write.rs
@@ -0,0 +1,54 @@
use std::{fs::File, sync::Arc};

use arrow2::{
    array::{Array, Int32Array},
    datatypes::{Field, Schema},
    error::Result,
    io::avro::write,
    record_batch::RecordBatch,
};

fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    let path = &args[1];

    let array = Int32Array::from(&[
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::new(vec![field]);

    let avro_schema = write::to_avro_schema(&schema)?;

    let mut file = File::create(path)?;

    let compression = None;

    write::write_metadata(&mut file, &avro_schema, compression)?;

    let serializer = write::new_serializer(&array, avro_schema.fields()[0]);
    let mut block = write::Block::new(array.len(), vec![]);

    write::serialize(&mut vec![serializer], &mut block)?;

    let mut compressed_block = write::CompressedBlock::default();

    if let Some(compression) = compression {
        write::compress(&block, &mut compressed_block, compression)?;
    } else {
        compressed_block.number_of_rows = block.number_of_rows;
        std::mem::swap(&mut compressed_block.data, &mut block.data);
    }

    write::write_block(&mut file, &compressed_block)?;

    Ok(())
}
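Assuming the crate's standard Cargo example layout, this can be run with `cargo run --example avro_write /tmp/test.avro`; it writes a single uncompressed block, so any Avro reader should be able to open the result.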
11 changes: 10 additions & 1 deletion src/io/avro/mod.rs
@@ -5,14 +5,23 @@ pub mod read;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;
pub mod write;

/// Valid compressions
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Compression {
    /// Deflate
    Deflate,
    /// Snappy
    Snappy,
}

// macros that can operate in sync and async code.
macro_rules! avro_decode {
    ($reader:ident $($_await:tt)*) => {
        {
            let mut i = 0u64;
            let mut buf = [0u8; 1];

            let mut j = 0;
            loop {
                if j > 9 {
…
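For reference, the truncated `avro_decode!` body above reads Avro's variable-length, zigzag-encoded integers. A minimal sketch of the same decoding logic as plain synchronous Rust (the error value is an assumption; the real macro expands into both sync and async variants):

use std::io::{Error, ErrorKind, Read, Result};

/// Reads a variable-length, zigzag-encoded i64 (Avro's `long` encoding).
fn zigzag_i64<R: Read>(reader: &mut R) -> Result<i64> {
    let mut i = 0u64;
    let mut buf = [0u8; 1];
    let mut j = 0;
    loop {
        // a u64 takes at most 10 bytes of 7 bits each
        if j > 9 {
            return Err(Error::new(ErrorKind::InvalidData, "zigzag integer overflows a u64"));
        }
        reader.read_exact(&mut buf)?;
        i |= ((buf[0] & 0x7F) as u64) << (j * 7);
        if buf[0] & 0x80 == 0 {
            break;
        }
        j += 1;
    }
    // un-zigzag: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
    Ok(((i >> 1) as i64) ^ -((i & 1) as i64))
}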
2 changes: 1 addition & 1 deletion src/io/avro/read/block.rs
@@ -24,7 +24,7 @@ fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
     Ok((rows as usize, bytes as usize))
 }
 
-/// Reads a block from the file into `buf`.
+/// Reads a block from the `reader` into `buf`.
 /// # Panic
 /// Panics iff the block marker does not equal the file's marker
 fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
…
9 changes: 1 addition & 8 deletions src/io/avro/read/mod.rs
@@ -24,14 +24,7 @@ use crate::datatypes::Schema;
 use crate::error::Result;
 use crate::record_batch::RecordBatch;
 
-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
-}
+use super::Compression;
 
 /// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker.
 #[allow(clippy::type_complexity)]
…
22 changes: 13 additions & 9 deletions src/io/avro/read/schema.rs
@@ -56,6 +56,7 @@ fn external_props(schema: &AvroSchema) -> BTreeMap<String, String> {
     props
 }
 
+/// Maps an Avro Schema into a [`Schema`].
 pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
     let mut schema_fields = vec![];
     match schema {
@@ -65,22 +66,25 @@ pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
                     &field.schema,
                     Some(&field.name),
                     false,
-                    Some(&external_props(&field.schema)),
+                    Some(external_props(&field.schema)),
                 )?)
             }
         }
-        schema => schema_fields.push(schema_to_field(schema, Some(""), false, None)?),
-    }
-
-    let schema = Schema::new(schema_fields);
-    Ok(schema)
+        other => {
+            return Err(ArrowError::OutOfSpec(format!(
+                "An avro Schema must be of type Record - it is of type {:?}",
+                other
+            )))
+        }
+    };
+    Ok(Schema::new(schema_fields))
 }
 
 fn schema_to_field(
     schema: &AvroSchema,
     name: Option<&str>,
     mut nullable: bool,
-    props: Option<&BTreeMap<String, String>>,
+    props: Option<BTreeMap<String, String>>,
 ) -> Result<Field> {
     let data_type = match schema {
         AvroSchema::Null => DataType::Null,
@@ -169,7 +173,7 @@ fn schema_to_field(
                         &field.schema,
                         Some(&format!("{}.{}", name, field.name)),
                         false,
-                        Some(&props),
+                        Some(props),
                     )
                 })
                 .collect();
@@ -198,6 +202,6 @@ fn schema_to_field(
     let name = name.unwrap_or_default();
 
     let mut field = Field::new(name, data_type, nullable);
-    field.set_metadata(props.cloned());
+    field.set_metadata(props);
     Ok(field)
 }
2 changes: 1 addition & 1 deletion src/io/avro/read_async/metadata.rs
@@ -10,7 +10,7 @@ use crate::error::{ArrowError, Result};
 
 use super::super::read::convert_schema;
 use super::super::read::deserialize_header;
-use super::super::read::Compression;
+use super::super::Compression;
 use super::super::{read_header, read_metadata};
 use super::utils::zigzag_i64;
…
70 changes: 70 additions & 0 deletions src/io/avro/write/block.rs
@@ -0,0 +1,70 @@
use std::io::Write;

use crate::{error::Result, io::avro::Compression};

use super::{util::zigzag_encode, SYNC_NUMBER};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CompressedBlock {
    /// The number of rows
    pub number_of_rows: usize,
    /// The compressed data
    pub data: Vec<u8>,
}

impl CompressedBlock {
    /// Creates a new CompressedBlock
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Block {
    /// The number of rows
    pub number_of_rows: usize,
    /// The uncompressed data
    pub data: Vec<u8>,
}

impl Block {
    /// Creates a new Block
    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
        Self {
            number_of_rows,
            data,
        }
    }
}

/// Writes a [`CompressedBlock`] to `writer`.
pub fn write_block<W: Write>(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> {
    // write size and rows
    zigzag_encode(compressed_block.number_of_rows as i64, writer)?;
    zigzag_encode(compressed_block.data.len() as i64, writer)?;

    writer.write_all(&compressed_block.data)?;

    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}

/// Compresses a [`Block`] to a [`CompressedBlock`].
pub fn compress(
    block: &Block,
    compressed_block: &mut CompressedBlock,
    compression: Compression,
) -> Result<()> {
    match compression {
        Compression::Deflate => todo!(),
        Compression::Snappy => todo!(),
    }
    compressed_block.number_of_rows = block.number_of_rows;
    Ok(())
}
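`write_block` depends on `util::zigzag_encode`, which is not part of this diff. A sketch of what such a helper plausibly looks like, assuming the standard Avro zigzag + varint encoding (the signature and return type are assumptions):

use std::io::{Result, Write};

/// Zigzag-maps `n` and writes it as a variable-length integer, 7 bits per byte.
fn zigzag_encode<W: Write>(n: i64, writer: &mut W) -> Result<()> {
    // zigzag: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    loop {
        if z & !0x7F == 0 {
            // last byte: continuation bit (0x80) clear
            writer.write_all(&[z as u8])?;
            return Ok(());
        }
        writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?;
        z >>= 7;
    }
}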
30 changes: 30 additions & 0 deletions src/io/avro/write/header.rs
@@ -0,0 +1,30 @@
use std::collections::HashMap;

use avro_schema::Schema;
use serde_json;

use crate::error::{ArrowError, Result};

use super::Compression;

/// Serializes a [`Schema`] and optional [`Compression`] into an avro header.
pub(crate) fn serialize_header(
    schema: &Schema,
    compression: Option<Compression>,
) -> Result<HashMap<String, Vec<u8>>> {
    let schema =
        serde_json::to_string(schema).map_err(|e| ArrowError::ExternalFormat(e.to_string()))?;

    let mut header = HashMap::<String, Vec<u8>>::default();

    header.insert("avro.schema".to_string(), schema.into_bytes());
    if let Some(compression) = compression {
        let value = match compression {
            Compression::Snappy => b"snappy".to_vec(),
            Compression::Deflate => b"deflate".to_vec(),
        };
        header.insert("avro.codec".to_string(), value);
    };

    Ok(header)
}
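Since `serialize_header` is crate-private, a unit test inside the module is the natural place to exercise it; a hypothetical check (the `schema` value is assumed to come from `avro_schema`):

// Hypothetical test: the header carries the JSON schema and, when requested,
// the codec name under Avro's reserved metadata keys.
let header = serialize_header(&schema, Some(Compression::Snappy))?;
assert!(header.contains_key("avro.schema"));
assert_eq!(header.get("avro.codec"), Some(&b"snappy".to_vec()));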
68 changes: 68 additions & 0 deletions src/io/avro/write/mod.rs
@@ -0,0 +1,68 @@
//! APIs to write to Avro format.
use std::io::Write;

use avro_schema::Schema as AvroSchema;

use crate::error::Result;

use super::Compression;

mod header;
use header::serialize_header;
mod schema;
pub use schema::to_avro_schema;
mod serialize;
pub use serialize::{can_serialize, new_serializer, BoxSerializer};
mod block;
pub use block::*;
mod util;

const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];

/// Writes Avro's metadata to `writer`.
pub fn write_metadata<W: std::io::Write>(
    writer: &mut W,
    schema: &AvroSchema,
    compression: Option<Compression>,
) -> Result<()> {
    // * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
    let avro_magic = [b'O', b'b', b'j', 1u8];
    writer.write_all(&avro_magic)?;

    // * file metadata, including the schema.
    let header = serialize_header(schema, compression)?;

    util::zigzag_encode(header.len() as i64, writer)?;
    for (name, item) in header {
        util::write_binary(name.as_bytes(), writer)?;
        util::write_binary(&item, writer)?;
    }
    writer.write_all(&[0])?;

    // The 16-byte, randomly-generated sync marker for this file.
    writer.write_all(&SYNC_NUMBER)?;

    Ok(())
}
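Illustrative only (not part of the commit): writing the metadata into an in-memory buffer shows the framing that `write_metadata` emits, assuming an `avro_schema` produced by `to_avro_schema`:

let mut buffer = vec![];
write_metadata(&mut buffer, &avro_schema, None)?;
// starts with the avro magic: 'O', 'b', 'j', 1
assert_eq!(&buffer[..4], &b"Obj\x01"[..]);
// ends with the 16-byte sync marker written above
assert_eq!(&buffer[buffer.len() - 16..], &SYNC_NUMBER[..]);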

/// Consumes a set of [`BoxSerializer`]s into a [`Block`].
/// # Panics
/// Panics iff the number of items in any of the serializers is not equal to the number of rows
/// declared in the `block`.
pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) -> Result<()> {
    let Block {
        data,
        number_of_rows,
    } = block;

    data.clear(); // restart it

    // _the_ transpose (columns -> rows)
    for _ in 0..*number_of_rows {
        for serializer in &mut *serializers {
            let item_data = serializer.next().unwrap();
            data.write_all(item_data)?;
        }
    }
    Ok(())
}
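Avro files are row-oriented while Arrow is columnar, hence the transpose above: each `BoxSerializer` yields one encoded value per `next` call, and the inner loop takes one value from every column before advancing to the next row.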
55 changes: 55 additions & 0 deletions src/io/avro/write/schema.rs
@@ -0,0 +1,55 @@
use avro_schema::{
    Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record, Schema as AvroSchema,
};

use crate::datatypes::*;
use crate::error::{ArrowError, Result};

/// Converts a [`Schema`] to an avro [`AvroSchema::Record`].
pub fn to_avro_schema(schema: &Schema) -> Result<AvroSchema> {
    let fields = schema
        .fields
        .iter()
        .map(|field| field_to_field(field))
        .collect::<Result<Vec<_>>>()?;
    Ok(avro_schema::Schema::Record(Record::new("", fields)))
}

fn field_to_field(field: &Field) -> Result<AvroField> {
    let schema = type_to_schema(field.data_type())?;
    Ok(AvroField::new(field.name(), schema))
}

fn type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
    Ok(match data_type.to_logical_type() {
        DataType::Null => AvroSchema::Null,
        DataType::Boolean => AvroSchema::Boolean,
        DataType::Int32 => AvroSchema::Int(None),
        DataType::Int64 => AvroSchema::Long(None),
        DataType::Float32 => AvroSchema::Float,
        DataType::Float64 => AvroSchema::Double,
        DataType::Binary => AvroSchema::Bytes(None),
        DataType::Utf8 => AvroSchema::String(None),
        DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema(inner.data_type())?)),
        DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
        DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
        DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),
        DataType::Timestamp(TimeUnit::Millisecond, None) => {
            AvroSchema::Long(Some(LongLogical::LocalTimestampMillis))
        }
        DataType::Timestamp(TimeUnit::Microsecond, None) => {
            AvroSchema::Long(Some(LongLogical::LocalTimestampMicros))
        }
        DataType::Interval(IntervalUnit::MonthDayNano) => {
            let mut fixed = Fixed::new("", 12);
            fixed.logical = Some(FixedLogical::Duration);
            AvroSchema::Fixed(fixed)
        }
        DataType::FixedSizeBinary(size) => AvroSchema::Fixed(Fixed::new("", *size)),
        other => {
            return Err(ArrowError::NotYetImplemented(format!(
                "write {:?} to avro",
                other
            )))
        }
    })
}
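As a small sketch of the mapping (assuming the `Schema`, `Field` and `DataType` types from `crate::datatypes`), a one-column Arrow schema becomes a record with a single field:

let schema = Schema::new(vec![Field::new("c1", DataType::Int64, false)]);
let avro = to_avro_schema(&schema)?;
// `avro` is an AvroSchema::Record whose single field "c1" has schema Long(None);
// note that nullability is not encoded as a union with Null by this conversion.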