-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
greedily combine chunks before compressing #783
Changes from 7 commits
baa8830
5bddf67
50c226c
a4d985b
683f72b
3dc3abe
42e2113
6ed8893
2027148
2399ebb
6842593
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
#[macro_export] | ||
macro_rules! assert_arrays_eq { | ||
($expected:expr, $actual:expr) => { | ||
let expected: Array = $expected.into(); | ||
let actual: Array = $actual.into(); | ||
assert_eq!(expected.dtype(), actual.dtype()); | ||
|
||
let expected_contents = (0..expected.len()) | ||
.map(|idx| scalar_at(&expected, idx).map(|x| x.into_value())) | ||
.collect::<VortexResult<Vec<_>>>() | ||
.unwrap(); | ||
let actual_contents = (0..actual.len()) | ||
.map(|idx| scalar_at(&expected, idx).map(|x| x.into_value())) | ||
.collect::<VortexResult<Vec<_>>>() | ||
.unwrap(); | ||
|
||
assert_eq!(expected_contents, actual_contents); | ||
}; | ||
} | ||
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -2,6 +2,7 @@ | |||||||||||||||||||||||||||
//! | ||||||||||||||||||||||||||||
//! Vortex is a chunked array library that's able to | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
use canonical::try_canonicalize_chunks; | ||||||||||||||||||||||||||||
use futures_util::stream; | ||||||||||||||||||||||||||||
use itertools::Itertools; | ||||||||||||||||||||||||||||
use serde::{Deserialize, Serialize}; | ||||||||||||||||||||||||||||
|
@@ -18,7 +19,7 @@ use crate::stream::{ArrayStream, ArrayStreamAdapter}; | |||||||||||||||||||||||||||
use crate::validity::Validity::NonNullable; | ||||||||||||||||||||||||||||
use crate::validity::{ArrayValidity, LogicalValidity, Validity}; | ||||||||||||||||||||||||||||
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; | ||||||||||||||||||||||||||||
use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray}; | ||||||||||||||||||||||||||||
use crate::{impl_encoding, Array, ArrayDType, ArrayDef, ArrayTrait, IntoArray, IntoCanonical}; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
mod canonical; | ||||||||||||||||||||||||||||
mod compute; | ||||||||||||||||||||||||||||
|
@@ -132,6 +133,72 @@ impl ChunkedArray { | |||||||||||||||||||||||||||
pub fn array_stream(&self) -> impl ArrayStream + '_ { | ||||||||||||||||||||||||||||
ArrayStreamAdapter::new(self.dtype().clone(), stream::iter(self.chunks().map(Ok))) | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
pub fn rechunk(&self, target_bytesize: usize, target_rowsize: usize) -> VortexResult<Self> { | ||||||||||||||||||||||||||||
fn combine_validities( | ||||||||||||||||||||||||||||
dtype: &DType, | ||||||||||||||||||||||||||||
validities: Vec<LogicalValidity>, | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can make this simpler by taking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See comment below about removing this function. |
||||||||||||||||||||||||||||
) -> VortexResult<Validity> { | ||||||||||||||||||||||||||||
let validity = validities.clone().into_iter().collect::<Validity>(); | ||||||||||||||||||||||||||||
match (dtype.is_nullable(), validity) { | ||||||||||||||||||||||||||||
(true, validity) => Ok(validity), | ||||||||||||||||||||||||||||
(false, Validity::AllValid) => Ok(Validity::NonNullable), | ||||||||||||||||||||||||||||
(false, _) => vortex_bail!( | ||||||||||||||||||||||||||||
"for non-nullable dtype, child validities ought to all be AllValid" | ||||||||||||||||||||||||||||
), | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This can be simplified There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I killed combine_validity entirely by using one of your earlier suggestions to go through ChunkedArray. Does that seem reasonable? |
||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let dtype = self.dtype(); | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should inline this, this function is going to be inlined anyway and having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let mut new_chunks: Vec<Array> = Vec::new(); | ||||||||||||||||||||||||||||
let mut validities_to_combine: Vec<LogicalValidity> = Vec::new(); | ||||||||||||||||||||||||||||
let mut chunks_to_combine: Vec<Array> = Vec::new(); | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you don't need type annotations here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||||||||||||||
let mut new_chunk_n_bytes = 0; | ||||||||||||||||||||||||||||
let mut new_chunk_n_elements = 0; | ||||||||||||||||||||||||||||
for chunk in self.chunks() { | ||||||||||||||||||||||||||||
robert3005 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
let n_bytes = chunk.nbytes(); | ||||||||||||||||||||||||||||
let n_elements = chunk.len(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if (new_chunk_n_bytes + n_bytes > target_bytesize | ||||||||||||||||||||||||||||
|| new_chunk_n_elements + n_elements > target_rowsize) | ||||||||||||||||||||||||||||
&& !chunks_to_combine.is_empty() | ||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||
let canonical = try_canonicalize_chunks( | ||||||||||||||||||||||||||||
robert3005 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||
chunks_to_combine, | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function should not require a Vec but instead |
||||||||||||||||||||||||||||
combine_validities(dtype, validities_to_combine)?, | ||||||||||||||||||||||||||||
dtype, | ||||||||||||||||||||||||||||
)?; | ||||||||||||||||||||||||||||
new_chunks.push(canonical.into()); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
new_chunk_n_bytes = 0; | ||||||||||||||||||||||||||||
new_chunk_n_elements = 0; | ||||||||||||||||||||||||||||
validities_to_combine = Vec::new(); | ||||||||||||||||||||||||||||
chunks_to_combine = Vec::new(); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if n_bytes > target_bytesize || n_elements > target_rowsize { | ||||||||||||||||||||||||||||
new_chunks.push(chunk.into_canonical()?.into()); // TODO(dk): rechunking maybe shouldn't produce canonical chunks | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should just push the chunk without canonicalizing here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In follow up we change the logic to not canonicalize the chunks, i.e. stop using try_canonicalize_chunks There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||
new_chunk_n_bytes += n_bytes; | ||||||||||||||||||||||||||||
new_chunk_n_elements += n_elements; | ||||||||||||||||||||||||||||
validities_to_combine.push(chunk.with_dyn(|x| x.logical_validity())); | ||||||||||||||||||||||||||||
chunks_to_combine.push(chunk); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
if !chunks_to_combine.is_empty() { | ||||||||||||||||||||||||||||
let canonical = try_canonicalize_chunks( | ||||||||||||||||||||||||||||
chunks_to_combine, | ||||||||||||||||||||||||||||
combine_validities(dtype, validities_to_combine)?, | ||||||||||||||||||||||||||||
dtype, | ||||||||||||||||||||||||||||
)?; | ||||||||||||||||||||||||||||
let chunk: Array = canonical.into(); | ||||||||||||||||||||||||||||
new_chunks.push(chunk); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
Self::try_new(new_chunks, dtype.clone()) | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
impl ArrayTrait for ChunkedArray {} | ||||||||||||||||||||||||||||
|
@@ -187,11 +254,12 @@ impl SubtractScalarFn for ChunkedArray { | |||||||||||||||||||||||||||
#[cfg(test)] | ||||||||||||||||||||||||||||
mod test { | ||||||||||||||||||||||||||||
use vortex_dtype::{DType, NativePType, Nullability, PType}; | ||||||||||||||||||||||||||||
use vortex_error::VortexResult; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
use crate::array::chunked::ChunkedArray; | ||||||||||||||||||||||||||||
use crate::compute::slice; | ||||||||||||||||||||||||||||
use crate::compute::unary::subtract_scalar; | ||||||||||||||||||||||||||||
use crate::{Array, IntoArray, IntoArrayVariant, ToArray}; | ||||||||||||||||||||||||||||
use crate::compute::unary::{scalar_at, subtract_scalar}; | ||||||||||||||||||||||||||||
use crate::{assert_arrays_eq, Array, ArrayDType, IntoArray, IntoArrayVariant, ToArray}; | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
fn chunked_array() -> ChunkedArray { | ||||||||||||||||||||||||||||
ChunkedArray::try_new( | ||||||||||||||||||||||||||||
|
@@ -282,4 +350,67 @@ mod test { | |||||||||||||||||||||||||||
.to_vec(); | ||||||||||||||||||||||||||||
assert_eq!(results, &[6u64, 7, 8]); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
#[test] | ||||||||||||||||||||||||||||
fn test_rechunk_one_chunk() { | ||||||||||||||||||||||||||||
let chunked = ChunkedArray::try_new( | ||||||||||||||||||||||||||||
vec![vec![0u64].into_array()], | ||||||||||||||||||||||||||||
DType::Primitive(PType::U64, Nullability::NonNullable), | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let rechunked = chunked.rechunk_default().unwrap(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
assert_arrays_eq!(chunked, rechunked); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
#[test] | ||||||||||||||||||||||||||||
fn test_rechunk_two_chunks() { | ||||||||||||||||||||||||||||
let chunked = ChunkedArray::try_new( | ||||||||||||||||||||||||||||
vec![vec![0u64].into_array(), vec![5u64].into_array()], | ||||||||||||||||||||||||||||
DType::Primitive(PType::U64, Nullability::NonNullable), | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let rechunked = chunked.rechunk_default().unwrap(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
assert_eq!(rechunked.nchunks(), 1); | ||||||||||||||||||||||||||||
assert_arrays_eq!(chunked, rechunked); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
#[test] | ||||||||||||||||||||||||||||
fn test_rechunk_tiny_target_chunks() { | ||||||||||||||||||||||||||||
let chunked = ChunkedArray::try_new( | ||||||||||||||||||||||||||||
vec![vec![0u64, 1, 2, 3].into_array(), vec![4u64, 5].into_array()], | ||||||||||||||||||||||||||||
DType::Primitive(PType::U64, Nullability::NonNullable), | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let rechunked = chunked.rechunk(1 << 16, 5).unwrap(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
assert_eq!(rechunked.nchunks(), 2); | ||||||||||||||||||||||||||||
assert!(rechunked.chunks().all(|c| c.len() < 5)); | ||||||||||||||||||||||||||||
assert_arrays_eq!(chunked, rechunked); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
#[test] | ||||||||||||||||||||||||||||
fn test_rechunk_with_too_big_chunk() { | ||||||||||||||||||||||||||||
let chunked = ChunkedArray::try_new( | ||||||||||||||||||||||||||||
vec![ | ||||||||||||||||||||||||||||
vec![0u64, 1, 2].into_array(), | ||||||||||||||||||||||||||||
vec![42_u64; 6].into_array(), | ||||||||||||||||||||||||||||
vec![4u64, 5].into_array(), | ||||||||||||||||||||||||||||
vec![6u64, 7].into_array(), | ||||||||||||||||||||||||||||
vec![8u64, 9].into_array(), | ||||||||||||||||||||||||||||
], | ||||||||||||||||||||||||||||
DType::Primitive(PType::U64, Nullability::NonNullable), | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
let rechunked = chunked.rechunk(1 << 16, 5).unwrap(); | ||||||||||||||||||||||||||||
// greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9] | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
assert_eq!(rechunked.nchunks(), 4); | ||||||||||||||||||||||||||||
assert_arrays_eq!(chunked, rechunked); | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
mod assertions; | ||
mod bool; | ||
mod chunked; | ||
mod constant; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,15 +55,21 @@ pub struct CompressConfig { | |
sample_size: u16, | ||
sample_count: u16, | ||
max_depth: u8, | ||
target_chunk_bytesize: usize, | ||
target_chunk_rowsize: usize, | ||
} | ||
|
||
impl Default for CompressConfig { | ||
fn default() -> Self { | ||
let kib = 1 << 10; | ||
let mib = 1 << 20; | ||
Self { | ||
// Sample length should always be multiple of 1024 | ||
sample_size: 128, | ||
sample_count: 8, | ||
max_depth: 3, | ||
target_chunk_bytesize: 16 * mib, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This use to be called There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
target_chunk_rowsize: 64 * kib, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would make this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and Done. |
||
} | ||
} | ||
} | ||
|
@@ -199,9 +205,12 @@ impl<'a> SamplingCompressor<'a> { | |
fn compress_array(&self, arr: &Array) -> VortexResult<CompressedArray<'a>> { | ||
match arr.encoding().id() { | ||
Chunked::ID => { | ||
// For chunked arrays, we compress each chunk individually | ||
let chunked = ChunkedArray::try_from(arr)?; | ||
let compressed_chunks = chunked | ||
let less_chunked = chunked.rechunk( | ||
self.options().target_chunk_bytesize, | ||
self.options().target_chunk_rowsize, | ||
)?; | ||
let compressed_chunks = less_chunked | ||
.chunks() | ||
.map(|chunk| { | ||
self.compress_array(&chunk) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bleh, we need an equals for array
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that'd be fantastic!