Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
compute length to reserve up front
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 21, 2022
1 parent 59f6495 commit a40f689
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 7 deletions.
1 change: 1 addition & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl<O: Offset> Binary<O> {
}

impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
#[inline]
fn reserve(&mut self, additional: usize) {
let avg_len = self.values.len() / std::cmp::max(self.last_offset.to_usize(), 1);
self.values.reserve(additional * avg_len);
Expand Down
1 change: 1 addition & 0 deletions src/io/parquet/read/deserialize/fixed_size_binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl FixedSizeBinary {
}

impl<'a> Pushable<&'a [u8]> for FixedSizeBinary {
#[inline]
fn reserve(&mut self, additional: usize) {
self.values.reserve(additional * self.size);
}
Expand Down
34 changes: 27 additions & 7 deletions src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub(super) trait Pushable<T>: Sized {
}

impl Pushable<bool> for MutableBitmap {
#[inline]
fn reserve(&mut self, additional: usize) {
MutableBitmap::reserve(self, additional)
}
Expand All @@ -63,6 +64,7 @@ impl Pushable<bool> for MutableBitmap {
}

impl<A: Copy + Default> Pushable<A> for Vec<A> {
#[inline]
fn reserve(&mut self, additional: usize) {
Vec::reserve(self, additional)
}
Expand Down Expand Up @@ -296,11 +298,33 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
) {
let limit = limit.unwrap_or(usize::MAX);

let mut runs = vec![];
let mut remaining = limit;
let mut reserve_pushable = 0;

// first do a scan so that we know how much to reserve up front
while remaining > 0 {
let run = page_validity.next_limited(remaining);
let run = if let Some(run) = run { run } else { break };

match run {
FilteredHybridEncoded::Bitmap { length, .. } => {
reserve_pushable += length;
remaining -= length;
}
FilteredHybridEncoded::Repeated { length, .. } => {
reserve_pushable += length;
remaining -= length;
}
_ => {}
};
runs.push(run)
}
pushable.reserve(reserve_pushable);
validity.reserve(reserve_pushable);

// then a second loop to really fill the buffers
for run in runs {
match run {
FilteredHybridEncoded::Bitmap {
values,
Expand All @@ -310,7 +334,6 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
// consume `length` items
let iter = BitmapIter::new(values, offset, length);
let iter = Zip::new(iter, &mut values_iter);
pushable.reserve(length);

for item in iter {
if let Some(item) = item {
Expand All @@ -320,19 +343,16 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
}
}
validity.extend_from_slice(values, offset, length);

remaining -= length;
}
FilteredHybridEncoded::Repeated { is_set, length } => {
validity.extend_constant(length, is_set);
if is_set {
pushable.reserve(length);
(0..length).for_each(|_| pushable.push(values_iter.next().unwrap()));
for v in (&mut values_iter).take(length) {
pushable.push(v)
}
} else {
pushable.extend_constant(length, T::default());
}

remaining -= length;
}
FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {},
};
Expand Down

0 comments on commit a40f689

Please sign in to comment.