greedily combine chunks before compressing #783

Merged: 11 commits, Sep 11, 2024
11 changes: 8 additions & 3 deletions encodings/roaring/src/boolean/mod.rs
@@ -60,7 +60,7 @@ impl RoaringBoolArray {
        if array.encoding().id() == Bool::ID {
            roaring_bool_encode(BoolArray::try_from(array)?).map(|a| a.into_array())
        } else {
-            vortex_bail!("RoaringInt can only encode boolean arrays")
+            vortex_bail!("RoaringBool can only encode boolean arrays")
        }
    }
}
@@ -84,12 +84,17 @@ impl BoolArrayTrait for RoaringBoolArray {
}

impl AcceptArrayVisitor for RoaringBoolArray {
-    fn accept(&self, _visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
+    fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> {
        // TODO(ngates): should we store a buffer in memory? Or delay serialization?
        // Or serialize into metadata? The only reason we support buffers is so we can write to
        // the wire without copying into FlatBuffers. But if we need to allocate to serialize
        // the bitmap anyway, then may as well shove it into metadata.
-        todo!()
+        visitor.visit_buffer(
+            self.array()
+                .buffer()
+                .ok_or(vortex_err!("roaring bool should have a buffer"))?,
+        )?;
+        Ok(())
    }
}

74 changes: 73 additions & 1 deletion vortex-array/src/array/chunked/mod.rs
@@ -2,6 +2,7 @@
//!
//! Vortex is a chunked array library that's able to

+use canonical::try_canonicalize_chunks;
use futures_util::stream;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
@@ -18,7 +19,7 @@ use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::validity::Validity::NonNullable;
use crate::validity::{ArrayValidity, LogicalValidity, Validity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
-use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray};
+use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, IntoCanonical};

mod canonical;
mod compute;
@@ -132,6 +133,77 @@ impl ChunkedArray {
    pub fn array_stream(&self) -> impl ArrayStream + '_ {
        ArrayStreamAdapter::new(self.dtype().clone(), stream::iter(self.chunks().map(Ok)))
    }

+    pub fn rechunk_default(&self) -> VortexResult<Self> {
+        let gibibyte = 1 << 30;
+        let chunk_max = 1 << 16; // BtrBlocks uses 64K, so we use 64Ki
+        self.rechunk(gibibyte, chunk_max)
+    }

+    pub fn rechunk(&self, max_n_bytes: usize, max_n_elements: usize) -> VortexResult<Self> {
+        fn combine_validities(
+            dtype: &DType,
+            validities: Vec<LogicalValidity>,

> Reviewer: I think you can make this simpler by taking &[LogicalValidity] and cloning.
>
> Author: See the comment below about removing this function.

+        ) -> VortexResult<Validity> {
+            let validity = validities.clone().into_iter().collect::<Validity>();
+            match (dtype.is_nullable(), validity) {
+                (true, validity) => Ok(validity),
+                (false, Validity::AllValid) => Ok(Validity::NonNullable),
+                (false, _) => vortex_bail!(
+                    "for non-nullable dtype, child validities ought to all be AllValid"
+                ),
+            }

> Reviewer: This can be simplified. Suggested change for the body above:
>
>     if self.dtype().is_nullable() {
>         validities_to_combine.iter().cloned().collect::<Validity>()
>     } else {
>         NonNullable
>     }
>
> Author: I killed combine_validities entirely by using one of your earlier suggestions to go through ChunkedArray. Does that seem reasonable?

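A minimal sketch of the "go through ChunkedArray" approach the author mentions (hypothetical, not the merged code, though every API used here appears elsewhere in this diff):

    // Hypothetical sketch: wrapping the accumulated chunks in a ChunkedArray
    // and canonicalizing it lets the existing canonicalization path derive the
    // combined validity, making the hand-rolled combine_validities helper
    // unnecessary.
    let combined = ChunkedArray::try_new(chunks_to_combine, dtype.clone())?
        .into_canonical()?;
    new_chunks.push(combined.into());
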
+        }
+
+        let dtype = self.dtype();

> Reviewer: We should inline this; this function is going to be inlined anyway, and having self in front adds extra context.
>
> Author: Done.

+        let mut new_chunks: Vec<Array> = Vec::new();
+        let mut validities_to_combine: Vec<LogicalValidity> = Vec::new();
+        let mut chunks_to_combine: Vec<Array> = Vec::new();

> Reviewer: You don't need type annotations here.
>
> Author: Done.

+        let mut new_chunk_n_bytes = 0;
+        let mut new_chunk_n_elements = 0;
+        for chunk in self.chunks() {
+            let n_bytes = chunk.nbytes();
+            let n_elements = chunk.len();
+
+            if new_chunk_n_bytes + n_bytes > max_n_bytes
+                || new_chunk_n_elements + n_elements > max_n_elements
+            {
+                let canonical = try_canonicalize_chunks(
+                    chunks_to_combine,

> Reviewer: This function should not require a Vec but instead &[Array]. It never uses the ownership for anything.

+                    combine_validities(dtype, validities_to_combine)?,
+                    dtype,
+                )?;
+                new_chunks.push(canonical.into());
+
+                new_chunk_n_bytes = 0;
+                new_chunk_n_elements = 0;
+                validities_to_combine = Vec::new();
+                chunks_to_combine = Vec::new();
+            }
+
+            if n_bytes > max_n_bytes || n_elements > max_n_elements {
+                new_chunks.push(chunk.into_canonical()?.into()); // TODO(dk): rechunking maybe shouldn't produce canonical chunks

> Reviewer: We should just push the chunk without canonicalizing here.
>
> Reviewer: In a follow-up we change the logic to not canonicalize the chunks, i.e. stop using try_canonicalize_chunks.
>
> Author: Done.

+            } else {
+                new_chunk_n_bytes += n_bytes;
+                new_chunk_n_elements += n_elements;
+                validities_to_combine.push(chunk.with_dyn(|x| x.logical_validity()));
+                chunks_to_combine.push(chunk);
+            }
+        }
+
+        if !chunks_to_combine.is_empty() {
+            let canonical = try_canonicalize_chunks(
+                chunks_to_combine,
+                combine_validities(dtype, validities_to_combine)?,
+                dtype,
+            )?;
+            let chunk: Array = canonical.into();
+            new_chunks.push(chunk);
+        }
+
+        Self::try_new(new_chunks, dtype.clone())
+    }
}

impl ArrayTrait for ChunkedArray {}
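
For orientation, a minimal usage sketch of the rechunking API added above (assumes an existing ChunkedArray named `chunked`; the explicit limits are illustrative, not values from this PR):

    // Greedily merge neighboring chunks until adding the next one would cross
    // 1 GiB or 64Ki elements; a chunk that alone exceeds a limit is emitted as
    // its own canonicalized chunk.
    let fewer_chunks = chunked.rechunk_default()?;

    // Same algorithm with explicit limits: at most 16 MiB and 4096 elements
    // per combined chunk (illustrative values).
    let also_fewer = chunked.rechunk(16 << 20, 1 << 12)?;
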
3 changes: 2 additions & 1 deletion vortex-sampling-compressor/src/compressors/roaring_bool.rs
@@ -1,5 +1,6 @@
use std::collections::HashSet;

+use vortex::array::Bool;
use vortex::encoding::EncodingRef;
use vortex::{Array, ArrayDType, ArrayDef, IntoArray, IntoArrayVariant};
use vortex_dtype::DType;
@@ -20,7 +21,7 @@ impl EncodingCompressor for RoaringBoolCompressor {

    fn can_compress(&self, array: &Array) -> Option<&dyn EncodingCompressor> {
        // Only support bool enc arrays
-        if array.encoding().id() != RoaringBool::ID {
+        if array.encoding().id() != Bool::ID {
            return None;
        }
4 changes: 2 additions & 2 deletions vortex-sampling-compressor/src/lib.rs
@@ -199,9 +199,9 @@ impl<'a> SamplingCompressor<'a> {
    fn compress_array(&self, arr: &Array) -> VortexResult<CompressedArray<'a>> {
        match arr.encoding().id() {
            Chunked::ID => {
-                // For chunked arrays, we compress each chunk individually
                let chunked = ChunkedArray::try_from(arr)?;
-                let compressed_chunks = chunked
+                let less_chunked = chunked.rechunk_default()?;
+                let compressed_chunks = less_chunked
                    .chunks()
                    .map(|chunk| {
                        self.compress_array(&chunk)
120 changes: 120 additions & 0 deletions vortex-sampling-compressor/tests/smoketest.rs
@@ -22,7 +22,13 @@ use vortex_sampling_compressor::{CompressConfig, SamplingCompressor};

#[cfg(test)]
mod tests {
    use vortex::array::{Bool, ChunkedArray, VarBin};
    use vortex::variants::{ArrayVariants, StructArrayTrait};
    use vortex::ArrayDef;
    use vortex_datetime_dtype::TimeUnit;
    use vortex_datetime_parts::DateTimeParts;
    use vortex_dict::Dict;
    use vortex_fastlanes::FoR;
    use vortex_sampling_compressor::compressors::fsst::FSSTCompressor;

    use super::*;
@@ -79,6 +85,120 @@ mod tests {
        assert_eq!(compressed.dtype(), to_compress.dtype());
    }

    #[test]
    pub fn smoketest_compressor_on_chunked_array() {
        let compressor = SamplingCompressor::new_with_options(
            HashSet::from([
                &ALPCompressor as CompressorRef,
                &BitPackedCompressor,
                // TODO(robert): Implement minimal compute for DeltaArrays - scalar_at and slice
                // &DeltaCompressor,
                &DictCompressor,
                &FoRCompressor,
                &FSSTCompressor,
                &DateTimePartsCompressor,
                &RoaringBoolCompressor,
                &RoaringIntCompressor,
                &DEFAULT_RUN_END_COMPRESSOR,
                &SparseCompressor,
                &ZigZagCompressor,
            ]),
            CompressConfig::default(),
        );

        let chunk_size = 1 << 14;

        let ints: Vec<Array> = (0..4).map(|_| make_primitive_column(chunk_size)).collect();
        let bools: Vec<Array> = (0..4).map(|_| make_bool_column(chunk_size)).collect();
        let varbins: Vec<Array> = (0..4).map(|_| make_string_column(chunk_size)).collect();
        let binaries: Vec<Array> = (0..4).map(|_| make_binary_column(chunk_size)).collect();
        let timestamps: Vec<Array> = (0..4).map(|_| make_timestamp_column(chunk_size)).collect();

        fn chunked(arrays: Vec<Array>) -> Array {
            let dtype = arrays[0].dtype().clone();
            ChunkedArray::try_new(arrays, dtype).unwrap().into()
        }

        let to_compress = StructArray::try_new(
            vec![
                "prim_col".into(),
                "bool_col".into(),
                "varbin_col".into(),
                "binary_col".into(),
                "timestamp_col".into(),
            ]
            .into(),
            vec![
                chunked(ints),
                chunked(bools),
                chunked(varbins),
                chunked(binaries),
                chunked(timestamps),
            ],
            chunk_size * 4,
            Validity::NonNullable,
        )
        .unwrap()
        .into_array();

        println!("uncompressed: {}", to_compress.tree_display());
        let compressed = compressor
            .compress(&to_compress, None)
            .unwrap()
            .into_array();

        println!("compressed: {}", compressed.tree_display());
        assert_eq!(compressed.dtype(), to_compress.dtype());

        let struct_array: StructArray = compressed.try_into().unwrap();
        let struct_array: &dyn StructArrayTrait = struct_array.as_struct_array().unwrap();

        let prim_col: ChunkedArray = struct_array
            .field_by_name("prim_col")
            .unwrap()
            .try_into()
            .unwrap();
        for chunk in prim_col.chunks() {
            assert_eq!(chunk.encoding().id(), FoR::ID);
        }

        let bool_col: ChunkedArray = struct_array
            .field_by_name("bool_col")
            .unwrap()
            .try_into()
            .unwrap();
        for chunk in bool_col.chunks() {
            assert_eq!(chunk.encoding().id(), Bool::ID);
        }

        let varbin_col: ChunkedArray = struct_array
            .field_by_name("varbin_col")
            .unwrap()
            .try_into()
            .unwrap();
        for chunk in varbin_col.chunks() {
            assert_eq!(chunk.encoding().id(), Dict::ID);
        }

        let binary_col: ChunkedArray = struct_array
            .field_by_name("binary_col")
            .unwrap()
            .try_into()
            .unwrap();
        for chunk in binary_col.chunks() {
            assert_eq!(chunk.encoding().id(), VarBin::ID);
        }

        let timestamp_col: ChunkedArray = struct_array
            .field_by_name("timestamp_col")
            .unwrap()
            .try_into()
            .unwrap();
        for chunk in timestamp_col.chunks() {
            assert_eq!(chunk.encoding().id(), DateTimeParts::ID);
        }
    }

    fn make_primitive_column(count: usize) -> Array {
        PrimitiveArray::from_vec(
            (0..count).map(|i| i as i64).collect::<Vec<i64>>(),