Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use flatbuffers to serialize dtypes #126

Merged
merged 9 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 7 additions & 11 deletions vortex-array/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::{ErrorKind, Read, Write};
use arrow_buffer::buffer::{Buffer, MutableBuffer};

use vortex_schema::{
DType, DTypeReader, DTypeWriter, IntWidth, Nullability, SchemaError, Signedness,
DType, FbDeserialize, FbSerialize, IntWidth, Nullability, SchemaError, Signedness,
};

use crate::array::composite::find_extension_id;
Expand Down Expand Up @@ -82,12 +82,10 @@ impl<'a> ReadCtx<'a> {

#[inline]
pub fn dtype(&mut self) -> VortexResult<DType> {
DTypeReader::new(self.r)
.read(find_extension_id)
.map_err(|e| match e {
SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s),
SchemaError::IOError(io_err) => io_err.0.into(),
})
let dtype_bytes = self.read_slice()?;
DType::deserialize(&dtype_bytes, find_extension_id).map_err(|e| match e {
SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s),
})
}

pub fn ptype(&mut self) -> VortexResult<PType> {
Expand Down Expand Up @@ -182,10 +180,8 @@ impl<'a> WriteCtx<'a> {
}

pub fn dtype(&mut self, dtype: &DType) -> VortexResult<()> {
DTypeWriter::new(self.w).write(dtype).map_err(|e| match e {
SchemaError::InvalidArgument(s) => VortexError::InvalidArgument(s),
SchemaError::IOError(io_err) => io_err.0.into(),
})
let serialized = dtype.serialize();
self.write_slice(&serialized)
}

pub fn ptype(&mut self, ptype: PType) -> VortexResult<()> {
Expand Down
7 changes: 5 additions & 2 deletions vortex-schema/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ path = "src/lib.rs"
[dependencies]
arrow-schema = "50.0.0"
itertools = "0.12.1"
leb128 = "0.2.5"
num_enum = "0.7.2"
thiserror = "1.0.58"
flatbuffers = "23.5.26"

[build-dependencies]
flatc = "0.2.2"
walkdir = "2.4.0"

[lints]
workspace = true
50 changes: 50 additions & 0 deletions vortex-schema/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::env;
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;

use flatc::flatc;
use walkdir::WalkDir;

fn main() {
let flatbuffers_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap())
.canonicalize()
.expect("Failed to canonicalize CARGO_MANIFEST_DIR")
.join("flatbuffers");
let out_dir = PathBuf::from(env::var("OUT_DIR").unwrap())
.canonicalize()
.expect("Failed to canonicalize OUT_DIR");

let fbs_files = WalkDir::new(flatbuffers_dir)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().extension() == Some(OsStr::new("fbs")))
.map(|e| {
rerun_if_changed(e.path());
e.path().to_path_buf()
})
.collect::<Vec<_>>();

if !Command::new(flatc())
.args(["--filename-suffix", ""])
.arg("--rust")
.arg("-o")
.arg(out_dir.join("flatbuffers"))
.args(fbs_files)
.status()
.unwrap()
.success()
{
panic!("Failed to run flatc");
}
}

fn rerun_if_changed(path: &Path) {
println!(
"cargo:rerun-if-changed={}",
path.canonicalize()
.unwrap_or_else(|_| panic!("failed to canonicalize {}", path.to_str().unwrap()))
.to_str()
.unwrap()
);
}
94 changes: 94 additions & 0 deletions vortex-schema/flatbuffers/schema.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
enum Nullability: byte {
NonNullable,
Nullable,
}

enum Signedness: byte {
Signed,
Unsigned,
Unknown,
}

enum IntWidth: byte {
Unknown,
_8,
_16,
_32,
_64,
}

enum FloatWidth: byte {
Unknown,
_16,
_32,
_64,
}

table Null {}

table Bool {
nullability: Nullability;
}

table Int {
width: IntWidth = Unknown;
signedness: Signedness = Unknown;
nullability: Nullability;
}

table Decimal {
/// Total number of decimal digits
precision: ubyte;

/// Number of digits after the decimal point "."
scale: byte;
nullability: Nullability;
}

table Float {
width: FloatWidth = Unknown;
nullability: Nullability;
}

table Utf8 {
nullability: Nullability;

}

table Binary {
nullability: Nullability;
}

table Struct_ {
names: [string];
fields: [DType];
}

table List {
element_type: DType;
nullability: Nullability;
}

table Composite {
id: string;
nullability: Nullability;
}

union Type {
Null,
Bool,
Int,
Decimal,
Float,
Utf8,
Binary,
Struct_,
List,
Composite,
}

table DType {
type: Type;
}

root_type DType;
25 changes: 1 addition & 24 deletions vortex-schema/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::ops::Deref;
use std::{env, fmt, io};
use std::{env, fmt};

#[derive(Debug, PartialEq)]
pub struct ErrString(Cow<'static, str>);
Expand Down Expand Up @@ -43,29 +43,6 @@ impl Display for ErrString {
pub enum SchemaError {
#[error("{0}")]
InvalidArgument(ErrString),
#[error("io error: {0:?}")]
IOError(IOError),
}

pub type SchemaResult<T> = Result<T, SchemaError>;

macro_rules! wrapped_error {
($E:ty, $e:ident) => {
#[derive(Debug)]
pub struct $e(pub $E);

impl PartialEq for $e {
fn eq(&self, _other: &Self) -> bool {
false
}
}

impl From<$E> for SchemaError {
fn from(err: $E) -> Self {
SchemaError::$e($e(err))
}
}
};
}

wrapped_error!(io::Error, IOError);
16 changes: 13 additions & 3 deletions vortex-schema/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::fmt::{Display, Formatter};

pub use dtype::*;
pub use error::ErrString;
pub use error::SchemaError;
pub use error::SchemaResult;
pub use serde::DTypeReader;
pub use serde::DTypeWriter;
use std::fmt::{Display, Formatter};
pub use serde::FbDeserialize;
pub use serde::FbSerialize;

mod dtype;
mod error;
Expand All @@ -18,3 +19,12 @@ impl Display for CompositeID {
write!(f, "{}", self.0)
}
}

#[allow(unused_imports)]
#[allow(dead_code)]
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::extra_unused_lifetimes)]
#[allow(non_camel_case_types)]
mod generated {
include!(concat!(env!("OUT_DIR"), "/flatbuffers/schema.rs"));
}
Loading