Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Finished first iteration of avro writer.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Dec 20, 2021
1 parent 00988b2 commit 990099a
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 69 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]
io_avro = ["avro-schema", "fallible-streaming-iterator", "serde_json"]
io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
Expand Down
9 changes: 4 additions & 5 deletions examples/avro_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use arrow2::{
datatypes::{Field, Schema},
error::Result,
io::avro::write,
record_batch::RecordBatch,
};

fn main() -> Result<()> {
Expand All @@ -26,18 +25,18 @@ fn main() -> Result<()> {
let field = Field::new("c1", array.data_type().clone(), true);
let schema = Schema::new(vec![field]);

let avro_schema = write::to_avro_schema(&schema)?;
let avro_fields = write::to_avro_schema(&schema)?;

let mut file = File::create(path)?;

let compression = None;

write::write_metadata(&mut file, &avro_schema, compression)?;
write::write_metadata(&mut file, avro_fields.clone(), compression)?;

let serializer = write::new_serializer(&array, avro_schema.fields()[0]);
let serializer = write::new_serializer(&array, &avro_fields[0].schema);
let mut block = write::Block::new(array.len(), vec![]);

write::serialize(&mut vec![serializer], &mut block)?;
write::serialize(&mut [serializer], &mut block)?;

let mut compressed_block = write::CompressedBlock::default();

Expand Down
2 changes: 1 addition & 1 deletion src/io/avro/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn read_metadata<R: std::io::Read>(
reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
let (avro_schema, codec, marker) = util::read_schema(reader)?;
let schema = schema::convert_schema(&avro_schema)?;
let schema = convert_schema(&avro_schema)?;

let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
Expand Down
9 changes: 5 additions & 4 deletions src/io/avro/write/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
//! APIs to write to Avro format.
use std::io::Write;

use avro_schema::Schema as AvroSchema;
use avro_schema::{Field as AvroField, Record, Schema as AvroSchema};

use crate::error::Result;

use super::Compression;
pub use super::Compression;

mod header;
use header::serialize_header;
Expand All @@ -22,15 +22,16 @@ const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4];
/// Writes Avro's metadata to `writer`.
pub fn write_metadata<W: std::io::Write>(
writer: &mut W,
schema: &AvroSchema,
fields: Vec<AvroField>,
compression: Option<Compression>,
) -> Result<()> {
// * Four bytes, ASCII 'O', 'b', 'j', followed by 1.
let avro_magic = [b'O', b'b', b'j', 1u8];
writer.write_all(&avro_magic)?;

// * file metadata, including the schema.
let header = serialize_header(schema, compression)?;
let schema = AvroSchema::Record(Record::new("", fields));
let header = serialize_header(&schema, compression)?;

util::zigzag_encode(header.len() as i64, writer)?;
for (name, item) in header {
Expand Down
29 changes: 20 additions & 9 deletions src/io/avro/write/schema.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,46 @@
use avro_schema::{
Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record, Schema as AvroSchema,
Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Schema as AvroSchema,
};

use crate::datatypes::*;
use crate::error::{ArrowError, Result};

/// Converts a [`Schema`] into a vector of avro [`AvroField`]s, one per arrow field.
pub fn to_avro_schema(schema: &Schema) -> Result<AvroSchema> {
let fields = schema
pub fn to_avro_schema(schema: &Schema) -> Result<Vec<AvroField>> {
schema
.fields
.iter()
.map(|field| field_to_field(field))
.collect::<Result<Vec<_>>>()?;
Ok(avro_schema::Schema::Record(Record::new("", fields)))
.collect()
}

fn field_to_field(field: &Field) -> Result<AvroField> {
let schema = type_to_schema(field.data_type())?;
let schema = type_to_schema(field.data_type(), field.is_nullable())?;
Ok(AvroField::new(field.name(), schema))
}

fn type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
fn type_to_schema(data_type: &DataType, is_nullable: bool) -> Result<AvroSchema> {
Ok(if is_nullable {
AvroSchema::Union(vec![AvroSchema::Null, _type_to_schema(data_type)?])
} else {
_type_to_schema(data_type)?
})
}

fn _type_to_schema(data_type: &DataType) -> Result<AvroSchema> {
Ok(match data_type.to_logical_type() {
DataType::Null => AvroSchema::Null,
DataType::Boolean => AvroSchema::Int(None),
DataType::Boolean => AvroSchema::Boolean,
DataType::Int32 => AvroSchema::Int(None),
DataType::Int64 => AvroSchema::Long(None),
DataType::Float32 => AvroSchema::Float,
DataType::Float64 => AvroSchema::Double,
DataType::Binary => AvroSchema::Bytes(None),
DataType::Utf8 => AvroSchema::String(None),
DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema(inner.data_type())?)),
DataType::List(inner) => AvroSchema::Array(Box::new(type_to_schema(
inner.data_type(),
inner.is_nullable(),
)?)),
DataType::Date32 => AvroSchema::Int(Some(IntLogical::Date)),
DataType::Time32(TimeUnit::Millisecond) => AvroSchema::Int(Some(IntLogical::Time)),
DataType::Time64(TimeUnit::Microsecond) => AvroSchema::Long(Some(LongLogical::Time)),
Expand Down
49 changes: 38 additions & 11 deletions src/io/avro/write/serialize.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use avro_schema::Schema as AvroSchema;

use crate::datatypes::{PhysicalType, PrimitiveType};
use crate::{array::*, datatypes::DataType};

use super::super::super::iterator::*;
Expand All @@ -17,10 +18,10 @@ pub type BoxSerializer<'a> = Box<dyn StreamingIterator<Item = [u8]> + 'a + Send
/// This function performs minimal CPU work: it dynamically dispatches based on the schema
/// and arrow type.
pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSerializer<'a> {
let data_type = array.data_type().to_logical_type();
let data_type = array.data_type().to_physical_type();

match (data_type, schema) {
(DataType::Boolean, AvroSchema::Boolean) => {
(PhysicalType::Boolean, AvroSchema::Boolean) => {
let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
Box::new(BufStreamingIterator::new(
values.values_iter(),
Expand All @@ -30,7 +31,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Boolean, AvroSchema::Union(_)) => {
(PhysicalType::Boolean, AvroSchema::Union(_)) => {
let values = array.as_any().downcast_ref::<BooleanArray>().unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
Expand All @@ -43,7 +44,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Utf8, AvroSchema::Union(_)) => {
(PhysicalType::Utf8, AvroSchema::Union(_)) => {
let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
Expand All @@ -57,7 +58,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Utf8, AvroSchema::String(_)) => {
(PhysicalType::Utf8, AvroSchema::String(_)) => {
let values = array.as_any().downcast_ref::<Utf8Array<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.values_iter(),
Expand All @@ -68,7 +69,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Binary, AvroSchema::Union(_)) => {
(PhysicalType::Binary, AvroSchema::Union(_)) => {
let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
Expand All @@ -82,7 +83,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Binary, AvroSchema::Bytes(_)) => {
(PhysicalType::Binary, AvroSchema::Bytes(_)) => {
let values = array.as_any().downcast_ref::<BinaryArray<i32>>().unwrap();
Box::new(BufStreamingIterator::new(
values.values_iter(),
Expand All @@ -94,7 +95,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
))
}

(DataType::Int32, AvroSchema::Union(_)) => {
(PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Union(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<i32>>()
Expand All @@ -110,7 +111,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Int32, AvroSchema::Int(_)) => {
(PhysicalType::Primitive(PrimitiveType::Int32), AvroSchema::Int(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<i32>>()
Expand All @@ -123,7 +124,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Int64, AvroSchema::Union(_)) => {
(PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Union(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
Expand All @@ -139,7 +140,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(DataType::Int64, AvroSchema::Long(_)) => {
(PhysicalType::Primitive(PrimitiveType::Int64), AvroSchema::Long(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<i64>>()
Expand All @@ -152,6 +153,32 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Float32), AvroSchema::Float) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<f32>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.values().iter(),
|x, buf| {
buf.extend_from_slice(&x.to_le_bytes());
},
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::Float64), AvroSchema::Double) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<f64>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.values().iter(),
|x, buf| {
buf.extend_from_slice(&x.to_le_bytes());
},
vec![],
))
}
_ => todo!(),
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/it/io/avro/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
mod read;
#[cfg(feature = "io_avro_async")]
mod read_async;
mod write;
78 changes: 43 additions & 35 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use arrow2::error::Result;
use arrow2::io::avro::read;
use arrow2::record_batch::RecordBatch;

fn schema() -> (AvroSchema, Schema) {
pub(super) fn schema() -> (AvroSchema, Schema) {
let raw_schema = r#"
{
"type": "record",
Expand Down Expand Up @@ -69,7 +69,35 @@ fn schema() -> (AvroSchema, Schema) {
(AvroSchema::parse_str(raw_schema).unwrap(), schema)
}

pub(super) fn write(codec: Codec) -> std::result::Result<(Vec<u8>, RecordBatch), avro_rs::Error> {
pub(super) fn data() -> RecordBatch {
let data = vec![
Some(vec![Some(1i32), None, Some(3)]),
Some(vec![Some(1i32), None, Some(3)]),
];

let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
array.try_extend(data).unwrap();

let columns = vec![
Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
array.into_arc(),
Arc::new(DictionaryArray::<i32>::from_data(
Int32Array::from_slice([1, 0]),
Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
)) as Arc<dyn Array>,
];

RecordBatch::try_new(Arc::new(schema().1), columns).unwrap()
}

pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::Error> {
let (avro, schema) = schema();
// a writer needs a schema and something to write to
let mut writer = Writer::with_codec(&avro, Vec::new(), codec);
Expand Down Expand Up @@ -118,40 +146,11 @@ pub(super) fn write(codec: Codec) -> std::result::Result<(Vec<u8>, RecordBatch),
);
record.put("enum", Value::Enum(0, "SPADES".to_string()));
writer.append(record)?;

let data = vec![
Some(vec![Some(1i32), None, Some(3)]),
Some(vec![Some(1i32), None, Some(3)]),
];

let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new();
array.try_extend(data).unwrap();

let columns = vec![
Arc::new(Int64Array::from_slice([27, 47])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from_slice(["foo", "bar"])) as Arc<dyn Array>,
Arc::new(Int32Array::from_slice([1, 1])) as Arc<dyn Array>,
Arc::new(Int32Array::from_slice([1, 2]).to(DataType::Date32)) as Arc<dyn Array>,
Arc::new(BinaryArray::<i32>::from_slice([b"foo", b"bar"])) as Arc<dyn Array>,
Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
array.into_arc(),
Arc::new(DictionaryArray::<i32>::from_data(
Int32Array::from_slice([1, 0]),
Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
)) as Arc<dyn Array>,
];

let expected = RecordBatch::try_new(Arc::new(schema), columns).unwrap();

Ok((writer.into_inner().unwrap(), expected))
Ok(writer.into_inner().unwrap())
}

fn test(codec: Codec) -> Result<()> {
let (data, expected) = write(codec).unwrap();

let file = &mut &data[..];
pub(super) fn read_avro(mut avro: &[u8]) -> Result<RecordBatch> {
let file = &mut avro;

let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;

Expand All @@ -161,7 +160,16 @@ fn test(codec: Codec) -> Result<()> {
Arc::new(schema),
);

assert_eq!(reader.next().unwrap().unwrap(), expected);
reader.next().unwrap()
}

fn test(codec: Codec) -> Result<()> {
let avro = write_avro(codec).unwrap();
let expected = data();

let result = read_avro(&avro)?;

assert_eq!(result, expected);
Ok(())
}

Expand Down
Loading

0 comments on commit 990099a

Please sign in to comment.