Support Parsing Avro File Headers (apache#4888)
* Add arrow-avro

* Add HeaderDecoder

* Add schema parsing

* Add BlockDecoder

* Further docs

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Review feedback

---------

Co-authored-by: Andrew Lamb <[email protected]>
tustvold and alamb authored Oct 4, 2023
1 parent 4320a75 commit f0455d1
Showing 15 changed files with 1,169 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/arrow.yml
@@ -39,6 +39,7 @@ on:
- arrow-integration-test/**
- arrow-ipc/**
- arrow-json/**
- arrow-avro/**
- arrow-ord/**
- arrow-row/**
- arrow-schema/**
@@ -78,6 +79,8 @@ jobs:
run: cargo test -p arrow-csv --all-features
- name: Test arrow-json with all features
run: cargo test -p arrow-json --all-features
- name: Test arrow-avro with all features
run: cargo test -p arrow-avro --all-features
- name: Test arrow-string with all features
run: cargo test -p arrow-string --all-features
- name: Test arrow-ord with all features
@@ -202,6 +205,8 @@ jobs:
run: cargo clippy -p arrow-csv --all-targets --all-features -- -D warnings
- name: Clippy arrow-json with all features
run: cargo clippy -p arrow-json --all-targets --all-features -- -D warnings
- name: Clippy arrow-avro with all features
run: cargo clippy -p arrow-avro --all-targets --all-features -- -D warnings
- name: Clippy arrow-string with all features
run: cargo clippy -p arrow-string --all-targets --all-features -- -D warnings
- name: Clippy arrow-ord with all features
1 change: 1 addition & 0 deletions .github/workflows/dev_pr/labeler.yml
@@ -27,6 +27,7 @@ arrow:
- arrow-integration-testing/**/*
- arrow-ipc/**/*
- arrow-json/**/*
- arrow-avro/**/*
- arrow-ord/**/*
- arrow-row/**/*
- arrow-schema/**/*
1 change: 1 addition & 0 deletions .github/workflows/integration.yml
@@ -38,6 +38,7 @@ on:
- arrow-integration-testing/**
- arrow-ipc/**
- arrow-json/**
- arrow-avro/**
- arrow-ord/**
- arrow-pyarrow-integration-testing/**
- arrow-schema/**
1 change: 1 addition & 0 deletions .github/workflows/miri.yaml
@@ -36,6 +36,7 @@ on:
- arrow-data/**
- arrow-ipc/**
- arrow-json/**
- arrow-avro/**
- arrow-schema/**
- arrow-select/**
- arrow-string/**
1 change: 1 addition & 0 deletions .github/workflows/parquet.yml
@@ -40,6 +40,7 @@ on:
- arrow-ipc/**
- arrow-csv/**
- arrow-json/**
- arrow-avro/**
- parquet/**
- .github/**

1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,6 +21,7 @@ members = [
"arrow",
"arrow-arith",
"arrow-array",
"arrow-avro",
"arrow-buffer",
"arrow-cast",
"arrow-csv",
46 changes: 46 additions & 0 deletions arrow-avro/Cargo.toml
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "arrow-avro"
version = { workspace = true }
description = "Support for parsing Avro format into the Arrow format"
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
license = { workspace = true }
keywords = { workspace = true }
include = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }

[lib]
name = "arrow_avro"
path = "src/lib.rs"
bench = false

[dependencies]
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0.188", features = ["derive"] }

[dev-dependencies]

32 changes: 32 additions & 0 deletions arrow-avro/src/compression.rs
@@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use serde::{Deserialize, Serialize};

/// The metadata key used for storing the JSON encoded [`CompressionCodec`]
pub const CODEC_METADATA_KEY: &str = "avro.codec";

#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionCodec {
Null,
Deflate,
BZip2,
Snappy,
XZ,
ZStandard,
}
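
The `#[serde(rename_all = "lowercase")]` attribute means each codec maps to the lowercase form of its variant name. A hypothetical unit test, not part of the committed diff, sketching that round trip as it might appear alongside the enum in `compression.rs`:

```rust
// A hypothetical unit test (not in the committed diff) illustrating the
// lowercase wire names implied by #[serde(rename_all = "lowercase")].
#[cfg(test)]
mod tests {
    use super::CompressionCodec;

    #[test]
    fn codec_round_trips_as_lowercase_json() {
        // "deflate" is just an example value here
        let codec: CompressionCodec = serde_json::from_str("\"deflate\"").unwrap();
        assert!(matches!(codec, CompressionCodec::Deflate));

        // Serialization produces the lowercase variant name
        assert_eq!(serde_json::to_string(&CompressionCodec::XZ).unwrap(), "\"xz\"");
    }
}
```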
28 changes: 28 additions & 0 deletions arrow-avro/src/lib.rs
@@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Convert data to / from the [Apache Arrow] memory format and [Apache Avro]
//!
//! [Apache Arrow]: https://arrow.apache.org
//! [Apache Avro]: https://avro.apache.org/
#![allow(unused)] // Temporary

pub mod reader;
mod schema;

mod compression;
141 changes: 141 additions & 0 deletions arrow-avro/src/reader/block.rs
@@ -0,0 +1,141 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Decoder for [`Block`]
use crate::reader::vlq::VLQDecoder;
use arrow_schema::ArrowError;

/// A file data block
///
/// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
#[derive(Debug, Default)]
pub struct Block {
/// The number of objects in this block
pub count: usize,
/// The serialized objects within this block
pub data: Vec<u8>,
/// The sync marker
pub sync: [u8; 16],
}

/// A decoder for [`Block`]
#[derive(Debug)]
pub struct BlockDecoder {
state: BlockDecoderState,
in_progress: Block,
vlq_decoder: VLQDecoder,
bytes_remaining: usize,
}

#[derive(Debug)]
enum BlockDecoderState {
Count,
Size,
Data,
Sync,
Finished,
}

impl Default for BlockDecoder {
fn default() -> Self {
Self {
state: BlockDecoderState::Count,
in_progress: Default::default(),
vlq_decoder: Default::default(),
bytes_remaining: 0,
}
}
}

impl BlockDecoder {
/// Parse [`Block`] from `buf`, returning the number of bytes read
///
/// This method can be called multiple times with consecutive chunks of data, allowing
/// integration with chunked IO systems like [`BufRead::fill_buf`]
///
/// All errors should be considered fatal, and decoding aborted
///
/// Once an entire [`Block`] has been decoded, this method will not read any further
/// input bytes until [`Self::flush`] is called. Afterwards, [`Self::decode`]
/// can be used again to read the next block, if any
///
/// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
let max_read = buf.len();
while !buf.is_empty() {
match self.state {
BlockDecoderState::Count => {
if let Some(c) = self.vlq_decoder.long(&mut buf) {
self.in_progress.count = c.try_into().map_err(|_| {
ArrowError::ParseError(format!(
"Block count cannot be negative, got {c}"
))
})?;

self.state = BlockDecoderState::Size;
}
}
BlockDecoderState::Size => {
if let Some(c) = self.vlq_decoder.long(&mut buf) {
self.bytes_remaining = c.try_into().map_err(|_| {
ArrowError::ParseError(format!(
"Block size cannot be negative, got {c}"
))
})?;

self.in_progress.data.reserve(self.bytes_remaining);
self.state = BlockDecoderState::Data;
}
}
BlockDecoderState::Data => {
let to_read = self.bytes_remaining.min(buf.len());
self.in_progress.data.extend_from_slice(&buf[..to_read]);
buf = &buf[to_read..];
self.bytes_remaining -= to_read;
if self.bytes_remaining == 0 {
self.bytes_remaining = 16;
self.state = BlockDecoderState::Sync;
}
}
BlockDecoderState::Sync => {
let to_decode = buf.len().min(self.bytes_remaining);
let start = 16 - self.bytes_remaining;
self.in_progress.sync[start..start + to_decode].copy_from_slice(&buf[..to_decode]);
self.bytes_remaining -= to_decode;
buf = &buf[to_decode..];
if self.bytes_remaining == 0 {
self.state = BlockDecoderState::Finished;
}
}
BlockDecoderState::Finished => return Ok(max_read - buf.len()),
}
}
Ok(max_read)
}

/// Flush this decoder returning the parsed [`Block`] if any
pub fn flush(&mut self) -> Option<Block> {
match self.state {
BlockDecoderState::Finished => {
self.state = BlockDecoderState::Count;
Some(std::mem::take(&mut self.in_progress))
}
_ => None,
}
}
}
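
The decoder is intended to be fed consecutive chunks of input and flushed once per block. A minimal usage sketch, not part of the committed diff, assuming `Block` and `BlockDecoder` end up publicly reachable under the `reader` module (the diff does not show how `reader/mod.rs` exposes them):

```rust
use std::io::BufRead;

// Hypothetical import path; the actual re-exports live in reader/mod.rs,
// which is not shown in this excerpt.
use arrow_avro::reader::block::{Block, BlockDecoder};

/// Drain all blocks from a chunked reader, feeding each `fill_buf` chunk to the decoder.
fn read_blocks<R: BufRead>(mut reader: R) -> Result<Vec<Block>, Box<dyn std::error::Error>> {
    let mut decoder = BlockDecoder::default();
    let mut blocks = Vec::new();
    loop {
        let buf = reader.fill_buf()?;
        if buf.is_empty() {
            break; // end of input
        }
        // `decode` reports how many bytes it consumed; it stops early once a
        // complete block has been assembled, leaving the rest of the chunk unread
        let consumed = decoder.decode(buf)?;
        reader.consume(consumed);
        // `flush` yields the finished block, if any, and re-arms the decoder
        if let Some(block) = decoder.flush() {
            blocks.push(block);
        }
    }
    Ok(blocks)
}
```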