Skip to content

Commit

Permalink
perf: Batch parquet primitive decoding (#17462)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jul 6, 2024
1 parent 89489b7 commit 909f08f
Show file tree
Hide file tree
Showing 26 changed files with 1,726 additions and 265 deletions.
167 changes: 152 additions & 15 deletions crates/polars-arrow/src/bitmap/utils/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,42 +46,123 @@ impl<'a> BitmapIter<'a> {
rest_len,
}
}

/// Consumes the run of `1` / `true` bits at the front of the iterator and returns its length.
///
/// The returned count is the same as `(&mut iter).take_while(|b| b).count()`. Unlike that
/// expression, the first `0` bit following the run is left in the iterator.
///
/// Whole 64-bit words are inspected at once, so prefer this over polling the iterator bit by
/// bit whenever the use-case allows for it.
pub fn take_leading_ones(&mut self) -> usize {
    let mut total = 0;

    loop {
        // Bits at positions >= `word_len` do not belong to the iterator, so the run found in
        // the current word is capped at `word_len`.
        let run = (self.word.trailing_ones() as usize).min(self.word_len);
        // `run` may be 64, which a plain `>>` cannot shift by; `wrapping_shr` masks the
        // amount instead. The word is only read again here after a refill.
        self.word = self.word.wrapping_shr(run as u32);
        self.word_len -= run;
        total += run;

        // Either a `0` bit is now at the front of the current word, or no further words
        // remain to be loaded.
        if self.word_len != 0 || self.rest_len == 0 {
            return total;
        }

        // Current word exhausted: pull in the next (up to) 64 bits.
        self.word_len = self.rest_len.min(64);
        self.rest_len -= self.word_len;

        // SAFETY: relies on `bytes` holding at least 8 bytes whenever `rest_len != 0`.
        // NOTE(review): that invariant is set up by the constructor, which is not visible
        // here; confirm against `BitmapIter::new`.
        unsafe {
            let bytes = self.bytes;
            self.word = u64::from_le_bytes(bytes.get_unchecked(..8).try_into().unwrap());
            self.bytes = bytes.get_unchecked(8..);
        }
    }
}

/// Consumes the run of `0` / `false` bits at the front of the iterator and returns its length.
///
/// The returned count is the same as `(&mut iter).take_while(|b| !b).count()`. Unlike that
/// expression, the first `1` bit following the run is left in the iterator.
///
/// Whole 64-bit words are inspected at once, so prefer this over polling the iterator bit by
/// bit whenever the use-case allows for it.
pub fn take_leading_zeros(&mut self) -> usize {
    let mut total = 0;

    loop {
        // Bits at positions >= `word_len` do not belong to the iterator, so the run found in
        // the current word is capped at `word_len`.
        let run = (self.word.trailing_zeros() as usize).min(self.word_len);
        // `run` may be 64, which a plain `>>` cannot shift by; `wrapping_shr` masks the
        // amount instead. The word is only read again here after a refill.
        self.word = self.word.wrapping_shr(run as u32);
        self.word_len -= run;
        total += run;

        // Either a `1` bit is now at the front of the current word, or no further words
        // remain to be loaded.
        if self.word_len != 0 || self.rest_len == 0 {
            return total;
        }

        // Current word exhausted: pull in the next (up to) 64 bits.
        self.word_len = self.rest_len.min(64);
        self.rest_len -= self.word_len;

        // SAFETY: relies on `bytes` holding at least 8 bytes whenever `rest_len != 0`.
        // NOTE(review): that invariant is set up by the constructor, which is not visible
        // here; confirm against `BitmapIter::new`.
        unsafe {
            let bytes = self.bytes;
            self.word = u64::from_le_bytes(bytes.get_unchecked(..8).try_into().unwrap());
            self.bytes = bytes.get_unchecked(8..);
        }
    }
}

/// Returns how many elements the iterator has yet to yield.
///
/// This is the sum of the bits still pending in the current word (`word_len`) and the bits
/// that have not been loaded into a word yet (`rest_len`).
#[inline]
pub fn num_remaining(&self) -> usize {
    let pending_in_word = self.word_len;
    let not_yet_loaded = self.rest_len;
    pending_in_word + not_yet_loaded
}
}

impl<'a> Iterator for BitmapIter<'a> {
type Item = bool;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.word_len != 0 {
let ret = self.word & 1 != 0;
self.word >>= 1;
self.word_len -= 1;
return Some(ret);
}
if self.word_len == 0 {
if self.rest_len == 0 {
return None;
}

if self.rest_len != 0 {
self.word_len = self.rest_len.min(64);
self.rest_len -= self.word_len;

unsafe {
let chunk = self.bytes.get_unchecked(..8).try_into().unwrap();
self.word = u64::from_le_bytes(chunk);
self.bytes = self.bytes.get_unchecked(8..);
}

let ret = self.word & 1 != 0;
self.word >>= 1;
self.word_len -= 1;
return Some(ret);
}

None
let ret = self.word & 1 != 0;
self.word >>= 1;
self.word_len -= 1;
Some(ret)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
let exact = self.word_len + self.rest_len;
(exact, Some(exact))
let num_remaining = self.num_remaining();
(num_remaining, Some(num_remaining))
}
}

Expand All @@ -102,3 +183,59 @@ impl<'a> DoubleEndedIterator for BitmapIter<'a> {

unsafe impl TrustedLen for BitmapIter<'_> {}
impl ExactSizeIterator for BitmapIter<'_> {}

#[cfg(test)]
mod tests {
    use super::*;

    /// Fuzz test for `take_leading_ones` / `take_leading_zeros`.
    ///
    /// Random bitmaps (all-zero words, all-one words, or random words, followed by a ragged
    /// tail of extra bytes and bits) are walked run by run. For every run the batched method
    /// is checked against the bit-by-bit `take_while` reference, both for the count it returns
    /// and for how far it advances the iterator.
    #[test]
    #[ignore = "Fuzz test. Too slow"]
    fn test_leading_ops() {
        for _ in 0..10_000 {
            // Word flavour for this round: 0 => all zeros, 1 => all ones, 2 | 3 => random.
            let bs = rand::random::<u8>() % 4;

            let mut length = 0;
            let mut pattern = Vec::new();
            for _ in 0..rand::random::<usize>() % 1024 {
                let word = match bs {
                    0 => u64::MIN,
                    1 => u64::MAX,
                    2 | 3 => rand::random(),
                    _ => unreachable!(),
                };

                pattern.extend_from_slice(&word.to_le_bytes());
                length += 64;
            }

            // Up to 6 extra whole bytes so the buffer is not always a multiple of 8 bytes.
            for _ in 0..rand::random::<usize>() % 7 {
                pattern.push(rand::random::<u8>());
                length += 8;
            }

            // Optionally a final, partially used byte.
            let last_length = rand::random::<usize>() % 8;
            if last_length != 0 {
                pattern.push(rand::random::<u8>());
                length += last_length;
            }

            let mut iter = BitmapIter::new(&pattern, 0, length);

            let mut prev_remaining = iter.num_remaining();
            while iter.num_remaining() != 0 {
                // The batched call runs on a clone; `iter` itself is advanced by the
                // reference implementation, which also swallows the bit that ends the run.
                let mut probe = iter.clone();
                let num_ones = probe.take_leading_ones();
                // The batched call must consume exactly the bits it reports.
                assert_eq!(probe.num_remaining(), iter.num_remaining() - num_ones);
                assert_eq!(num_ones, (&mut iter).take_while(|&b| b).count());

                let mut probe = iter.clone();
                let num_zeros = probe.take_leading_zeros();
                assert_eq!(probe.num_remaining(), iter.num_remaining() - num_zeros);
                assert_eq!(num_zeros, (&mut iter).take_while(|&b| !b).count());

                // Ensure that we are making progress
                assert!(iter.num_remaining() < prev_remaining);
                prev_remaining = iter.num_remaining();
            }

            // An exhausted iterator has no leading run of either kind.
            assert_eq!(iter.take_leading_zeros(), 0);
            assert_eq!(iter.take_leading_ones(), 0);
        }
    }
}
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/legacy/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub trait CustomIterTools: Iterator {
where
Self: Sized,
{
TrustMyLength::new(self, length)
unsafe { TrustMyLength::new(self, length) }
}

fn collect_trusted<T: FromTrustedLenIterator<Self::Item>>(self) -> T
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-arrow/src/pushable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub trait Pushable<T>: Sized + Default {
fn push(&mut self, value: T);
fn len(&self) -> usize;
fn push_null(&mut self);
/// Pushes at most `n` items drawn from `iter`, issuing one `push` call per item.
#[inline]
fn extend_n(&mut self, n: usize, iter: impl Iterator<Item = T>) {
    iter.take(n).for_each(|item| self.push(item));
}
fn extend_constant(&mut self, additional: usize, value: T);
fn extend_null_constant(&mut self, additional: usize);
fn freeze(self) -> Self::Freeze;
Expand All @@ -31,6 +37,7 @@ impl Pushable<bool> for MutableBitmap {
fn reserve(&mut self, additional: usize) {
MutableBitmap::reserve(self, additional)
}

#[inline]
fn len(&self) -> usize {
self.len()
Expand Down Expand Up @@ -82,6 +89,11 @@ impl<T: Copy + Default> Pushable<T> for Vec<T> {
self.push(value)
}

/// Appends at most `n` items from `iter` in a single `extend` call.
#[inline]
fn extend_n(&mut self, n: usize, iter: impl Iterator<Item = T>) {
    let bounded = iter.take(n);
    self.extend(bounded);
}

#[inline]
fn extend_constant(&mut self, additional: usize, value: T) {
self.resize(self.len() + additional, value);
Expand Down
8 changes: 7 additions & 1 deletion crates/polars-arrow/src/trusted_len.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ impl<I, J> TrustMyLength<I, J>
where
I: Iterator<Item = J>,
{
/// Create a new `TrustMyLength` iterator
///
/// # Safety
///
/// This is safe if the iterator always has the exact length given by `len`.
#[inline]
pub fn new(iter: I, len: usize) -> Self {
pub unsafe fn new(iter: I, len: usize) -> Self {
Self { iter, len }
}
}
Expand All @@ -104,6 +109,7 @@ where
self.iter.next()
}

/// Reports the stored `len` as both the lower and the upper bound.
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
    let len = self.len;
    (len, Some(len))
}
Expand Down
14 changes: 7 additions & 7 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values,
),
)?,
BinaryState::Required(page) => {
for x in page.values.by_ref().take(additional) {
values.push(x)
Expand All @@ -92,7 +92,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
offsets,
page_values.lengths.by_ref(),
);
)?;

let length = *offsets.last() - last_offset;

Expand Down Expand Up @@ -123,7 +123,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
.values
.by_ref()
.map(|index| page_dict.value(index as usize)),
);
)?;
page_values.values.get_result()?;
},
BinaryState::RequiredDictionary(page) => {
Expand All @@ -148,7 +148,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values.by_ref(),
);
)?;
},
BinaryState::FilteredOptionalDelta(page_validity, page_values) => {
extend_from_decoder(
Expand All @@ -157,7 +157,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values.by_ref(),
);
)?;
},
BinaryState::FilteredRequiredDictionary(page) => {
// Already done on the dict.
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
.values
.by_ref()
.map(|index| page_dict.value(index as usize)),
);
)?;
page_values.values.get_result()?;
},
BinaryState::OptionalDeltaByteArray(page_validity, page_values) => extend_from_decoder(
Expand All @@ -195,7 +195,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
Some(additional),
values,
page_values,
),
)?,
BinaryState::DeltaByteArray(page_values) => {
for x in page_values.take(additional) {
values.push(x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ impl<'a> FilteredDelta<'a> {

#[derive(Debug)]
pub(crate) struct RequiredDictionary<'a> {
pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>,
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub dict: &'a BinaryDict,
}

impl<'a> RequiredDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult<Self> {
let values = utils::dict_indices_decoder(page)?.into_iter();
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
}
Expand All @@ -198,13 +198,13 @@ impl<'a> RequiredDictionary<'a> {

#[derive(Debug)]
pub(crate) struct FilteredRequiredDictionary<'a> {
pub values: SliceFilteredIter<hybrid_rle::BufferedHybridRleDecoderIter<'a>>,
pub values: SliceFilteredIter<hybrid_rle::HybridRleDecoder<'a>>,
pub dict: &'a BinaryDict,
}

impl<'a> FilteredRequiredDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult<Self> {
let values = utils::dict_indices_decoder(page)?.into_iter();
let values = utils::dict_indices_decoder(page)?;

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);
Expand All @@ -220,13 +220,13 @@ impl<'a> FilteredRequiredDictionary<'a> {

#[derive(Debug)]
pub(crate) struct ValuesDictionary<'a> {
pub values: hybrid_rle::BufferedHybridRleDecoderIter<'a>,
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub dict: &'a BinaryDict,
}

impl<'a> ValuesDictionary<'a> {
pub fn try_new(page: &'a DataPage, dict: &'a BinaryDict) -> PolarsResult<Self> {
let values = utils::dict_indices_decoder(page)?.into_iter();
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
}
Expand Down
Loading

0 comments on commit 909f08f

Please sign in to comment.