Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Composite Arrays #122

Merged
merged 15 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 28 additions & 3 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ pub fn download_taxi_data() -> PathBuf {
pub fn compress_taxi_data() -> ArrayRef {
let file = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let _mask = ProjectionMask::roots(builder.parquet_schema(), [6]);
let _mask = ProjectionMask::roots(builder.parquet_schema(), [1]);
let _no_datetime_mask = ProjectionMask::roots(
builder.parquet_schema(),
[0, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18],
);
let reader = builder
//.with_projection(mask)
.with_projection(_mask)
//.with_projection(no_datetime_mask)
.with_batch_size(65_536)
// .with_batch_size(5_000_000)
Expand Down Expand Up @@ -152,6 +152,7 @@ mod test {
use vortex::array::ArrayRef;
use vortex::compute::as_arrow::as_arrow;
use vortex::encode::FromArrow;
use vortex::serde::{ReadCtx, WriteCtx};

use crate::{compress_ctx, compress_taxi_data, download_taxi_data};

Expand All @@ -169,10 +170,32 @@ mod test {
#[ignore]
#[test]
fn compression_ratio() {
setup_logger(LevelFilter::Warn);
setup_logger(LevelFilter::Info);
_ = compress_taxi_data();
}

#[ignore]
#[test]
fn round_trip_serde() {
let file = File::open(download_taxi_data()).unwrap();
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let reader = builder.with_limit(1).build().unwrap();

for record_batch in reader.map(|batch_result| batch_result.unwrap()) {
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayRef::from_arrow(arrow_array.clone(), false);

let mut buf = Vec::<u8>::new();
let mut write_ctx = WriteCtx::new(&mut buf);
write_ctx.write(vortex_array.as_ref()).unwrap();

let mut read = buf.as_slice();
let mut read_ctx = ReadCtx::new(vortex_array.dtype(), &mut read);
read_ctx.read().unwrap();
}
}

#[ignore]
#[test]
fn round_trip_arrow() {
Expand All @@ -189,6 +212,8 @@ mod test {
}
}

// Ignoring since Struct arrays don't currently support equality.
// https://github.com/apache/arrow-rs/issues/5199
#[ignore]
#[test]
fn round_trip_arrow_compressed() {
Expand Down
2 changes: 1 addition & 1 deletion vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ linkme = "0.3.23"
log = "0.4.20"
num-traits = "0.2.18"
num_enum = "0.7.2"
once_cell = "1.19.0"
paste = "1.0.14"
rand = { version = "0.8.5", features = [] }
rayon = "1.8.1"
roaring = "0.10.3"
Expand Down
175 changes: 175 additions & 0 deletions vortex-array/src/array/composite/array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
use std::any::Any;
use std::fmt::{Debug, Display};
use std::sync::{Arc, RwLock};

use linkme::distributed_slice;

use crate::array::composite::{
find_extension, CompositeExtensionRef, CompositeID, TypedCompositeArray,
};
use crate::array::{Array, ArrayRef, Encoding, EncodingId, EncodingRef, ENCODINGS};
use crate::compress::EncodingCompression;
use crate::compute::ArrayCompute;
use crate::dtype::DType;
use crate::error::VortexResult;
use crate::formatter::{ArrayDisplay, ArrayFormatter};
use crate::serde::{ArraySerde, BytesSerde, EncodingSerde};
use crate::stats::{Stats, StatsCompute, StatsSet};

pub trait CompositeMetadata:
'static + Debug + Display + Send + Sync + Sized + Clone + BytesSerde
{
fn id(&self) -> CompositeID;
}

#[derive(Debug, Clone)]
pub struct CompositeArray {
extension: CompositeExtensionRef,
metadata: Arc<Vec<u8>>,
underlying: ArrayRef,
dtype: DType,
stats: Arc<RwLock<StatsSet>>,
}

impl CompositeArray {
pub fn new(id: CompositeID, metadata: Arc<Vec<u8>>, underlying: ArrayRef) -> Self {
let dtype = DType::Composite(id, underlying.dtype().is_nullable().into());
let extension = find_extension(id.0).expect("Unrecognized composite extension");
Self {
extension,
metadata,
underlying,
dtype,
stats: Arc::new(RwLock::new(StatsSet::new())),
}
}

#[inline]
pub fn id(&self) -> CompositeID {
self.extension.id()
}

#[inline]
pub fn extension(&self) -> CompositeExtensionRef {
self.extension
}

pub fn metadata(&self) -> Arc<Vec<u8>> {
self.metadata.clone()
}

#[inline]
pub fn underlying(&self) -> &dyn Array {
self.underlying.as_ref()
}

pub fn as_typed<M: CompositeMetadata>(&self) -> TypedCompositeArray<M> {
TypedCompositeArray::new(
M::deserialize(self.metadata().as_slice()).unwrap(),
dyn_clone::clone_box(self.underlying()),
)
}

pub fn as_typed_compute(&self) -> Box<dyn ArrayCompute> {
self.extension.as_typed_compute(self)
}
}

impl Array for CompositeArray {
#[inline]
fn as_any(&self) -> &dyn Any {
self
}

#[inline]
fn boxed(self) -> ArrayRef {
Box::new(self)
}

#[inline]
fn into_any(self: Box<Self>) -> Box<dyn Any> {
self
}

#[inline]
fn len(&self) -> usize {
self.underlying.len()
}

#[inline]
fn is_empty(&self) -> bool {
self.underlying.is_empty()
}

#[inline]
fn dtype(&self) -> &DType {
&self.dtype
}

#[inline]
fn stats(&self) -> Stats {
Stats::new(&self.stats, self)
}

fn slice(&self, start: usize, stop: usize) -> VortexResult<ArrayRef> {
Ok(Self::new(
self.id(),
self.metadata.clone(),
self.underlying.slice(start, stop)?,
)
.boxed())
}

#[inline]
fn encoding(&self) -> EncodingRef {
&CompositeEncoding
}

#[inline]
fn nbytes(&self) -> usize {
self.underlying.nbytes()
}

fn serde(&self) -> Option<&dyn ArraySerde> {
Some(self)
}
}

impl StatsCompute for CompositeArray {}

impl<'arr> AsRef<(dyn Array + 'arr)> for CompositeArray {
fn as_ref(&self) -> &(dyn Array + 'arr) {
self
}
}

impl ArrayDisplay for CompositeArray {
fn fmt(&self, f: &mut ArrayFormatter) -> std::fmt::Result {
f.property("metadata", format!("{:#?}", self.metadata().as_slice()))?;
f.child("underlying", self.underlying.as_ref())
}
}

#[derive(Debug)]
pub struct CompositeEncoding;

impl CompositeEncoding {
pub const ID: EncodingId = EncodingId::new("vortex.composite");
}

#[distributed_slice(ENCODINGS)]
static ENCODINGS_COMPOSITE: EncodingRef = &CompositeEncoding;

impl Encoding for CompositeEncoding {
fn id(&self) -> &EncodingId {
&Self::ID
}

fn compression(&self) -> Option<&dyn EncodingCompression> {
Some(self)
}

fn serde(&self) -> Option<&dyn EncodingSerde> {
Some(self)
}
}
96 changes: 0 additions & 96 deletions vortex-array/src/array/composite/as_arrow.rs

This file was deleted.

Loading