diff --git a/crates/polars-arrow/src/bitmap/utils/iterator.rs b/crates/polars-arrow/src/bitmap/utils/iterator.rs index 18dc8e20a815..e95246483d20 100644 --- a/crates/polars-arrow/src/bitmap/utils/iterator.rs +++ b/crates/polars-arrow/src/bitmap/utils/iterator.rs @@ -46,6 +46,91 @@ impl<'a> BitmapIter<'a> { rest_len, } } + + /// Consume and returns the numbers of `1` / `true` values at the beginning of the iterator. + /// + /// This performs the same operation as `(&mut iter).take_while(|b| b).count()`. + /// + /// This is a lot more efficient than consecutively polling the iterator and should therefore + /// be preferred, if the use-case allows for it. + pub fn take_leading_ones(&mut self) -> usize { + let word_ones = usize::min(self.word_len, self.word.trailing_ones() as usize); + self.word_len -= word_ones; + self.word = self.word.wrapping_shr(word_ones as u32); + + if self.word_len != 0 { + return word_ones; + } + + let mut num_leading_ones = word_ones; + + while self.rest_len != 0 { + self.word_len = usize::min(self.rest_len, 64); + self.rest_len -= self.word_len; + + unsafe { + let chunk = self.bytes.get_unchecked(..8).try_into().unwrap(); + self.word = u64::from_le_bytes(chunk); + self.bytes = self.bytes.get_unchecked(8..); + } + + let word_ones = usize::min(self.word_len, self.word.trailing_ones() as usize); + self.word_len -= word_ones; + self.word = self.word.wrapping_shr(word_ones as u32); + num_leading_ones += word_ones; + + if self.word_len != 0 { + return num_leading_ones; + } + } + + num_leading_ones + } + + /// Consume and returns the numbers of `0` / `false` values that the start of the iterator. + /// + /// This performs the same operation as `(&mut iter).take_while(|b| !b).count()`. + /// + /// This is a lot more efficient than consecutively polling the iterator and should therefore + /// be preferred, if the use-case allows for it. + pub fn take_leading_zeros(&mut self) -> usize { + let word_zeros = usize::min(self.word_len, self.word.trailing_zeros() as usize); + self.word_len -= word_zeros; + self.word = self.word.wrapping_shr(word_zeros as u32); + + if self.word_len != 0 { + return word_zeros; + } + + let mut num_leading_zeros = word_zeros; + + while self.rest_len != 0 { + self.word_len = usize::min(self.rest_len, 64); + self.rest_len -= self.word_len; + unsafe { + let chunk = self.bytes.get_unchecked(..8).try_into().unwrap(); + self.word = u64::from_le_bytes(chunk); + self.bytes = self.bytes.get_unchecked(8..); + } + + let word_zeros = usize::min(self.word_len, self.word.trailing_zeros() as usize); + self.word_len -= word_zeros; + self.word = self.word.wrapping_shr(word_zeros as u32); + num_leading_zeros += word_zeros; + + if self.word_len != 0 { + return num_leading_zeros; + } + } + + num_leading_zeros + } + + /// Returns the number of remaining elements in the iterator + #[inline] + pub fn num_remaining(&self) -> usize { + self.word_len + self.rest_len + } } impl<'a> Iterator for BitmapIter<'a> { @@ -53,35 +138,31 @@ impl<'a> Iterator for BitmapIter<'a> { #[inline] fn next(&mut self) -> Option { - if self.word_len != 0 { - let ret = self.word & 1 != 0; - self.word >>= 1; - self.word_len -= 1; - return Some(ret); - } + if self.word_len == 0 { + if self.rest_len == 0 { + return None; + } - if self.rest_len != 0 { self.word_len = self.rest_len.min(64); self.rest_len -= self.word_len; + unsafe { let chunk = self.bytes.get_unchecked(..8).try_into().unwrap(); self.word = u64::from_le_bytes(chunk); self.bytes = self.bytes.get_unchecked(8..); } - - let ret = self.word & 1 != 0; - self.word >>= 1; - self.word_len -= 1; - return Some(ret); } - None + let ret = self.word & 1 != 0; + self.word >>= 1; + self.word_len -= 1; + Some(ret) } #[inline] fn size_hint(&self) -> (usize, Option) { - let exact = self.word_len + self.rest_len; - (exact, Some(exact)) + let num_remaining = self.num_remaining(); + (num_remaining, Some(num_remaining)) } } @@ -102,3 +183,59 @@ impl<'a> DoubleEndedIterator for BitmapIter<'a> { unsafe impl TrustedLen for BitmapIter<'_> {} impl ExactSizeIterator for BitmapIter<'_> {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[ignore = "Fuzz test. Too slow"] + fn test_leading_ops() { + for _ in 0..10_000 { + let bs = rand::random::() % 4; + + let mut length = 0; + let mut pattern = Vec::new(); + for _ in 0..rand::random::() % 1024 { + let word = match bs { + 0 => u64::MIN, + 1 => u64::MAX, + 2 | 3 => rand::random(), + _ => unreachable!(), + }; + + pattern.extend_from_slice(&word.to_le_bytes()); + length += 64; + } + + for _ in 0..rand::random::() % 7 { + pattern.push(rand::random::()); + length += 8; + } + + let last_length = rand::random::() % 8; + if last_length != 0 { + pattern.push(rand::random::()); + length += last_length; + } + + let mut iter = BitmapIter::new(&pattern, 0, length); + + let mut prev_remaining = iter.num_remaining(); + while iter.num_remaining() != 0 { + let num_ones = iter.clone().take_leading_ones(); + assert_eq!(num_ones, (&mut iter).take_while(|&b| b).count()); + + let num_zeros = iter.clone().take_leading_zeros(); + assert_eq!(num_zeros, (&mut iter).take_while(|&b| !b).count()); + + // Ensure that we are making progress + assert!(iter.num_remaining() < prev_remaining); + prev_remaining = iter.num_remaining(); + } + + assert_eq!(iter.take_leading_zeros(), 0); + assert_eq!(iter.take_leading_ones(), 0); + } + } +} diff --git a/crates/polars-arrow/src/legacy/utils.rs b/crates/polars-arrow/src/legacy/utils.rs index a9a6b7d0ed78..af482171a1a9 100644 --- a/crates/polars-arrow/src/legacy/utils.rs +++ b/crates/polars-arrow/src/legacy/utils.rs @@ -18,7 +18,7 @@ pub trait CustomIterTools: Iterator { where Self: Sized, { - TrustMyLength::new(self, length) + unsafe { TrustMyLength::new(self, length) } } fn collect_trusted>(self) -> T diff --git a/crates/polars-arrow/src/pushable.rs b/crates/polars-arrow/src/pushable.rs index 025e78275dcd..de642833bc6a 100644 --- a/crates/polars-arrow/src/pushable.rs +++ b/crates/polars-arrow/src/pushable.rs @@ -19,6 +19,12 @@ pub trait Pushable: Sized + Default { fn push(&mut self, value: T); fn len(&self) -> usize; fn push_null(&mut self); + #[inline] + fn extend_n(&mut self, n: usize, iter: impl Iterator) { + for item in iter.take(n) { + self.push(item); + } + } fn extend_constant(&mut self, additional: usize, value: T); fn extend_null_constant(&mut self, additional: usize); fn freeze(self) -> Self::Freeze; @@ -31,6 +37,7 @@ impl Pushable for MutableBitmap { fn reserve(&mut self, additional: usize) { MutableBitmap::reserve(self, additional) } + #[inline] fn len(&self) -> usize { self.len() @@ -82,6 +89,11 @@ impl Pushable for Vec { self.push(value) } + #[inline] + fn extend_n(&mut self, n: usize, iter: impl Iterator) { + self.extend(iter.take(n)); + } + #[inline] fn extend_constant(&mut self, additional: usize, value: T) { self.resize(self.len() + additional, value); diff --git a/crates/polars-arrow/src/trusted_len.rs b/crates/polars-arrow/src/trusted_len.rs index 3237ba83cbb2..5f194770e7c4 100644 --- a/crates/polars-arrow/src/trusted_len.rs +++ b/crates/polars-arrow/src/trusted_len.rs @@ -87,8 +87,13 @@ impl TrustMyLength where I: Iterator, { + /// Create a new `TrustMyLength` iterator + /// + /// # Safety + /// + /// This is safe if the iterator always has the exact length given by `len`. #[inline] - pub fn new(iter: I, len: usize) -> Self { + pub unsafe fn new(iter: I, len: usize) -> Self { Self { iter, len } } } @@ -104,6 +109,7 @@ where self.iter.next() } + #[inline] fn size_hint(&self) -> (usize, Option) { (self.len, Some(self.len)) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs index aa74d1cc9b4a..0262798cc7db 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs @@ -70,7 +70,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { Some(additional), values, page_values, - ), + )?, BinaryState::Required(page) => { for x in page.values.by_ref().take(additional) { values.push(x) @@ -92,7 +92,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { Some(additional), offsets, page_values.lengths.by_ref(), - ); + )?; let length = *offsets.last() - last_offset; @@ -123,7 +123,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { .values .by_ref() .map(|index| page_dict.value(index as usize)), - ); + )?; page_values.values.get_result()?; }, BinaryState::RequiredDictionary(page) => { @@ -148,7 +148,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { Some(additional), values, page_values.by_ref(), - ); + )?; }, BinaryState::FilteredOptionalDelta(page_validity, page_values) => { extend_from_decoder( @@ -157,7 +157,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { Some(additional), values, page_values.by_ref(), - ); + )?; }, BinaryState::FilteredRequiredDictionary(page) => { // Already done on the dict. @@ -186,7 +186,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { .values .by_ref() .map(|index| page_dict.value(index as usize)), - ); + )?; page_values.values.get_result()?; }, BinaryState::OptionalDeltaByteArray(page_validity, page_values) => extend_from_decoder( @@ -195,7 +195,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { Some(additional), values, page_values, - ), + )?, BinaryState::DeltaByteArray(page_values) => { for x in page_values.take(additional) { values.push(x) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs index 4593fe16eea8..d80dd6791d8b 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs @@ -179,13 +179,13 @@ impl<'a> FilteredDelta<'a> { #[derive(Debug)] pub(crate) struct RequiredDictionary<'a> { - pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, + pub values: hybrid_rle::HybridRleDecoder<'a>, pub dict: &'a BinaryDict, } impl<'a> RequiredDictionary<'a> { pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?.into_iter(); + let values = utils::dict_indices_decoder(page)?; Ok(Self { dict, values }) } @@ -198,13 +198,13 @@ impl<'a> RequiredDictionary<'a> { #[derive(Debug)] pub(crate) struct FilteredRequiredDictionary<'a> { - pub values: SliceFilteredIter>, + pub values: SliceFilteredIter>, pub dict: &'a BinaryDict, } impl<'a> FilteredRequiredDictionary<'a> { pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?.into_iter(); + let values = utils::dict_indices_decoder(page)?; let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); @@ -220,13 +220,13 @@ impl<'a> FilteredRequiredDictionary<'a> { #[derive(Debug)] pub(crate) struct ValuesDictionary<'a> { - pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, + pub values: hybrid_rle::HybridRleDecoder<'a>, pub dict: &'a BinaryDict, } impl<'a> ValuesDictionary<'a> { pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?.into_iter(); + let values = utils::dict_indices_decoder(page)?; Ok(Self { dict, values }) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs index 1b3d65799293..e24cc576b005 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binview/basic.rs @@ -69,7 +69,7 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { Some(additional), values, page_values, - ), + )?, BinaryState::Required(page) => { for x in page.values.by_ref().take(additional) { values.push_value_ignore_validity(x) @@ -87,7 +87,7 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { Some(additional), values, page_values, - ); + )?; }, BinaryState::FilteredRequired(page) => { for x in page.values.by_ref().take(additional) { @@ -112,7 +112,7 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { .values .by_ref() .map(|index| page_dict.value(index as usize)), - ); + )?; page_values.values.get_result()?; }, BinaryState::RequiredDictionary(page) => { @@ -137,7 +137,7 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { Some(additional), values, page_values.by_ref(), - ); + )?; }, BinaryState::FilteredOptionalDelta(page_validity, page_values) => { extend_from_decoder( @@ -146,7 +146,7 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { Some(additional), values, page_values.by_ref(), - ); + )?; }, BinaryState::FilteredRequiredDictionary(page) => { // TODO! directly set the dict as buffers and only insert the proper views. @@ -179,7 +179,7 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { .values .by_ref() .map(|index| page_dict.value(index as usize)), - ); + )?; page_values.values.get_result()?; }, BinaryState::OptionalDeltaByteArray(page_validity, page_values) => extend_from_decoder( @@ -188,7 +188,7 @@ impl<'a> utils::Decoder<'a> for BinViewDecoder { Some(additional), values, page_values, - ), + )?, BinaryState::DeltaByteArray(page_values) => { for x in page_values.take(additional) { values.push_value_ignore_validity(x) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/boolean/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/boolean/basic.rs index 6db66a0fba31..0e24c1bb318e 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/boolean/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/boolean/basic.rs @@ -171,7 +171,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { Some(remaining), values, &mut page_values.0, - ), + )?, State::Required(page) => { let remaining = remaining.min(page.length - page.offset); values.extend_from_slice(page.values, page.offset, remaining); @@ -190,7 +190,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { Some(remaining), values, page_values.0.by_ref(), - ); + )?; }, State::RleOptional(page_validity, page_values) => { utils::extend_from_decoder( @@ -199,7 +199,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { Some(remaining), values, &mut *page_values, - ); + )?; }, } Ok(()) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs index 710b30fe0593..03d2a8476714 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/mod.rs @@ -15,7 +15,7 @@ use super::utils::{ }; use super::PagesIter; use crate::parquet::deserialize::SliceFilteredIter; -use crate::parquet::encoding::hybrid_rle::BufferedHybridRleDecoderIter; +use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DataPage, DictPage, Page}; use crate::parquet::schema::Repetition; @@ -26,32 +26,29 @@ pub enum State<'a> { Optional(Optional<'a>), Required(Required<'a>), FilteredRequired(FilteredRequired<'a>), - FilteredOptional( - FilteredOptionalPageValidity<'a>, - BufferedHybridRleDecoderIter<'a>, - ), + FilteredOptional(FilteredOptionalPageValidity<'a>, HybridRleDecoder<'a>), } #[derive(Debug)] pub struct Required<'a> { - values: BufferedHybridRleDecoderIter<'a>, + values: HybridRleDecoder<'a>, } impl<'a> Required<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?.into_iter(); + let values = dict_indices_decoder(page)?; Ok(Self { values }) } } #[derive(Debug)] pub struct FilteredRequired<'a> { - values: SliceFilteredIter>, + values: SliceFilteredIter>, } impl<'a> FilteredRequired<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?.into_iter(); + let values = dict_indices_decoder(page)?; let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); @@ -62,13 +59,13 @@ impl<'a> FilteredRequired<'a> { #[derive(Debug)] pub struct Optional<'a> { - values: BufferedHybridRleDecoderIter<'a>, + values: HybridRleDecoder<'a>, validity: OptionalPageValidity<'a>, } impl<'a> Optional<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?.into_iter(); + let values = dict_indices_decoder(page)?; Ok(Self { values, @@ -138,7 +135,7 @@ where (Encoding::PlainDictionary | Encoding::RleDictionary, true, true) => { Ok(State::FilteredOptional( FilteredOptionalPageValidity::try_new(page)?, - dict_indices_decoder(page)?.into_iter(), + dict_indices_decoder(page)?, )) }, _ => Err(utils::not_implemented(page)), @@ -173,7 +170,7 @@ where Err(_) => panic!("The maximum key is too small"), } }), - ); + )?; page.values.get_result()?; }, State::Required(page) => { @@ -210,7 +207,7 @@ where }; x }), - ); + )?; page_values.get_result()?; }, State::FilteredRequired(page) => { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs index f6ec4093c174..4ad39b8ea695 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary/nested.rs @@ -10,7 +10,7 @@ use super::super::super::PagesIter; use super::super::nested_utils::*; use super::super::utils::{dict_indices_decoder, not_implemented, MaybeNext, PageState}; use super::finish_key; -use crate::parquet::encoding::hybrid_rle::BufferedHybridRleDecoderIter; +use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; use crate::parquet::encoding::Encoding; use crate::parquet::page::{DataPage, DictPage, Page}; use crate::parquet::schema::Repetition; @@ -18,13 +18,13 @@ use crate::parquet::schema::Repetition; // The state of a required DataPage with a boolean physical type #[derive(Debug)] pub struct Required<'a> { - values: BufferedHybridRleDecoderIter<'a>, + values: HybridRleDecoder<'a>, length: usize, } impl<'a> Required<'a> { fn try_new(page: &'a DataPage) -> PolarsResult { - let values = dict_indices_decoder(page)?.into_iter(); + let values = dict_indices_decoder(page)?; let length = page.num_values(); Ok(Self { values, length }) } @@ -34,7 +34,7 @@ impl<'a> Required<'a> { #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum State<'a> { - Optional(BufferedHybridRleDecoderIter<'a>), + Optional(HybridRleDecoder<'a>), Required(Required<'a>), } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs index f09fb2011be0..41d6f4f5e9e0 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary/basic.rs @@ -84,13 +84,13 @@ impl<'a> FilteredRequired<'a> { #[derive(Debug)] pub(super) struct RequiredDictionary<'a> { - pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, + pub values: hybrid_rle::HybridRleDecoder<'a>, pub dict: &'a Dict, } impl<'a> RequiredDictionary<'a> { pub(super) fn try_new(page: &'a DataPage, dict: &'a Dict) -> PolarsResult { - let values = dict_indices_decoder(page)?.into_iter(); + let values = dict_indices_decoder(page)?; Ok(Self { dict, values }) } @@ -103,14 +103,14 @@ impl<'a> RequiredDictionary<'a> { #[derive(Debug)] pub(super) struct OptionalDictionary<'a> { - pub(super) values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, + pub(super) values: hybrid_rle::HybridRleDecoder<'a>, pub(super) validity: OptionalPageValidity<'a>, pub(super) dict: &'a Dict, } impl<'a> OptionalDictionary<'a> { pub(super) fn try_new(page: &'a DataPage, dict: &'a Dict) -> PolarsResult { - let values = dict_indices_decoder(page)?.into_iter(); + let values = dict_indices_decoder(page)?; Ok(Self { values, @@ -219,7 +219,7 @@ impl<'a> Decoder<'a> for BinaryDecoder { Some(remaining), values, &mut page.values, - ), + )?, State::Required(page) => { for x in page.values.by_ref().take(remaining) { values.push(x) @@ -236,11 +236,11 @@ impl<'a> Decoder<'a> for BinaryDecoder { &mut page.validity, Some(remaining), values, - page.values.by_ref().map(|index| { + &mut page.values.by_ref().map(|index| { let index = index as usize; &page.dict[index * self.size..(index + 1) * self.size] }), - ); + )?; page.values.get_result()?; }, State::RequiredDictionary(page) => { @@ -264,7 +264,7 @@ impl<'a> Decoder<'a> for BinaryDecoder { Some(remaining), values, page_values.by_ref(), - ); + )?; }, } Ok(()) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index 395ea7f8e253..0aa505e65a84 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -8,7 +8,7 @@ use polars_utils::slice::GetSaferUnchecked; use super::super::PagesIter; use super::utils::{DecodedState, MaybeNext, PageState}; -use crate::parquet::encoding::hybrid_rle::{BufferedHybridRleDecoderIter, HybridRleDecoder}; +use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; use crate::parquet::page::{split_buffer, DataPage, DictPage, Page}; use crate::parquet::read::levels::get_bit_width; @@ -239,7 +239,7 @@ pub fn init_nested(init: &[InitNested], capacity: usize) -> NestedState { } pub struct NestedPage<'a> { - iter: Peekable, BufferedHybridRleDecoderIter<'a>>>, + iter: Peekable, HybridRleDecoder<'a>>>, } impl<'a> NestedPage<'a> { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs index 4c3655e0a01a..ca24ff535f42 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs @@ -5,16 +5,17 @@ use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use arrow::types::NativeType; use polars_error::PolarsResult; -use polars_utils::iter::FallibleIterator; use super::super::utils::{ get_selected_rows, FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, }; use super::super::{utils, PagesIter}; use crate::parquet::deserialize::SliceFilteredIter; +use crate::parquet::encoding::hybrid_rle::DictionaryTranslator; use crate::parquet::encoding::{byte_stream_split, hybrid_rle, Encoding}; use crate::parquet::page::{split_buffer, DataPage, DictPage}; use crate::parquet::types::{decode, NativeType as ParquetNativeType}; +use crate::read::deserialize::utils::TranslatedHybridRle; #[derive(Debug)] pub(super) struct FilteredRequiredValues<'a> { @@ -65,23 +66,23 @@ pub(super) struct ValuesDictionary<'a, T> where T: NativeType, { - pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>, - pub dict: &'a Vec, + pub values: hybrid_rle::HybridRleDecoder<'a>, + pub dict: &'a [T], } impl<'a, T> ValuesDictionary<'a, T> where T: NativeType, { - pub fn try_new(page: &'a DataPage, dict: &'a Vec) -> PolarsResult { - let values = utils::dict_indices_decoder(page)?.into_iter(); + pub fn try_new(page: &'a DataPage, dict: &'a [T]) -> PolarsResult { + let values = utils::dict_indices_decoder(page)?; Ok(Self { dict, values }) } #[inline] pub fn len(&self) -> usize { - self.values.size_hint().0 + self.values.len() } } @@ -233,8 +234,8 @@ where page_validity, Some(remaining), values, - page_values.values.by_ref().map(decode).map(self.op), - ), + &mut page_values.values.by_ref().map(decode).map(self.op), + )?, State::Required(page) => { values.extend( page.values @@ -245,20 +246,22 @@ where ); }, State::OptionalDictionary(page_validity, page_values) => { - let op1 = |index: u32| page_values.dict[index as usize]; + let translator = DictionaryTranslator(page_values.dict); + let translated_hybridrle = + TranslatedHybridRle::new(&mut page_values.values, &translator); + utils::extend_from_decoder( validity, page_validity, Some(remaining), values, - &mut page_values.values.by_ref().map(op1), - ); - page_values.values.get_result()?; + translated_hybridrle, + )?; }, State::RequiredDictionary(page) => { - let op1 = |index: u32| page.dict[index as usize]; - values.extend(page.values.by_ref().map(op1).take(remaining)); - page.values.get_result()?; + let translator = DictionaryTranslator(page.dict); + page.values + .translate_and_collect_n_into(values, remaining, &translator)?; }, State::FilteredRequired(page) => { values.extend( @@ -275,8 +278,8 @@ where page_validity, Some(remaining), values, - page_values.values.by_ref().map(decode).map(self.op), - ); + &mut page_values.values.by_ref().map(decode).map(self.op), + )?; }, State::RequiredByteStreamSplit(decoder) => { values.extend(decoder.iter_converted(decode).map(self.op).take(remaining)); @@ -286,8 +289,8 @@ where page_validity, Some(remaining), values, - decoder.iter_converted(decode).map(self.op), - ), + &mut decoder.iter_converted(decode).map(self.op), + )?, } Ok(()) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index c23e6822567f..6d74f6417a2a 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -155,11 +155,11 @@ where page_validity, Some(remaining), values, - page_values + &mut page_values .by_ref() .map(|x| x.unwrap().as_()) .map(self.0.op), - ) + )? }, State::FilteredDeltaBinaryPackedRequired(page) => { values.extend( @@ -175,11 +175,11 @@ where page_validity, Some(remaining), values, - page_values + &mut page_values .by_ref() .map(|x| x.unwrap().as_()) .map(self.0.op), - ); + )?; }, } Ok(()) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils.rs index 23e45bece122..f8a9ea87fbd4 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils.rs @@ -9,7 +9,8 @@ use super::super::PagesIter; use crate::parquet::deserialize::{ FilteredHybridEncoded, FilteredHybridRleDecoderIter, HybridDecoderBitmapIter, HybridEncoded, }; -use crate::parquet::encoding::hybrid_rle; +use crate::parquet::encoding::hybrid_rle::{self, HybridRleDecoder, Translator}; +use crate::parquet::error::ParquetResult; use crate::parquet::indexes::Interval; use crate::parquet::page::{split_buffer, DataPage, DictPage, Page}; use crate::parquet::schema::Repetition; @@ -33,6 +34,91 @@ pub(super) trait PageValidity<'a> { fn next_limited(&mut self, limit: usize) -> Option>; } +pub trait BatchableCollector { + fn reserve(target: &mut T, n: usize); + fn push_n(&mut self, target: &mut T, n: usize) -> ParquetResult<()>; + fn push_n_nulls(&mut self, target: &mut T, n: usize) -> ParquetResult<()>; + fn skip_n(&mut self, n: usize) -> ParquetResult<()>; +} + +/// This batches sequential collect operations to try and prevent unnecessary buffering and +/// `Iterator::next` polling. +#[must_use] +pub struct BatchedCollector<'a, I, T, C: BatchableCollector> { + num_waiting_valids: usize, + num_waiting_invalids: usize, + + target: &'a mut T, + collector: C, + _pd: std::marker::PhantomData, +} + +impl<'a, I, T, C: BatchableCollector> BatchedCollector<'a, I, T, C> { + pub fn new(collector: C, target: &'a mut T) -> Self { + Self { + num_waiting_valids: 0, + num_waiting_invalids: 0, + target, + collector, + _pd: Default::default(), + } + } + + #[inline] + pub fn push_n_valids(&mut self, n: usize) -> ParquetResult<()> { + if self.num_waiting_invalids == 0 { + self.num_waiting_valids += n; + return Ok(()); + } + + self.collector + .push_n(self.target, self.num_waiting_valids)?; + self.collector + .push_n_nulls(self.target, self.num_waiting_invalids)?; + + self.num_waiting_valids = n; + self.num_waiting_invalids = 0; + + Ok(()) + } + + #[inline] + pub fn push_n_invalids(&mut self, n: usize) { + self.num_waiting_invalids += n; + } + + #[inline] + pub fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> { + if n == 0 { + return Ok(()); + } + + if self.num_waiting_valids > 0 { + self.collector + .push_n(self.target, self.num_waiting_valids)?; + } + if self.num_waiting_invalids > 0 { + self.collector + .push_n_nulls(self.target, self.num_waiting_invalids)?; + } + self.collector.skip_n(n)?; + + self.num_waiting_valids = 0; + self.num_waiting_invalids = 0; + + Ok(()) + } + + #[inline] + pub fn finalize(mut self) -> ParquetResult<()> { + self.collector + .push_n(self.target, self.num_waiting_valids)?; + self.collector + .push_n_nulls(self.target, self.num_waiting_invalids)?; + Ok(()) + } +} + #[derive(Debug, Clone)] pub struct FilteredOptionalPageValidity<'a> { iter: FilteredHybridRleDecoderIter<'a>, @@ -121,33 +207,6 @@ impl<'a> PageValidity<'a> for FilteredOptionalPageValidity<'a> { } } -pub struct Zip { - validity: V, - values: I, -} - -impl Zip { - pub fn new(validity: V, values: I) -> Self { - Self { validity, values } - } -} - -impl, I: Iterator> Iterator for Zip { - type Item = Option; - - #[inline] - fn next(&mut self) -> Option { - self.validity - .next() - .map(|x| if x { self.values.next() } else { None }) - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - self.validity.size_hint() - } -} - #[derive(Debug, Clone)] pub struct OptionalPageValidity<'a> { iter: HybridDecoderBitmapIter<'a>, @@ -227,11 +286,11 @@ impl<'a> PageValidity<'a> for OptionalPageValidity<'a> { } } -fn reserve_pushable_and_validity<'a, T, P: Pushable>( +fn reserve_pushable_and_validity<'a, I, T, C: BatchableCollector>( validity: &mut MutableBitmap, page_validity: &'a mut dyn PageValidity, limit: Option, - pushable: &mut P, + target: &mut T, ) -> Vec> { let limit = limit.unwrap_or(usize::MAX); @@ -257,20 +316,22 @@ fn reserve_pushable_and_validity<'a, T, P: Pushable>( }; runs.push(run) } - pushable.reserve(reserve_pushable); + C::reserve(target, reserve_pushable); validity.reserve(reserve_pushable); runs } /// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder -pub(super) fn extend_from_decoder, I: Iterator>( +pub(super) fn extend_from_decoder>( validity: &mut MutableBitmap, page_validity: &mut dyn PageValidity, limit: Option, - pushable: &mut P, - mut values_iter: I, -) { - let runs = reserve_pushable_and_validity(validity, page_validity, limit, pushable); + target: &mut T, + collector: C, +) -> ParquetResult<()> { + let runs = reserve_pushable_and_validity::(validity, page_validity, limit, target); + + let mut batched_collector = BatchedCollector::new(collector, target); // then a second loop to really fill the buffers for run in runs { @@ -281,31 +342,122 @@ pub(super) fn extend_from_decoder, I: Iterator>( length, } => { // consume `length` items - let iter = BitmapIter::new(values, offset, length); - let iter = Zip::new(iter, &mut values_iter); - - for item in iter { - if let Some(item) = item { - pushable.push(item) - } else { - pushable.push_null() - } + let mut validity_iter = BitmapIter::new(values, offset, length); + + let mut bit_sum = 0; + while validity_iter.num_remaining() != 0 { + let num_valid = validity_iter.take_leading_ones(); + bit_sum += num_valid; + batched_collector.push_n_valids(num_valid)?; + + let num_invalid = validity_iter.take_leading_zeros(); + bit_sum += num_invalid; + batched_collector.push_n_invalids(num_invalid); } + + debug_assert_eq!(bit_sum, length); + validity.extend_from_slice(values, offset, length); }, FilteredHybridEncoded::Repeated { is_set, length } => { validity.extend_constant(length, is_set); if is_set { - for v in (&mut values_iter).take(length) { - pushable.push(v) - } + batched_collector.push_n_valids(length)?; } else { - pushable.extend_null_constant(length); + batched_collector.push_n_invalids(length); } }, - FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {}, + FilteredHybridEncoded::Skipped(valids) => batched_collector.skip_in_place(valids)?, }; } + + batched_collector.finalize()?; + + Ok(()) +} + +/// This translates and collects items from a [`HybridRleDecoder`] into a target [`Vec`]. +/// +/// This batches sequential collect operations to try and prevent unnecessary buffering. +pub struct TranslatedHybridRle<'a, 'b, 'c, O, T> +where + O: Clone + Default, + T: Translator, +{ + decoder: &'a mut HybridRleDecoder<'b>, + translator: &'c T, + _pd: std::marker::PhantomData, +} + +impl<'a, 'b, 'c, O, T> TranslatedHybridRle<'a, 'b, 'c, O, T> +where + O: Clone + Default, + T: Translator, +{ + pub fn new(decoder: &'a mut HybridRleDecoder<'b>, translator: &'c T) -> Self { + Self { + decoder, + translator, + _pd: Default::default(), + } + } +} + +impl<'a, 'b, 'c, O, T> BatchableCollector> for TranslatedHybridRle<'a, 'b, 'c, O, T> +where + O: Clone + Default, + T: Translator, +{ + #[inline] + fn reserve(target: &mut Vec, n: usize) { + target.reserve(n); + } + + #[inline] + fn push_n(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { + self.decoder + .translate_and_collect_n_into(target, n, self.translator) + } + + #[inline] + fn push_n_nulls(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { + target.resize(target.len() + n, O::default()); + Ok(()) + } + + #[inline] + fn skip_n(&mut self, n: usize) -> ParquetResult<()> { + self.decoder.skip_in_place(n) + } +} + +impl, I: Iterator> BatchableCollector for I { + #[inline] + fn reserve(target: &mut P, n: usize) { + target.reserve(n); + } + + #[inline] + fn push_n(&mut self, target: &mut P, n: usize) -> ParquetResult<()> { + target.extend_n(n, self); + Ok(()) + } + + #[inline] + fn push_n_nulls(&mut self, target: &mut P, n: usize) -> ParquetResult<()> { + target.extend_null_constant(n); + Ok(()) + } + + #[inline] + fn skip_n(&mut self, n: usize) -> ParquetResult<()> { + if n == 0 { + return Ok(()); + } + + _ = self.nth(n); + Ok(()) + } } /// The state of a partially deserialized page diff --git a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs index d3361f0f44c0..a85616a6c300 100644 --- a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs +++ b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs @@ -89,7 +89,97 @@ impl<'a, T: Unpackable> Decoder<'a, T> { } } +/// A iterator over the exact chunks in a [`Decoder`]. +/// +/// The remainder can be accessed using `remainder` or `next_inexact`. +pub struct ChunkedDecoder<'a, 'b, T: Unpackable> { + pub(crate) decoder: &'b mut Decoder<'a, T>, +} + +impl<'a, 'b, T: Unpackable> Iterator for ChunkedDecoder<'a, 'b, T> { + type Item = T::Unpacked; + + #[inline] + fn next(&mut self) -> Option { + if self.decoder.len() < T::Unpacked::LENGTH { + return None; + } + + let mut unpacked = T::Unpacked::zero(); + let packed = self.decoder.packed.next()?; + decode_pack::(packed, self.decoder.num_bits, &mut unpacked); + self.decoder.length -= T::Unpacked::LENGTH; + Some(unpacked) + } + + fn size_hint(&self) -> (usize, Option) { + let is_exact = self.decoder.len() % T::Unpacked::LENGTH == 0; + let (low, high) = self.decoder.packed.size_hint(); + + let delta = usize::from(!is_exact); + + (low - delta, high.map(|h| h - delta)) + } +} + +impl<'a, 'b, T: Unpackable> ExactSizeIterator for ChunkedDecoder<'a, 'b, T> {} + +impl<'a, 'b, T: Unpackable> ChunkedDecoder<'a, 'b, T> { + /// Get and consume the remainder chunk if it exists + pub fn remainder(&mut self) -> Option<(T::Unpacked, usize)> { + let remainder_len = self.decoder.len() % T::Unpacked::LENGTH; + + if remainder_len > 0 { + let mut unpacked = T::Unpacked::zero(); + let packed = self.decoder.packed.next_back().unwrap(); + decode_pack::(packed, self.decoder.num_bits, &mut unpacked); + self.decoder.length -= remainder_len; + return Some((unpacked, remainder_len)); + } + + None + } + + /// Get the next (possibly partial) chunk and its filled length + pub fn next_inexact(&mut self) -> Option<(T::Unpacked, usize)> { + if self.decoder.len() >= T::Unpacked::LENGTH { + Some((self.next().unwrap(), T::Unpacked::LENGTH)) + } else { + self.remainder() + } + } +} + impl<'a, T: Unpackable> Decoder<'a, T> { + pub fn chunked<'b>(&'b mut self) -> ChunkedDecoder<'a, 'b, T> { + ChunkedDecoder { decoder: self } + } + + pub fn len(&self) -> usize { + self.length + } + + pub fn skip_chunks(&mut self, n: usize) { + for _ in (&mut self.packed).take(n) {} + } + + pub fn take(&mut self) -> Self { + let block_size = std::mem::size_of::() * self.num_bits; + let packed = std::mem::replace(&mut self.packed, [].chunks(block_size)); + let length = self.length; + self.length = 0; + + debug_assert_eq!(self.len(), 0); + + Self { + packed, + num_bits: self.num_bits, + length, + _pd: Default::default(), + } + } + + #[inline] pub fn collect_into(mut self, vec: &mut Vec) { // @NOTE: // When microbenchmarking changing this from a element-wise iterator to a collect into diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/buffered.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/buffered.rs new file mode 100644 index 000000000000..6c40d4c27720 --- /dev/null +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/buffered.rs @@ -0,0 +1,267 @@ +use super::Translator; +use crate::parquet::encoding::bitpacked::{self, Unpackable, Unpacked}; +use crate::parquet::error::ParquetResult; + +#[derive(Debug, Clone)] +pub struct BufferedBitpacked<'a> { + pub unpacked: [u32; 32], + pub unpacked_start: usize, + pub unpacked_end: usize, + + pub decoder: bitpacked::Decoder<'a, u32>, +} + +#[derive(Debug, Clone)] +pub struct BufferedRle { + pub value: u32, + pub length: usize, +} + +/// A buffered set of items for the [`HybridRleDecoder`]. This can be iterated over and stopped at +/// any time. +#[derive(Debug, Clone)] +pub enum HybridRleBuffered<'a> { + Bitpacked(BufferedBitpacked<'a>), + Rle(BufferedRle), +} + +impl Iterator for BufferedRle { + type Item = u32; + + fn next(&mut self) -> Option { + if self.length > 0 { + self.length -= 1; + Some(self.value) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.length, Some(self.length)) + } +} + +impl ExactSizeIterator for BufferedRle {} + +impl<'a> Iterator for BufferedBitpacked<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + if self.unpacked_start < self.unpacked_end { + let value = self.unpacked[self.unpacked_start]; + self.unpacked_start += 1; + return Some(value); + } + + self.decoder + .chunked() + .next_inexact() + .map(|(unpacked, unpacked_length)| { + debug_assert!(unpacked_length > 0); + let value = unpacked[0]; + self.unpacked = unpacked; + self.unpacked_end = unpacked_length; + self.unpacked_start = 1; + value + }) + } + + fn size_hint(&self) -> (usize, Option) { + let unpacked_num_elements = self.unpacked_end - self.unpacked_start; + let exact = unpacked_num_elements + self.decoder.len(); + (exact, Some(exact)) + } +} + +impl<'a> ExactSizeIterator for BufferedBitpacked<'a> {} + +impl<'a> Iterator for HybridRleBuffered<'a> { + type Item = u32; + + fn next(&mut self) -> Option { + match self { + HybridRleBuffered::Bitpacked(b) => b.next(), + HybridRleBuffered::Rle(b) => b.next(), + } + } + + fn size_hint(&self) -> (usize, Option) { + match self { + HybridRleBuffered::Bitpacked(b) => b.size_hint(), + HybridRleBuffered::Rle(b) => b.size_hint(), + } + } +} + +impl<'a> ExactSizeIterator for HybridRleBuffered<'a> {} + +impl<'a> BufferedBitpacked<'a> { + fn translate_and_collect_limited_into( + &mut self, + target: &mut Vec, + limit: usize, + translator: &impl Translator, + ) -> ParquetResult { + let unpacked_num_elements = self.unpacked_end - self.unpacked_start; + if limit <= unpacked_num_elements { + translator.translate_slice( + target, + &self.unpacked[self.unpacked_start..self.unpacked_start + limit], + )?; + self.unpacked_start += limit; + return Ok(limit); + } + + translator.translate_slice( + target, + &self.unpacked[self.unpacked_start..self.unpacked_end], + )?; + self.unpacked_end = 0; + self.unpacked_start = 0; + let limit = limit - unpacked_num_elements; + + let decoder = self.decoder.take(); + let decoder_len = decoder.len(); + if limit >= decoder_len { + translator.translate_bitpacked_all(target, decoder)?; + Ok(unpacked_num_elements + decoder_len) + } else { + let buffered = translator.translate_bitpacked_limited(target, limit, decoder)?; + *self = buffered; + Ok(unpacked_num_elements + limit) + } + } + + pub fn translate_and_collect_into( + self, + target: &mut Vec, + translator: &impl Translator, + ) -> ParquetResult { + let unpacked_num_elements = self.unpacked_end - self.unpacked_start; + translator.translate_slice( + target, + &self.unpacked[self.unpacked_start..self.unpacked_end], + )?; + let decoder_len = self.decoder.len(); + translator.translate_bitpacked_all(target, self.decoder)?; + Ok(unpacked_num_elements + decoder_len) + } + + pub fn skip_in_place(&mut self, n: usize) -> usize { + let unpacked_num_elements = self.unpacked_end - self.unpacked_start; + + if n < unpacked_num_elements { + self.unpacked_start += n; + return n; + } + + let n = n - unpacked_num_elements; + + if self.decoder.len() > n { + let num_chunks = n / ::Unpacked::LENGTH; + let unpacked_offset = n % ::Unpacked::LENGTH; + self.decoder.skip_chunks(num_chunks); + let (unpacked, unpacked_length) = self.decoder.chunked().next_inexact().unwrap(); + + self.unpacked = unpacked; + self.unpacked_start = unpacked_offset; + self.unpacked_end = unpacked_length; + + return unpacked_num_elements + n; + } + + self.decoder.len() + unpacked_num_elements + } +} + +impl BufferedRle { + pub fn translate_and_collect_limited_into( + &mut self, + target: &mut Vec, + limit: usize, + translator: &impl Translator, + ) -> ParquetResult { + let value = translator.translate(self.value)?; + let num_elements = usize::min(self.length, limit); + self.length -= num_elements; + target.resize(target.len() + num_elements, value); + Ok(num_elements) + } + + pub fn translate_and_collect_into( + self, + target: &mut Vec, + translator: &impl Translator, + ) -> ParquetResult { + let value = translator.translate(self.value)?; + target.resize(target.len() + self.length, value); + Ok(self.length) + } + + pub fn skip_in_place(&mut self, n: usize) -> usize { + let num_elements = usize::min(self.length, n); + self.length -= num_elements; + num_elements + } +} + +impl<'a> HybridRleBuffered<'a> { + pub fn translate_and_collect_limited_into( + &mut self, + target: &mut Vec, + limit: usize, + translator: &impl Translator, + ) -> ParquetResult { + let start_target_length = target.len(); + let start_length = self.len(); + + let num_processed = match self { + HybridRleBuffered::Bitpacked(b) => { + b.translate_and_collect_limited_into(target, limit, translator) + }, + HybridRleBuffered::Rle(b) => { + b.translate_and_collect_limited_into(target, limit, translator) + }, + }?; + + debug_assert!(num_processed <= limit); + debug_assert_eq!(num_processed, target.len() - start_target_length); + debug_assert_eq!(num_processed, start_length - self.len()); + + Ok(num_processed) + } + + pub fn translate_and_collect_into( + self, + target: &mut Vec, + translator: &impl Translator, + ) -> ParquetResult { + let start_target_length = target.len(); + let start_length = self.len(); + + let num_processed = match self { + HybridRleBuffered::Bitpacked(b) => b.translate_and_collect_into(target, translator), + HybridRleBuffered::Rle(b) => b.translate_and_collect_into(target, translator), + }?; + + debug_assert_eq!(num_processed, target.len() - start_target_length); + debug_assert_eq!(num_processed, start_length); + + Ok(num_processed) + } + + pub fn skip_in_place(&mut self, n: usize) -> usize { + let start_length = self.len(); + + let num_skipped = match self { + HybridRleBuffered::Bitpacked(b) => b.skip_in_place(n), + HybridRleBuffered::Rle(b) => b.skip_in_place(n), + }; + + debug_assert!(num_skipped <= n); + debug_assert_eq!(num_skipped, start_length - self.len()); + + num_skipped + } +} diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs new file mode 100644 index 000000000000..1e2c42d985f5 --- /dev/null +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs @@ -0,0 +1,359 @@ +/// Since the HybridRle decoder is very widely used within the Parquet reader and the code is quite +/// complex to facilitate performance. We create this small fuzzer +use std::collections::VecDeque; + +use rand::Rng; + +use super::*; + +fn run_iteration( + bs: &[u32], + collects: impl Iterator, + encoded: &mut Vec, + decoded: &mut Vec, + num_bits: u32, +) -> ParquetResult<()> { + encoded.clear(); + decoded.clear(); + + encoder::encode(encoded, bs.iter().copied(), num_bits).unwrap(); + + let mut decoder = HybridRleDecoder::new(&encoded[..], num_bits, bs.len()); + + for c in collects { + decoder.collect_n_into(decoded, c)?; + } + + Ok(()) +} + +/// Minimizes a failing case +fn minimize_failing_case( + bs: &mut Vec, + collects: &mut VecDeque, + encoded: &mut Vec, + decoded: &mut Vec, + num_bits: u32, +) -> ParquetResult<()> { + loop { + let initial_bs_len = bs.len(); + let initial_collects_len = collects.len(); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + assert_ne!(&bs, &decoded); + + while collects.len() > 2 { + let last = collects.pop_back().unwrap(); + + *collects.back_mut().unwrap() += last; + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + *collects.back_mut().unwrap() -= last; + collects.push_back(last); + break; + } + } + + while collects.len() > 2 { + let first = collects.pop_front().unwrap(); + + *collects.front_mut().unwrap() += first; + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + *collects.front_mut().unwrap() -= first; + collects.push_front(first); + break; + } + } + + while bs.len() > 1 { + let last = bs.pop().unwrap(); + *collects.back_mut().unwrap() -= 1; + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + bs.push(last); + *collects.back_mut().unwrap() += 1; + break; + } + + if *collects.back().unwrap() == 0 { + collects.pop_back().unwrap(); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + collects.push_back(0); + break; + } + } + } + + while bs.len() > 1 { + let last = bs.pop().unwrap(); + *collects.front_mut().unwrap() -= 1; + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + bs.push(last); + *collects.front_mut().unwrap() += 1; + break; + } + + if *collects.front().unwrap() == 0 { + collects.pop_front().unwrap(); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + collects.push_front(0); + break; + } + } + } + + while bs.len() > 1 { + let first = bs.remove(0); + *collects.back_mut().unwrap() -= 1; + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + bs.insert(0, first); + *collects.back_mut().unwrap() += 1; + break; + } + + if *collects.back().unwrap() == 0 { + collects.pop_back().unwrap(); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + collects.push_back(0); + break; + } + } + } + + while bs.len() > 1 { + let first = bs.remove(0); + *collects.front_mut().unwrap() -= 1; + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + bs.insert(0, first); + *collects.front_mut().unwrap() += 1; + break; + } + + if *collects.front().unwrap() == 0 { + collects.pop_front().unwrap(); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + collects.push_front(0); + break; + } + } + } + + let mut start_offset = collects[0]; + for i in 1..collects.len() - 1 { + loop { + let start_length = collects[i]; + + while collects[i] > 0 { + collects[i] -= 1; + let item = bs.remove(start_offset); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + bs.insert(start_offset, item); + collects[i] += 1; + break; + } + + if collects[i] == 0 { + collects.remove(i); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + collects.insert(i, 0); + break; + } + } + } + + while collects[i] > 0 { + collects[i] -= 1; + let end_offset = start_offset + collects[i] - 1; + let item = bs.remove(end_offset); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + bs.insert(end_offset, item); + collects[i] += 1; + break; + } + + if collects[i] == 0 { + collects.remove(i); + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + if bs == decoded { + collects.insert(i, 0); + break; + } + } + } + + if collects[i] == start_length { + break; + } + } + + start_offset += collects[i]; + } + + let now_bs_len = bs.len(); + let now_collects_len = collects.len(); + + if initial_bs_len == now_bs_len && initial_collects_len == now_collects_len { + break; + } + } + + run_iteration(bs, collects.iter().copied(), encoded, decoded, num_bits)?; + + Ok(()) +} + +fn fuzz_loops(num_loops: usize) -> ParquetResult<()> { + let mut rng = rand::thread_rng(); + + const MAX_LENGTH: usize = 10_000; + + let mut encoded = Vec::with_capacity(1024); + let mut decoded = Vec::with_capacity(1024); + + let mut bs = Vec::with_capacity(MAX_LENGTH); + let mut collects: VecDeque = VecDeque::with_capacity(2000); + + for i in 0..num_loops { + collects.clear(); + bs.clear(); + + let num_bits = rng.gen_range(0..=32); + let mask = 1u32.wrapping_shl(num_bits).wrapping_sub(1); + + let length = rng.gen_range(1..=MAX_LENGTH); + + unsafe { bs.set_len(length) }; + rng.fill(&mut bs[..]); + + let mut filled = 0; + while filled < bs.len() { + if rng.gen() { + let num_repeats = rng.gen_range(0..=(bs.len() - filled)); + let value = bs[filled] & mask; + for j in 0..num_repeats { + bs[filled + j] = value; + } + filled += num_repeats; + } else { + bs[filled] &= mask; + filled += 1; + } + } + + if rng.gen() { + let mut num_values = bs.len(); + while num_values > 0 { + let n = rng.gen_range(0..=num_values); + collects.push_back(n); + num_values -= n; + } + } else { + collects.resize(1, bs.len()); + } + + run_iteration( + &bs, + collects.iter().copied(), + &mut encoded, + &mut decoded, + num_bits, + )?; + + if decoded != bs { + minimize_failing_case(&mut bs, &mut collects, &mut encoded, &mut decoded, num_bits)?; + + eprintln!("Minimized case:"); + eprintln!("Expected: {bs:?}"); + eprintln!("Found: {decoded:?}"); + eprintln!("Collects: {collects:?}"); + eprintln!(); + + panic!("Found a failing case..."); + } + + if i % 512 == 0 { + eprintln!("{i} iterations done."); + } + } + + Ok(()) +} + +#[test] +fn small_fuzz() -> ParquetResult<()> { + fuzz_loops(2048) +} + +#[test] +#[ignore = "Large fuzz test. Too slow"] +fn large_fuzz() -> ParquetResult<()> { + fuzz_loops(1_000_000) +} + +#[test] +fn found_cases() -> ParquetResult<()> { + let mut encoded = Vec::with_capacity(1024); + let mut decoded = Vec::with_capacity(1024); + + let num_bits = 7; + + let bs: [u32; 1024] = std::array::from_fn(|i| (i / 10) as u32); + + encoder::encode(&mut encoded, bs.iter().copied(), num_bits).unwrap(); + let mut decoder = HybridRleDecoder::new(&encoded[..], num_bits, bs.len()); + + while decoder.len() != 0 { + let n = decoder.next().unwrap(); + decoded.push(n); + } + + for _ in 0..1 { + _ = decoder.next(); + } + + decoder.get_result()?; + + assert_eq!(&decoded, &bs); + + Ok(()) +} diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs index 149fa3ceaa4e..6dbc22f857ba 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs @@ -1,15 +1,25 @@ // See https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3 mod bitmap; +mod buffered; mod decoder; mod encoder; +mod translator; + +#[cfg(test)] +mod fuzz; pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter}; +pub use buffered::BufferedBitpacked; pub use decoder::Decoder; pub use encoder::encode; use polars_utils::iter::FallibleIterator; use polars_utils::slice::GetSaferUnchecked; +pub use translator::{DictionaryTranslator, Translator, UnitTranslator}; +use self::buffered::HybridRleBuffered; use super::{bitpacked, ceil8, uleb128}; +use crate::parquet::encoding::bitpacked::{Unpackable, Unpacked}; +use crate::parquet::encoding::hybrid_rle::buffered::BufferedRle; use crate::parquet::error::{ParquetError, ParquetResult}; /// The two possible states of an RLE-encoded run. @@ -22,95 +32,80 @@ pub enum HybridEncoded<'a> { Rle(&'a [u8], usize), } -/// A decoder for Hybrid-RLE encoded values +/// A [`Iterator`] for Hybrid Run-Length Encoding +/// +/// The hybrid here means that each second is prepended by a bit that differentiates between two +/// modes. +/// +/// 1. Run-Length Encoding in the shape of `[Number of Values, Value]` +/// 2. Bitpacking in the shape of `[Value 1 in n bits, Value 2 in n bits, ...]` +/// +/// Note, that this can iterate, but the set of `collect_*` and `translate_and_collect_*` methods +/// should be highly preferred as they are way more efficient and have better error handling. #[derive(Debug, Clone)] pub struct HybridRleDecoder<'a> { data: &'a [u8], num_bits: usize, num_values: usize, -} -/// A buffered [`Iterator`] of Hybrid-RLE encoded values -#[derive(Debug, Clone)] -pub struct BufferedHybridRleDecoderIter<'a> { - decoder: HybridRleDecoder<'a>, - - buffer: Vec, - buffer_index: usize, + buffered: Option>, + /// The result after iterating. + /// + /// This is only needed because we iterate over individual elements. result: Option, } -impl<'a> BufferedHybridRleDecoderIter<'a> { - // @NOTE: - // These were not taken with too much thought to be honest. It might be better to increase - // these because it allows for more buffering at the cost of utilizing more memory. - const BASE_CAPACITY: usize = 128; - const STOP_AT_SIZE: usize = 64; +impl<'a> FallibleIterator for HybridRleDecoder<'a> { + fn get_result(&mut self) -> Result<(), ParquetError> { + match self.result.take() { + None => Ok(()), + Some(err) => Err(err), + } + } } -impl<'a> Iterator for BufferedHybridRleDecoderIter<'a> { +impl<'a> Iterator for HybridRleDecoder<'a> { type Item = u32; fn next(&mut self) -> Option { - if self.buffer_index < self.buffer.len() { - let value = self.buffer[self.buffer_index]; - self.buffer_index += 1; - return Some(value); - } - - if self.decoder.num_values == 0 { + if self.num_values == 0 { return None; } - if self.decoder.num_bits == 0 { - self.decoder.num_values -= 1; + if self.num_bits == 0 { + self.num_values -= 1; return Some(0); } - self.buffer.clear(); - self.buffer_index = 1; - while self.buffer.len() < Self::STOP_AT_SIZE && self.decoder.num_values > 0 { - let result = self.decoder.collect_once(&mut self.buffer); - if let Err(err) = result { - self.result = Some(err); - return None; + if let Some(buffered) = self.buffered.as_mut() { + match buffered.next() { + None => self.buffered = None, + Some(value) => { + self.num_values -= 1; + return Some(value); + }, } } - self.buffer.first().copied() - } - - fn size_hint(&self) -> (usize, Option) { - let size = self.decoder.num_values + self.buffer.len() - self.buffer_index; - (size, Some(size)) - } -} + let mut buffer = Vec::with_capacity(1); + let result = self.translate_and_collect_limited_once(&mut buffer, Some(1), &UnitTranslator); -impl<'a> FallibleIterator for BufferedHybridRleDecoderIter<'a> { - fn get_result(&mut self) -> Result<(), ParquetError> { - match self.result.take() { - None => Ok(()), - Some(err) => Err(err), + match result { + Ok(_) => Some(buffer[0]), + Err(err) => { + self.result = Some(err); + None + }, } } -} -impl<'a> ExactSizeIterator for BufferedHybridRleDecoderIter<'a> {} - -impl<'a> IntoIterator for HybridRleDecoder<'a> { - type Item = u32; - type IntoIter = BufferedHybridRleDecoderIter<'a>; - - fn into_iter(self) -> Self::IntoIter { - BufferedHybridRleDecoderIter { - decoder: self, - buffer: Vec::with_capacity(BufferedHybridRleDecoderIter::BASE_CAPACITY), - buffer_index: 0, - result: None, - } + fn size_hint(&self) -> (usize, Option) { + (self.num_values, Some(self.num_values)) } } +impl<'a> ExactSizeIterator for HybridRleDecoder<'a> {} + impl<'a> HybridRleDecoder<'a> { /// Returns a new [`HybridRleDecoder`] pub fn new(data: &'a [u8], num_bits: u32, num_values: usize) -> Self { @@ -118,47 +113,49 @@ impl<'a> HybridRleDecoder<'a> { data, num_bits: num_bits as usize, num_values, - } - } - pub fn iter(&self) -> BufferedHybridRleDecoderIter<'a> { - BufferedHybridRleDecoderIter { - decoder: self.clone(), - buffer: Vec::with_capacity(BufferedHybridRleDecoderIter::BASE_CAPACITY), - buffer_index: 0, + buffered: None, result: None, } } - fn collect_once(&mut self, vec: &mut Vec) -> ParquetResult<()> { + /// Translate and collect at most `limit` items into `target`. + /// + /// This function expects `num_values > 0` and `num_bits > 0`. + fn translate_and_collect_limited_once( + &mut self, + target: &mut Vec, + limit: Option, + translator: &impl Translator, + ) -> ParquetResult { + if limit == Some(0) { + return Ok(0); + } + + let start_target_length = target.len(); + let start_num_values = self.num_values; + // @NOTE: // This is basically a collapsed version of the `decoder::Decoder`. Any change here // probably also applies there. In a microbenchmark this collapse did around 3x for this // specific piece of code, but I think this actually also makes the code more readable. - debug_assert!(self.num_values > 0); + debug_assert!(self.num_values > 0, "{:?}", target.len()); debug_assert!(self.num_bits > 0); let (indicator, consumed) = uleb128::decode(self.data); self.data = unsafe { self.data.get_unchecked_release(consumed..) }; if consumed == 0 { - // We don't step everything at once because that might allocate a lot at once. So, we - // do it in steps. This reasoning might not hold up 100% for just HybridRleDecoder but - // it does for BufferedHybridRleDecoderIter. - // - // @TODO: There might be a better solution for this. - - const MAX_STEP_SIZE: usize = 64; - - let step_size = usize::min(self.num_values, MAX_STEP_SIZE); - vec.resize(vec.len() + step_size, 0); + let step_size = + limit.map_or(self.num_values, |limit| usize::min(self.num_values, limit)); + target.resize(target.len() + step_size, translator.translate(0)?); self.num_values -= step_size; - return Ok(()); + return Ok(step_size); } - if indicator & 1 == 1 { + let num_processed = if indicator & 1 == 1 { // is bitpacking let bytes = (indicator as usize >> 1) * self.num_bits; let bytes = std::cmp::min(bytes, self.data.len()); @@ -167,8 +164,13 @@ impl<'a> HybridRleDecoder<'a> { let length = std::cmp::min(packed.len() * 8 / self.num_bits, self.num_values); let decoder = bitpacked::Decoder::::try_new(packed, self.num_bits, length)?; - decoder.collect_into(vec); - self.num_values -= length; + + let (num_processed, buffered) = + translator.translate_bitpacked_decoder(decoder, target, limit)?; + debug_assert!(limit.map_or(true, |limit| limit >= num_processed)); + self.buffered = buffered; + + num_processed } else { // is rle let run_length = indicator as usize >> 1; @@ -177,51 +179,248 @@ impl<'a> HybridRleDecoder<'a> { let (pack, remaining) = self.data.split_at(rle_bytes); self.data = remaining; - let mut bytes = [0u8; std::mem::size_of::()]; - pack.iter().zip(bytes.iter_mut()).for_each(|(src, dst)| { - *dst = *src; - }); - let value = u32::from_le_bytes(bytes); - vec.resize(vec.len() + run_length, value); - self.num_values -= run_length; - } + if run_length == 0 { + 0 + } else { + let mut bytes = [0u8; std::mem::size_of::()]; + pack.iter().zip(bytes.iter_mut()).for_each(|(src, dst)| { + *dst = *src; + }); + let value = u32::from_le_bytes(bytes); + + let num_elements = limit.map_or(run_length, |limit| usize::min(run_length, limit)); + + // Only translate once. Then, just do a memset. + let translated = translator.translate(value)?; + target.resize(target.len() + num_elements, translated); + + if let Some(limit) = limit { + if run_length > limit { + self.buffered = (run_length != limit).then_some({ + HybridRleBuffered::Rle(BufferedRle { + value, + length: run_length - num_elements, + }) + }); + } + } + + num_elements + } + }; - Ok(()) + self.num_values -= num_processed; + + debug_assert_eq!(num_processed, start_num_values - self.num_values); + debug_assert_eq!(num_processed, target.len() - start_target_length); + debug_assert!(limit.map_or(true, |limit| num_processed <= limit)); + + Ok(num_processed) } - #[inline] - pub fn collect_into(mut self, vec: &mut Vec) -> ParquetResult<()> { - // @NOTE: - // When microbenchmarking, this performs around 2x better than using an element-wise - // iterator. + pub fn translate_and_collect_into( + mut self, + target: &mut Vec, + translator: &impl Translator, + ) -> Result<(), ParquetError> { if self.num_values == 0 { return Ok(()); } if self.num_bits == 0 { - vec.resize(vec.len() + self.num_values, 0); + target.resize(target.len() + self.num_values, translator.translate(0)?); return Ok(()); } - vec.reserve(self.num_values); + target.reserve(self.num_values); + if let Some(buffered) = self.buffered.take() { + let num_buffered = buffered.translate_and_collect_into(target, translator)?; + self.num_values -= num_buffered; + } while self.num_values > 0 { - self.collect_once(vec)?; + self.translate_and_collect_limited_once(target, None, translator)?; } Ok(()) } + pub fn translate_and_collect_n_into( + &mut self, + target: &mut Vec, + n: usize, + translator: &impl Translator, + ) -> ParquetResult<()> { + if self.num_values == 0 || n == 0 { + return Ok(()); + } + + if self.num_bits == 0 { + target.resize(target.len() + n, translator.translate(0)?); + self.num_values -= n; + return Ok(()); + } + + let target_length = target.len() + n; + target.reserve(n); + + if let Some(buffered) = self.buffered.as_mut() { + let num_buffered = + buffered.translate_and_collect_limited_into(target, n, translator)?; + debug_assert!(num_buffered <= n); + self.num_values -= num_buffered; + + if num_buffered < n { + self.buffered = None; + } + } + + while target.len() < target_length && self.num_values > 0 { + self.translate_and_collect_limited_once( + target, + Some(target_length - target.len()), + translator, + )?; + } + + Ok(()) + } + + pub fn translate_and_collect( + self, + translator: &impl Translator, + ) -> ParquetResult> { + let mut vec = Vec::new(); + self.translate_and_collect_into(&mut vec, translator)?; + Ok(vec) + } + + pub fn collect_into(self, target: &mut Vec) -> Result<(), ParquetError> { + self.translate_and_collect_into(target, &UnitTranslator) + } + + pub fn collect_n_into(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { + self.translate_and_collect_n_into(target, n, &UnitTranslator) + } + pub fn collect(self) -> ParquetResult> { let mut vec = Vec::new(); self.collect_into(&mut vec)?; Ok(vec) } + + pub fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> { + if self.num_values == 0 || n == 0 { + return Ok(()); + } + + if n >= self.num_values { + self.data = &[]; + self.num_values = 0; + self.buffered = None; + return Ok(()); + } + + if self.num_bits == 0 { + self.num_values -= n; + return Ok(()); + } + + let mut n = n; + if let Some(buffered) = self.buffered.as_mut() { + let num_skipped = buffered.skip_in_place(n); + + if num_skipped < n { + self.buffered = None; + } + + self.num_values -= num_skipped; + n -= num_skipped; + } + + while n > 0 && self.num_values > 0 { + let start_num_values = self.num_values; + + let (indicator, consumed) = uleb128::decode(self.data); + self.data = unsafe { self.data.get_unchecked_release(consumed..) }; + + let num_skipped = if consumed == 0 { + n + } else if indicator & 1 == 1 { + // is bitpacking + let bytes = (indicator as usize >> 1) * self.num_bits; + let bytes = std::cmp::min(bytes, self.data.len()); + let (packed, remaining) = self.data.split_at(bytes); + self.data = remaining; + + let length = std::cmp::min(packed.len() * 8 / self.num_bits, self.num_values); + let mut decoder = + bitpacked::Decoder::::try_new(packed, self.num_bits, length)?; + + // Skip the whole decoder if it is possible + if decoder.len() <= n { + decoder.len() + } else { + const CHUNK_SIZE: usize = ::Unpacked::LENGTH; + + let num_full_chunks = n / CHUNK_SIZE; + decoder.skip_chunks(num_full_chunks); + + let (unpacked, unpacked_length) = decoder.chunked().next_inexact().unwrap(); + let unpacked_offset = n % CHUNK_SIZE; + debug_assert!(unpacked_offset < unpacked_length); + + self.buffered = Some(HybridRleBuffered::Bitpacked(BufferedBitpacked { + unpacked, + + unpacked_start: unpacked_offset, + unpacked_end: unpacked_length, + decoder, + })); + + n + } + } else { + // is rle + let run_length = indicator as usize >> 1; + // repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) + let rle_bytes = ceil8(self.num_bits); + let (pack, remaining) = self.data.split_at(rle_bytes); + self.data = remaining; + + // Skip the whole run-length encoded value if it is possible + if run_length <= n { + run_length + } else { + let mut bytes = [0u8; std::mem::size_of::()]; + pack.iter().zip(bytes.iter_mut()).for_each(|(src, dst)| { + *dst = *src; + }); + let value = u32::from_le_bytes(bytes); + + self.buffered = Some(HybridRleBuffered::Rle(BufferedRle { + value, + length: run_length - n, + })); + + n + } + }; + + n -= num_skipped; + self.num_values -= num_skipped; + + debug_assert_eq!(num_skipped, start_num_values - self.num_values); + debug_assert!(num_skipped <= n); + debug_assert!(num_skipped > 0); + } + + Ok(()) + } } #[cfg(test)] mod tests { - use super::*; #[test] @@ -318,10 +517,13 @@ mod tests { let num_bits = 10; let decoder = HybridRleDecoder::new(&data, num_bits, 1000); - let result = decoder.collect()?; + let mut decoder = HybridRleDecoder::new(&data, num_bits, 1000); + let iterator_result: Vec<_> = Iterator::collect(&mut decoder); + assert_eq!(result, (0..1000).collect::>()); + assert_eq!(iterator_result, (0..1000).collect::>()); Ok(()) } diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/translator.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/translator.rs new file mode 100644 index 000000000000..a49351072a45 --- /dev/null +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/translator.rs @@ -0,0 +1,249 @@ +use crate::parquet::encoding::bitpacked::{Decoder, Unpackable, Unpacked}; +use crate::parquet::encoding::hybrid_rle::{BufferedBitpacked, HybridRleBuffered}; +use crate::parquet::error::{ParquetError, ParquetResult}; + +/// A trait to describe a translation from a HybridRLE encoding to an another format. +/// +/// In essence, this is one method ([`Translator::translate`]) that maps an `u32` to the desired +/// output type `O`. There are several other methods that may provide optimized routines +/// for slices, chunks and decoders. +/// +/// # Motivation +/// +/// The [`HybridRleDecoder`] is used extensively during Parquet decoding because it is used for +/// Dremel decoding and dictionary decoding. We want to perform a transformation from this +/// space-efficient encoding to a buffer. Here, items might be skipped, might be mapped and only a +/// few items might be needed. There are 3 main ways to do this. +/// +/// 1. Element-by-element translation using iterator `map`, `filter`, `skip`, etc. This suffers +/// from the problem that is difficult to SIMD the translation and that a `collect` might need +/// to constantly poll the `next` function. Next to that monomorphization might need to generate +/// many, many variants. +/// 2. Buffer most everything, filter and translate later. This has high memory-consumption and +/// might suffer from cache-eviction problems. This is computationally the most efficient, but +/// probably still has a high runtime. Also, this fails to utilize run-length information and +/// needs to retranslate all repeated elements. +/// 3. Batched operations. Here, we try to utilize the run-length information and utilize SIMD to +/// process many bitpacked items. This can provide the best of both worlds. +/// +/// The [`HybridRleDecoder`][super::HybridRleDecoder] decoders utilizing both run-length encoding +/// and bitpacking. In both processes, this [`Translator`] trait allows for translation with (i) no +/// heap allocations and (ii) cheap buffering and can stop and start at any point. Consequently, +/// the memory consumption while doing these translations can be relatively low while still +/// processing items in batches. +/// +/// [`HybridRleDecoder`]: super::HybridRleDecoder +pub trait Translator { + /// Translate from a decoded value to the output format + fn translate(&self, value: u32) -> ParquetResult; + + /// Translate from a slice of decoded values to the output format and write them to a `target`. + /// + /// This can overwritten to be more optimized. + fn translate_slice(&self, target: &mut Vec, source: &[u32]) -> ParquetResult<()> { + target.reserve(source.len()); + for v in source { + target.push(self.translate(*v)?); + } + Ok(()) + } + + /// Translate from a chunk of unpacked items to the output format and write them to a `target`. + /// + /// This is the same as [`Translator::translate_slice`] but with a known slice size. This can + /// allow SIMD routines to better optimize the procedure. + /// + /// This can overwritten to be more optimized. + fn translate_chunk( + &self, + target: &mut Vec, + source: &::Unpacked, + ) -> ParquetResult<()> { + self.translate_slice(target, &source[..]) + } + + /// Translate and collect all the items in a [`Decoder`] to a `target`. + /// + /// This can overwritten to be more optimized. + fn translate_bitpacked_all( + &self, + target: &mut Vec, + mut decoder: Decoder, + ) -> ParquetResult<()> { + target.reserve(decoder.len()); + + let mut chunked = decoder.chunked(); + + for unpacked in &mut chunked { + self.translate_chunk(target, &unpacked)?; + } + + if let Some((last, last_length)) = chunked.remainder() { + self.translate_slice(target, &last[..last_length])?; + } + + Ok(()) + } + + /// Translate and collect a limited number of items in a [`Decoder`] to a `target`. + /// + /// This can overwritten to be more optimized. + /// + /// # Panics + /// + /// This method panics when `limit` is larger than the `decoder` length. + fn translate_bitpacked_limited<'a>( + &self, + target: &mut Vec, + limit: usize, + mut decoder: Decoder<'a, u32>, + ) -> ParquetResult> { + assert!(limit < decoder.len()); + + const CHUNK_SIZE: usize = ::Unpacked::LENGTH; + + let mut chunked = decoder.chunked(); + + let num_full_chunks = limit / CHUNK_SIZE; + for unpacked in (&mut chunked).take(num_full_chunks) { + self.translate_chunk(target, &unpacked)?; + } + + let (unpacked, unpacked_length) = chunked.next_inexact().unwrap(); + let unpacked_offset = limit % CHUNK_SIZE; + debug_assert!(unpacked_offset < unpacked_length); + self.translate_slice(target, &unpacked[..unpacked_offset])?; + + Ok(BufferedBitpacked { + unpacked, + + unpacked_start: unpacked_offset, + unpacked_end: unpacked_length, + decoder, + }) + } + + /// Translate and collect items in a [`Decoder`] to a `target`. + /// + /// This can overwritten to be more optimized. + fn translate_bitpacked_decoder<'a>( + &self, + decoder: Decoder<'a, u32>, + target: &mut Vec, + limit: Option, + ) -> ParquetResult<(usize, Option>)> { + let length = decoder.len(); + + match limit { + None => self + .translate_bitpacked_all(target, decoder) + .map(|_| (length, None)), + Some(limit) if limit >= length => self + .translate_bitpacked_all(target, decoder) + .map(|_| (length, None)), + Some(limit) => self + .translate_bitpacked_limited(target, limit, decoder) + .map(|b| (limit, Some(HybridRleBuffered::Bitpacked(b)))), + } + } +} + +/// This is a unit translation variant of [`Translator`]. This just maps all encoded values from a +/// [`HybridRleDecoder`] to themselves. +/// +/// [`HybridRleDecoder`]: super::HybridRleDecoder +pub struct UnitTranslator; + +impl Translator for UnitTranslator { + fn translate(&self, value: u32) -> ParquetResult { + Ok(value) + } + + fn translate_slice(&self, target: &mut Vec, source: &[u32]) -> ParquetResult<()> { + target.extend_from_slice(source); + Ok(()) + } + fn translate_chunk( + &self, + target: &mut Vec, + source: &::Unpacked, + ) -> ParquetResult<()> { + target.extend_from_slice(&source[..]); + Ok(()) + } + fn translate_bitpacked_all( + &self, + target: &mut Vec, + decoder: Decoder, + ) -> ParquetResult<()> { + decoder.collect_into(target); + Ok(()) + } +} + +/// This is a dictionary translation variant of [`Translator`]. +/// +/// All the [`HybridRleDecoder`] values are regarded as a offset into a dictionary. +/// +/// [`HybridRleDecoder`]: super::HybridRleDecoder +pub struct DictionaryTranslator<'a, T>(pub &'a [T]); + +impl<'a, T: Copy> Translator for DictionaryTranslator<'a, T> { + fn translate(&self, value: u32) -> ParquetResult { + self.0 + .get(value as usize) + .cloned() + .ok_or(ParquetError::oos("Dictionary index is out of range")) + } + + fn translate_slice(&self, target: &mut Vec, source: &[u32]) -> ParquetResult<()> { + let Some(source_max) = source.iter().copied().max() else { + return Ok(()); + }; + + if source_max as usize >= self.0.len() { + return Err(ParquetError::oos("Dictionary index is out of range")); + } + + // Safety: We have checked before that source only has indexes that are smaller than the + // dictionary length. + target.extend( + source + .iter() + .map(|&src_idx| unsafe { *self.0.get_unchecked(src_idx as usize) }), + ); + + Ok(()) + } + + fn translate_chunk( + &self, + target: &mut Vec, + source: &::Unpacked, + ) -> ParquetResult<()> { + let source_max: u32 = source.iter().copied().max().unwrap(); + + if source_max as usize >= self.0.len() { + return Err(ParquetError::oos("Dictionary index is out of range")); + } + + // Safety: We have checked before that source only has indexes that are smaller than the + // dictionary length. + target.extend( + source + .iter() + .map(|&src_idx| unsafe { *self.0.get_unchecked(src_idx as usize) }), + ); + + Ok(()) + } +} + +/// A closure-based translator +pub struct FnTranslator ParquetResult>(pub F); + +impl ParquetResult> Translator for FnTranslator { + fn translate(&self, value: u32) -> ParquetResult { + (self.0)(value) + } +} diff --git a/crates/polars/tests/it/io/parquet/read/binary.rs b/crates/polars/tests/it/io/parquet/read/binary.rs index 63eaf49bd474..a7a7eb4c4e36 100644 --- a/crates/polars/tests/it/io/parquet/read/binary.rs +++ b/crates/polars/tests/it/io/parquet/read/binary.rs @@ -24,13 +24,11 @@ pub fn page_to_vec( .collect(), FixedLenBinaryPageState::RequiredDictionary(dict) => dict .indexes - .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec()).map(Some)) .collect(), FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { let values = dict .indexes - .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec())); deserialize_optional(validity, values) }, diff --git a/crates/polars/tests/it/io/parquet/read/fixed_binary.rs b/crates/polars/tests/it/io/parquet/read/fixed_binary.rs index 6951e09367ad..7158864e21bf 100644 --- a/crates/polars/tests/it/io/parquet/read/fixed_binary.rs +++ b/crates/polars/tests/it/io/parquet/read/fixed_binary.rs @@ -21,13 +21,11 @@ pub fn page_to_vec( }, FixedLenBinaryPageState::RequiredDictionary(dict) => dict .indexes - .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec()).map(Some)) .collect(), FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { let values = dict .indexes - .iter() .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec())); deserialize_optional(validity, values) }, diff --git a/crates/polars/tests/it/io/parquet/read/primitive.rs b/crates/polars/tests/it/io/parquet/read/primitive.rs index c11df388ee58..825cdca48526 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive.rs @@ -102,14 +102,10 @@ pub fn page_to_vec( NativePageState::Required(values) => Ok(values.map(Some).collect()), NativePageState::RequiredDictionary(dict) => dict .indexes - .iter() .map(|x| dict.dict.value(x as usize).copied().map(Some)) .collect(), NativePageState::OptionalDictionary(validity, dict) => { - let values = dict - .indexes - .iter() - .map(|x| dict.dict.value(x as usize).copied()); + let values = dict.indexes.map(|x| dict.dict.value(x as usize).copied()); deserialize_optional(validity, values) }, }, diff --git a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs index b336cea2f498..e3fead47187c 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs @@ -88,7 +88,7 @@ fn read_array_impl>( let num_bits = get_bit_width(rep_level_encoding.1); let rep_levels = HybridRleDecoder::new(rep_levels, num_bits, length); compose_array( - rep_levels.iter(), + rep_levels, std::iter::repeat(0).take(length), max_rep_level, max_def_level, @@ -100,7 +100,7 @@ fn read_array_impl>( let def_levels = HybridRleDecoder::new(def_levels, num_bits, length); compose_array( std::iter::repeat(0).take(length), - def_levels.iter(), + def_levels, max_rep_level, max_def_level, values, @@ -108,11 +108,9 @@ fn read_array_impl>( }, ((Encoding::Rle, false), (Encoding::Rle, false)) => { let rep_levels = - HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), length) - .iter(); + HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), length); let def_levels = - HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length) - .iter(); + HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length); compose_array(rep_levels, def_levels, max_rep_level, max_def_level, values) }, _ => todo!(), diff --git a/crates/polars/tests/it/io/parquet/read/struct_.rs b/crates/polars/tests/it/io/parquet/read/struct_.rs index 3d25dbeefe3d..1acf0cf834ac 100644 --- a/crates/polars/tests/it/io/parquet/read/struct_.rs +++ b/crates/polars/tests/it/io/parquet/read/struct_.rs @@ -21,7 +21,7 @@ pub fn extend_validity(val: &mut Vec, page: &DataPage) -> ParquetResult<() ); let mut def_levels = - HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length).iter(); + HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length); val.reserve(length); def_levels.try_for_each(|x| { diff --git a/crates/polars/tests/it/io/parquet/read/utils.rs b/crates/polars/tests/it/io/parquet/read/utils.rs index 240303a4024c..64564f473b2f 100644 --- a/crates/polars/tests/it/io/parquet/read/utils.rs +++ b/crates/polars/tests/it/io/parquet/read/utils.rs @@ -1,7 +1,5 @@ use polars_parquet::parquet::deserialize::{HybridDecoderBitmapIter, HybridEncoded, HybridRleIter}; -use polars_parquet::parquet::encoding::hybrid_rle::{ - self, BitmapIter, BufferedHybridRleDecoderIter, HybridRleDecoder, -}; +use polars_parquet::parquet::encoding::hybrid_rle::{self, BitmapIter, HybridRleDecoder}; use polars_parquet::parquet::error::{ParquetError, ParquetResult}; use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; use polars_parquet::parquet::read::levels::get_bit_width; @@ -41,7 +39,7 @@ pub enum DefLevelsDecoder<'a> { /// that decodes the runs, but not the individual values Bitmap(HybridDecoderBitmapIter<'a>), /// When the maximum definition level is larger than 1 - Levels(BufferedHybridRleDecoderIter<'a>, u32), + Levels(HybridRleDecoder<'a>, u32), } impl<'a> DefLevelsDecoder<'a> { @@ -59,8 +57,7 @@ impl<'a> DefLevelsDecoder<'a> { Self::Bitmap(iter) } else { let iter = - HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()) - .iter(); + HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()); Self::Levels(iter, max_def_level as u32) }) } @@ -141,7 +138,7 @@ fn deserialize_bitmap>>( } fn deserialize_levels>>( - levels: BufferedHybridRleDecoderIter, + levels: HybridRleDecoder, max: u32, mut values: I, ) -> Result>, ParquetError> {