Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make serde optional and return VortexError #105

Merged
merged 1 commit into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions vortex-alp/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ impl Array for ALPArray {
self.encoded().nbytes() + self.patches().map(|p| p.nbytes()).unwrap_or(0)
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
17 changes: 7 additions & 10 deletions vortex-alp/src/serde.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use std::io;
use std::io::ErrorKind;

use crate::alp::Exponents;
use vortex::array::{Array, ArrayRef};
use vortex::dtype::{DType, FloatWidth, Signedness};
use vortex::error::{VortexError, VortexResult};
use vortex::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

use crate::ALPArray;
use crate::ALPEncoding;

impl ArraySerde for ALPArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
ctx.write_option_tag(self.patches().is_some())?;
if let Some(p) = self.patches() {
ctx.write(p.as_ref())?;
Expand All @@ -21,7 +19,7 @@ impl ArraySerde for ALPArray {
}

impl EncodingSerde for ALPEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let patches_tag = ctx.read_nbytes::<1>()?[0];
let patches = if patches_tag == 0x01 {
Some(ctx.read()?)
Expand All @@ -33,9 +31,9 @@ impl EncodingSerde for ALPEncoding {
DType::Float(width, nullability) => match width {
FloatWidth::_32 => DType::Int(32.into(), Signedness::Signed, *nullability),
FloatWidth::_64 => DType::Int(64.into(), Signedness::Signed, *nullability),
_ => return Err(io::Error::new(ErrorKind::InvalidData, "invalid dtype")),
_ => return Err(VortexError::InvalidDType(ctx.schema().clone())),
},
_ => return Err(io::Error::new(ErrorKind::InvalidData, "invalid dtype")),
_ => return Err(VortexError::InvalidDType(ctx.schema().clone())),
};
let encoded = ctx.with_schema(&encoded_dtype).read()?;
Ok(ALPArray::new(
Expand All @@ -52,17 +50,16 @@ impl EncodingSerde for ALPEncoding {

#[cfg(test)]
mod test {
use std::io;

use vortex::array::downcast::DowncastArrayBuiltin;
use vortex::array::primitive::PrimitiveArray;
use vortex::array::{Array, ArrayRef};
use vortex::error::VortexResult;
use vortex::serde::{ReadCtx, WriteCtx};

use crate::compress::alp_encode;
use crate::downcast::DowncastALP;

fn roundtrip_array(array: &dyn Array) -> io::Result<ArrayRef> {
fn roundtrip_array(array: &dyn Array) -> VortexResult<ArrayRef> {
let mut buf = Vec::<u8>::new();
let mut write_ctx = WriteCtx::new(&mut buf);
write_ctx.write(array)?;
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/bool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ impl Array for BoolArray {
(self.len() + 7) / 8
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/array/bool/serde.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::io;

use arrow_buffer::buffer::BooleanBuffer;

use crate::array::bool::{BoolArray, BoolEncoding};
use crate::array::{Array, ArrayRef};
use crate::error::VortexResult;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for BoolArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
if let Some(v) = self.validity() {
ctx.write(v.as_ref())?;
}
Expand All @@ -16,7 +15,7 @@ impl ArraySerde for BoolArray {
}

impl EncodingSerde for BoolEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let validity = if ctx.schema().is_nullable() {
Some(ctx.validity().read()?)
} else {
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/chunked/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ impl Array for ChunkedArray {
self.chunks().iter().map(|arr| arr.nbytes()).sum()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is to check whether an array supports serde... should chunked array check that every chunk serde is Some(_)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(can be a FLUP)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, the implementation of serde can subsequently fail. This is just a way to maybe get an implementation at runtime

}
}

Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/array/chunked/serde.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::io;

use crate::array::chunked::{ChunkedArray, ChunkedEncoding};
use crate::array::{Array, ArrayRef};
use crate::error::VortexResult;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for ChunkedArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
ctx.write_usize(self.chunks().len())?;
for c in self.chunks() {
ctx.write(c.as_ref())?;
Expand All @@ -15,7 +14,7 @@ impl ArraySerde for ChunkedArray {
}

impl EncodingSerde for ChunkedEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let chunk_len = ctx.read_usize()?;
let mut chunks = Vec::<ArrayRef>::with_capacity(chunk_len);
// TODO(robert): Use read_vectored
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/constant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ impl Array for ConstantArray {
self.scalar.nbytes()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/array/constant/serde.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::io;

use crate::array::constant::{ConstantArray, ConstantEncoding};
use crate::array::{Array, ArrayRef};
use crate::error::VortexResult;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for ConstantArray {
fn write(&self, ctx: &mut WriteCtx<'_>) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx<'_>) -> VortexResult<()> {
ctx.write_usize(self.len())?;
ctx.scalar(self.scalar())
}
}

impl EncodingSerde for ConstantEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let len = ctx.read_usize()?;
let scalar = ctx.scalar()?;
Ok(ConstantArray::new(scalar, len).boxed())
Expand Down
4 changes: 3 additions & 1 deletion vortex-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ pub trait Array:
/// Approximate size in bytes of the array. Only takes into account variable size portion of the array
fn nbytes(&self) -> usize;

fn serde(&self) -> &dyn ArraySerde;
fn serde(&self) -> Option<&dyn ArraySerde> {
None
}
}

dyn_clone::clone_trait_object!(Array);
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ impl Array for PrimitiveArray {
self.buffer.len()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/array/primitive/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::io::ErrorKind;

use crate::array::primitive::{PrimitiveArray, PrimitiveEncoding};
use crate::array::{Array, ArrayRef};
use crate::error::VortexResult;
use crate::ptype::PType;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for PrimitiveArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
if let Some(v) = self.validity() {
ctx.write(v.as_ref())?;
}
Expand All @@ -16,7 +17,7 @@ impl ArraySerde for PrimitiveArray {
}

impl EncodingSerde for PrimitiveEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let validity = if ctx.schema().is_nullable() {
Some(ctx.validity().read()?)
} else {
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/sparse/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,8 @@ impl Array for SparseArray {
self.indices.nbytes() + self.values.nbytes()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
5 changes: 3 additions & 2 deletions vortex-array/src/array/sparse/serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::io::ErrorKind;
use crate::array::sparse::{SparseArray, SparseEncoding};
use crate::array::{Array, ArrayRef};
use crate::dtype::DType;
use crate::error::VortexResult;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for SparseArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
ctx.write_usize(self.len())?;
// TODO(robert): Rewrite indices and don't store offset
ctx.write_usize(self.indices_offset())?;
Expand All @@ -17,7 +18,7 @@ impl ArraySerde for SparseArray {
}

impl EncodingSerde for SparseEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let len = ctx.read_usize()?;
let offset = ctx.read_usize()?;
let indices = ctx.with_schema(&DType::IDX).read()?;
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ impl Array for StructArray {
self.fields.iter().map(|arr| arr.nbytes()).sum()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
13 changes: 4 additions & 9 deletions vortex-array/src/array/struct_/serde.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::io;
use std::io::ErrorKind;

use crate::array::struct_::{StructArray, StructEncoding};
use crate::array::{Array, ArrayRef};
use crate::dtype::DType;
use crate::error::{VortexError, VortexResult};
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for StructArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
ctx.write_usize(self.fields().len())?;
for f in self.fields() {
ctx.write(f.as_ref())?;
Expand All @@ -17,18 +15,15 @@ impl ArraySerde for StructArray {
}

impl EncodingSerde for StructEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let num_fields = ctx.read_usize()?;
let mut fields = Vec::<ArrayRef>::with_capacity(num_fields);
// TODO(robert): use read_vectored
for i in 0..num_fields {
fields.push(ctx.subfield(i).read()?);
}
let DType::Struct(ns, _) = ctx.schema() else {
return Err(io::Error::new(
ErrorKind::InvalidData,
"invalid schema type",
));
return Err(VortexError::InvalidDType(ctx.schema().clone()));
};
Ok(StructArray::new(ns.clone(), fields).boxed())
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/typed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ impl Array for TypedArray {
self.array.nbytes()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/array/typed/serde.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use std::io;

use crate::array::typed::{TypedArray, TypedEncoding};
use crate::array::{Array, ArrayRef};
use crate::error::VortexResult;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for TypedArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
ctx.dtype(self.untyped_array().dtype())?;
ctx.write(self.untyped_array())
}
}

impl EncodingSerde for TypedEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let inner_dtype = ctx.dtype()?;
Ok(TypedArray::new(ctx.with_schema(&inner_dtype).read()?, ctx.schema().clone()).boxed())
}
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/varbin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ impl Array for VarBinArray {
self.bytes.nbytes() + self.offsets.nbytes()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
7 changes: 3 additions & 4 deletions vortex-array/src/array/varbin/serde.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::io;

use crate::array::varbin::{VarBinArray, VarBinEncoding};
use crate::array::{Array, ArrayRef};
use crate::error::VortexResult;
use crate::serde::{ArraySerde, EncodingSerde, ReadCtx, WriteCtx};

impl ArraySerde for VarBinArray {
fn write(&self, ctx: &mut WriteCtx) -> io::Result<()> {
fn write(&self, ctx: &mut WriteCtx) -> VortexResult<()> {
if let Some(v) = self.validity() {
ctx.write(v.as_ref())?;
}
Expand All @@ -16,7 +15,7 @@ impl ArraySerde for VarBinArray {
}

impl EncodingSerde for VarBinEncoding {
fn read(&self, ctx: &mut ReadCtx) -> io::Result<ArrayRef> {
fn read(&self, ctx: &mut ReadCtx) -> VortexResult<ArrayRef> {
let validity = if ctx.schema().is_nullable() {
Some(ctx.validity().read()?)
} else {
Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/array/varbinview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ impl Array for VarBinViewArray {
self.views.nbytes() + self.data.iter().map(|arr| arr.nbytes()).sum::<usize>()
}

fn serde(&self) -> &dyn ArraySerde {
self
fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

Expand Down
Loading