refactor: Remove HybridRLE iter / batch nested parquet decoding (#17889)
coastalwhite authored Jul 26, 2024
1 parent d6e5e87 commit cb108bd
Showing 25 changed files with 423 additions and 1,238 deletions.
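This commit moves the parquet readers from pulling dictionary indices one at a time out of `HybridRleDecoder` iterators to pushing whole decoded runs into a target through gatherer objects (`HybridRleGatherer`, `GatheredHybridRle`), as the binary decoder diff below shows. The following is a minimal, self-contained sketch of that shift, not the polars-parquet API: `Run`, `ToyGatherer`, and `PushToVec` are hypothetical names used only to illustrate why a gatherer can consume an RLE run in one call where an iterator pays per element.

```rust
// Illustrative sketch only — not the polars-parquet API.

/// A decoded hybrid-RLE run: either `n` repeats of one dictionary index,
/// or a short bit-packed group of distinct indices.
enum Run {
    Rle { value: u32, n: usize },
    BitPacked(Vec<u32>),
}

/// Gatherer-style sink: the decoder pushes whole runs into the target,
/// so an RLE run of length 10_000 costs one call instead of 10_000 `next()`s.
trait ToyGatherer {
    type Target;
    fn gather_one(&self, target: &mut Self::Target, value: u32);
    fn gather_repeated(&self, target: &mut Self::Target, value: u32, n: usize);
}

struct PushToVec;

impl ToyGatherer for PushToVec {
    type Target = Vec<u32>;
    fn gather_one(&self, target: &mut Vec<u32>, value: u32) {
        target.push(value);
    }
    fn gather_repeated(&self, target: &mut Vec<u32>, value: u32, n: usize) {
        // One resize materializes the whole run.
        target.resize(target.len() + n, value);
    }
}

fn gather_all<G: ToyGatherer>(runs: &[Run], gatherer: &G, target: &mut G::Target) {
    for run in runs {
        match run {
            Run::Rle { value, n } => gatherer.gather_repeated(target, *value, *n),
            Run::BitPacked(values) => {
                for &v in values {
                    gatherer.gather_one(target, v);
                }
            },
        }
    }
}

fn main() {
    let runs = [Run::Rle { value: 3, n: 4 }, Run::BitPacked(vec![0, 1, 2])];
    let mut out = Vec::new();
    gather_all(&runs, &PushToVec, &mut out);
    assert_eq!(out, [3u32, 3, 3, 3, 0, 1, 2]);
}
```

With the iterator style, a repeated run is paid for element by element; with the gatherer style it is a single `gather_repeated` call, which is the kind of batching this change enables.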
2 changes: 0 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/mod.rs
@@ -7,8 +7,6 @@ mod ndjson;
#[cfg(feature = "parquet")]
mod parquet;

#[cfg(feature = "ipc")]
mod support;
use std::mem;

#[cfg(feature = "csv")]
55 changes: 0 additions & 55 deletions crates/polars-mem-engine/src/executors/scan/support.rs

This file was deleted.

77 changes: 58 additions & 19 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs
@@ -7,16 +7,16 @@ use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::{ArrowDataType, PhysicalType};
use arrow::offset::Offset;
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::utils;
use super::super::utils::extend_from_decoder;
use super::decoders::*;
use super::utils::*;
use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::utils::{Decoder, StateTranslation};
use crate::read::deserialize::utils::{Decoder, GatheredHybridRle, StateTranslation};
use crate::read::PrimitiveLogicalType;

impl<O: Offset> utils::ExactSize for (Binary<O>, MutableBitmap) {
@@ -203,27 +203,66 @@ impl<O: Offset> utils::Decoder for BinaryDecoder<O> {
dict: &Self::Dict,
limit: usize,
) -> ParquetResult<()> {
struct BinaryGatherer<'a, O> {
dict: &'a BinaryDict,
_pd: std::marker::PhantomData<O>,
}

impl<'a, O: Offset> HybridRleGatherer<&'a [u8]> for BinaryGatherer<'a, O> {
type Target = Binary<O>;

fn target_reserve(&self, target: &mut Self::Target, n: usize) {
// @NOTE: This is an estimation for the reservation. It will probably not be
// accurate, but then it is a lot better than not allocating.
target.offsets.reserve(n);
target.values.reserve(n);
}

fn target_num_elements(&self, target: &Self::Target) -> usize {
target.offsets.len_proxy()
}

fn hybridrle_to_target(&self, value: u32) -> ParquetResult<&'a [u8]> {
let value = value as usize;

if value >= self.dict.len() {
return Err(ParquetError::oos("Binary dictionary index out-of-range"));
}

Ok(self.dict.value(value))
}

fn gather_one(&self, target: &mut Self::Target, value: &'a [u8]) -> ParquetResult<()> {
target.push(value);
Ok(())
}

fn gather_repeated(
&self,
target: &mut Self::Target,
value: &'a [u8],
n: usize,
) -> ParquetResult<()> {
for _ in 0..n {
target.push(value);
}
Ok(())
}
}

let gatherer = BinaryGatherer {
dict,
_pd: std::marker::PhantomData,
};

match page_validity {
None => {
// @TODO: Make this into a gatherer
for x in page_values
.by_ref()
.map(|index| dict.value(index as usize))
.take(limit)
{
values.push(x)
}
page_values.get_result()?;
page_values.gather_n_into(values, limit, &gatherer)?;
},
Some(page_validity) => {
extend_from_decoder(
validity,
page_validity,
Some(limit),
values,
&mut page_values.by_ref().map(|index| dict.value(index as usize)),
)?;
page_values.get_result()?;
let collector = GatheredHybridRle::new(page_values, &gatherer, &[]);

extend_from_decoder(validity, page_validity, Some(limit), values, collector)?;
},
}

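In the hunk above, `BinaryGatherer` translates dictionary indices into byte slices (`hybridrle_to_target`), appends them to the `Binary<O>` target (`gather_one` / `gather_repeated`), and reports progress through the offsets (`target_num_elements`); the validity path wraps the same gatherer in a `GatheredHybridRle` collector for `extend_from_decoder`. Below is an illustrative, stand-alone sketch of those roles using a simplified offsets-plus-values target: `SimpleBinary` and `push_index` are made-up names, not types from the crate.

```rust
// Illustrative only — a stand-in for the crate's `Binary<O>` target.

struct SimpleBinary {
    /// Arrow-style offsets: element i spans values[offsets[i]..offsets[i + 1]].
    offsets: Vec<i64>,
    values: Vec<u8>,
}

impl SimpleBinary {
    fn new() -> Self {
        Self { offsets: vec![0], values: Vec::new() }
    }

    /// The number of finished elements is derived from the offsets, mirroring
    /// how `target_num_elements` reports `offsets.len_proxy()` in the diff.
    fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    fn push(&mut self, value: &[u8]) {
        self.values.extend_from_slice(value);
        self.offsets.push(self.values.len() as i64);
    }
}

/// Dictionary lookup with the same out-of-range check as `hybridrle_to_target`.
fn push_index(target: &mut SimpleBinary, dict: &[&[u8]], index: u32) -> Result<(), String> {
    let index = index as usize;
    if index >= dict.len() {
        return Err("Binary dictionary index out-of-range".to_string());
    }
    target.push(dict[index]);
    Ok(())
}

fn main() -> Result<(), String> {
    let dict: [&[u8]; 2] = [&b"foo"[..], &b"quux"[..]];
    let mut target = SimpleBinary::new();
    for &i in &[1u32, 0, 1] {
        push_index(&mut target, &dict, i)?;
    }
    assert_eq!(target.len(), 3);
    assert_eq!(target.values, b"quuxfooquux".to_vec());
    assert!(push_index(&mut target, &dict, 7).is_err());
    Ok(())
}
```

Both buffers grow together, which is why the diff's `target_reserve` reserves the offsets and the values, even if only as an estimate.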
9 changes: 7 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
@@ -35,14 +35,19 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> {

match page.encoding() {
Encoding::Plain => {
let max_num_values = values.len() * u8::BITS as usize;
let num_values = if page_validity.is_some() {
// @NOTE: We overestimate the amount of values here, but in the V1
// specification we don't really have a way to know the number of valid items.
// Without traversing the list.
values.len() * u8::BITS as usize
max_num_values
} else {
page.num_values()
// @NOTE: We cannot really trust the value from this as it might relate to the
// number of top-level nested values. Therefore, we do a `min` with the maximum
// number of possible values.
usize::min(page.num_values(), max_num_values)
};

Ok(Self::Plain(BitmapIter::new(values, 0, num_values)))
},
Encoding::Rle => {
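The boolean hunk above clamps the value count for PLAIN-encoded pages: the bit-packed buffer can hold at most `values.len() * 8` booleans, and `page.num_values()` may refer to the number of top-level nested values, so it is not trusted beyond that capacity. Here is a small sketch of the same decision with made-up numbers; the function name `plain_boolean_num_values` is hypothetical.

```rust
// Sketch of the clamping logic from the boolean hunk; not the crate's code.

fn plain_boolean_num_values(
    buffer_len_bytes: usize,
    page_num_values: usize,
    has_validity: bool,
) -> usize {
    let max_num_values = buffer_len_bytes * u8::BITS as usize;
    if has_validity {
        // V1 pages: the number of *valid* items is unknown without walking the
        // levels, so overestimate with the buffer capacity.
        max_num_values
    } else {
        // The page-level count may describe top-level nested values, so clamp
        // it to what the buffer can actually contain.
        usize::min(page_num_values, max_num_values)
    }
}

fn main() {
    // 3 bytes of bit-packed booleans can hold at most 24 values.
    assert_eq!(plain_boolean_num_values(3, 100, false), 24);
    assert_eq!(plain_boolean_num_values(3, 20, false), 20);
    assert_eq!(plain_boolean_num_values(3, 20, true), 24);
}
```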
(Diffs for the remaining changed files are not shown.)
