diff --git a/crates/polars-mem-engine/src/executors/scan/mod.rs b/crates/polars-mem-engine/src/executors/scan/mod.rs index 1f50268db23f..93d86da8cb57 100644 --- a/crates/polars-mem-engine/src/executors/scan/mod.rs +++ b/crates/polars-mem-engine/src/executors/scan/mod.rs @@ -7,8 +7,6 @@ mod ndjson; #[cfg(feature = "parquet")] mod parquet; -#[cfg(feature = "ipc")] -mod support; use std::mem; #[cfg(feature = "csv")] diff --git a/crates/polars-mem-engine/src/executors/scan/support.rs b/crates/polars-mem-engine/src/executors/scan/support.rs deleted file mode 100644 index 97342e02a91b..000000000000 --- a/crates/polars-mem-engine/src/executors/scan/support.rs +++ /dev/null @@ -1,55 +0,0 @@ -use polars_utils::IdxSize; - -// Tracks the sum of consecutive values in a dynamically sized array where the values can be written -// in any order. -pub struct ConsecutiveCountState { - counts: Box<[IdxSize]>, - next_index: usize, - sum: IdxSize, -} - -impl ConsecutiveCountState { - pub fn new(len: usize) -> Self { - Self { - counts: vec![IdxSize::MAX; len].into_boxed_slice(), - next_index: 0, - sum: 0, - } - } - - /// Sum of all consecutive counts. - pub fn sum(&self) -> IdxSize { - self.sum - } - - /// Write count at index. - pub fn write(&mut self, index: usize, count: IdxSize) { - debug_assert!( - self.counts[index] == IdxSize::MAX, - "second write to same index" - ); - debug_assert!(count != IdxSize::MAX, "count can not be IdxSize::MAX"); - - self.counts[index] = count; - - // Update sum and next index. - while self.next_index < self.counts.len() { - let count = self.counts[self.next_index]; - if count == IdxSize::MAX { - break; - } - self.sum += count; - self.next_index += 1; - } - } - - pub fn len(&self) -> usize { - self.counts.len() - } - - pub fn counts(&self) -> impl Iterator> + '_ { - self.counts - .iter() - .map(|&count| (count != IdxSize::MAX).then_some(count)) - } -} diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs index 4cdd09b6e9e6..abdd372088cc 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs @@ -7,16 +7,16 @@ use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::{ArrowDataType, PhysicalType}; use arrow::offset::Offset; use polars_error::PolarsResult; -use polars_utils::iter::FallibleIterator; use super::super::utils; use super::super::utils::extend_from_decoder; use super::decoders::*; use super::utils::*; +use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer; use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{DataPage, DictPage}; -use crate::read::deserialize::utils::{Decoder, StateTranslation}; +use crate::read::deserialize::utils::{Decoder, GatheredHybridRle, StateTranslation}; use crate::read::PrimitiveLogicalType; impl utils::ExactSize for (Binary, MutableBitmap) { @@ -203,27 +203,66 @@ impl utils::Decoder for BinaryDecoder { dict: &Self::Dict, limit: usize, ) -> ParquetResult<()> { + struct BinaryGatherer<'a, O> { + dict: &'a BinaryDict, + _pd: std::marker::PhantomData, + } + + impl<'a, O: Offset> HybridRleGatherer<&'a [u8]> for BinaryGatherer<'a, O> { + type Target = Binary; + + fn target_reserve(&self, target: &mut Self::Target, n: usize) { + // @NOTE: This is an estimation for the reservation. It will probably not be + // accurate, but then it is a lot better than not allocating. + target.offsets.reserve(n); + target.values.reserve(n); + } + + fn target_num_elements(&self, target: &Self::Target) -> usize { + target.offsets.len_proxy() + } + + fn hybridrle_to_target(&self, value: u32) -> ParquetResult<&'a [u8]> { + let value = value as usize; + + if value >= self.dict.len() { + return Err(ParquetError::oos("Binary dictionary index out-of-range")); + } + + Ok(self.dict.value(value)) + } + + fn gather_one(&self, target: &mut Self::Target, value: &'a [u8]) -> ParquetResult<()> { + target.push(value); + Ok(()) + } + + fn gather_repeated( + &self, + target: &mut Self::Target, + value: &'a [u8], + n: usize, + ) -> ParquetResult<()> { + for _ in 0..n { + target.push(value); + } + Ok(()) + } + } + + let gatherer = BinaryGatherer { + dict, + _pd: std::marker::PhantomData, + }; + match page_validity { None => { - // @TODO: Make this into a gatherer - for x in page_values - .by_ref() - .map(|index| dict.value(index as usize)) - .take(limit) - { - values.push(x) - } - page_values.get_result()?; + page_values.gather_n_into(values, limit, &gatherer)?; }, Some(page_validity) => { - extend_from_decoder( - validity, - page_validity, - Some(limit), - values, - &mut page_values.by_ref().map(|index| dict.value(index as usize)), - )?; - page_values.get_result()?; + let collector = GatheredHybridRle::new(page_values, &gatherer, &[]); + + extend_from_decoder(validity, page_validity, Some(limit), values, collector)?; }, } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs index 4f201bc53d5d..0f5fe49be768 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs @@ -35,14 +35,19 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> { match page.encoding() { Encoding::Plain => { + let max_num_values = values.len() * u8::BITS as usize; let num_values = if page_validity.is_some() { // @NOTE: We overestimate the amount of values here, but in the V1 // specification we don't really have a way to know the number of valid items. // Without traversing the list. - values.len() * u8::BITS as usize + max_num_values } else { - page.num_values() + // @NOTE: We cannot really trust the value from this as it might relate to the + // number of top-level nested values. Therefore, we do a `min` with the maximum + // number of possible values. + usize::min(page.num_values(), max_num_values) }; + Ok(Self::Plain(BitmapIter::new(values, 0, num_values))) }, Encoding::Rle => { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index b727dcbc5378..515eb916b3d7 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -1,12 +1,11 @@ -use std::iter::{Peekable, Zip}; - use arrow::array::{Array, DictionaryArray, DictionaryKey}; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; -use polars_error::{polars_bail, PolarsResult}; +use polars_error::PolarsResult; use super::utils::{self, BatchableCollector}; use super::{BasicDecompressor, CompressedPagesIter, ParquetError}; +use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer; use crate::parquet::encoding::hybrid_rle::HybridRleDecoder; use crate::parquet::error::ParquetResult; use crate::parquet::page::{split_buffer, DataPage, Page}; @@ -240,33 +239,6 @@ pub fn init_nested(init: &[InitNested], capacity: usize) -> NestedState { NestedState::new(container) } -pub struct NestedPage<'a> { - iter: Peekable, HybridRleDecoder<'a>>>, -} - -impl<'a> NestedPage<'a> { - pub fn try_new(page: &'a DataPage) -> PolarsResult { - let split = split_buffer(page)?; - let rep_levels = split.rep; - let def_levels = split.def; - - let max_rep_level = page.descriptor.max_rep_level; - let max_def_level = page.descriptor.max_def_level; - - let reps = - HybridRleDecoder::new(rep_levels, get_bit_width(max_rep_level), page.num_values()); - let defs = - HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()); - - let reps = reps.into_iter(); - let defs = defs.into_iter(); - - let iter = reps.zip(defs).peekable(); - - Ok(Self { iter }) - } -} - /// The state of nested data types. #[derive(Debug, Default)] pub struct NestedState { @@ -289,11 +261,35 @@ impl NestedState { // outermost is the number of rows self.nested[0].len() } + + /// Returns the definition and repetition levels for each nesting level + fn levels(&self) -> (Vec, Vec) { + let depth = self.nested.len(); + + let mut def_levels = Vec::with_capacity(depth + 1); + let mut rep_levels = Vec::with_capacity(depth + 1); + + def_levels.push(0); + rep_levels.push(0); + + for i in 0..depth { + let nest = &self.nested[i]; + + let def_delta = nest.is_nullable() as u16 + nest.is_repeated() as u16; + let rep_delta = nest.is_repeated() as u16; + + def_levels.push(def_levels[i] + def_delta); + rep_levels.push(rep_levels[i] + rep_delta); + } + + (def_levels, rep_levels) + } } #[allow(clippy::too_many_arguments)] fn extend_offsets2<'a, D: utils::NestedDecoder>( - page: &mut NestedPage<'a>, + def_iter: HybridRleDecoder<'a>, + rep_iter: HybridRleDecoder<'a>, batched_collector: &mut BatchedCollector< '_, (), @@ -303,113 +299,251 @@ fn extend_offsets2<'a, D: utils::NestedDecoder>( nested: &mut [Nested], additional: usize, // Amortized allocations - def_levels: &[u32], - rep_levels: &[u32], + def_levels: &[u16], + rep_levels: &[u16], ) -> PolarsResult { - let max_depth = nested.len(); + struct RowIdxOffsetGatherer; + struct RowIdxOffsetState { + num_elements_seen: usize, + top_level_limit: usize, + found: Option, + } + + impl HybridRleGatherer for RowIdxOffsetGatherer { + type Target = RowIdxOffsetState; - let mut rows = 0; - loop { - // SAFETY: page.iter is always non-empty on first loop. - // The current function gets called multiple times with iterators that - // yield batches of pages. This means e.g. it could be that the very - // first page is a new row, and the existing nested state has already - // contains all data from the additional rows. - if page.iter.peek().unwrap().0 == 0 { - if rows == additional { - return Ok(true); + fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {} + + fn target_num_elements(&self, target: &Self::Target) -> usize { + target.num_elements_seen + } + + fn hybridrle_to_target(&self, value: u32) -> ParquetResult { + Ok(value == 0) + } + + fn gather_one(&self, target: &mut Self::Target, value: bool) -> ParquetResult<()> { + let idx = target.num_elements_seen; + target.num_elements_seen += 1; + + if !value || target.found.is_some() { + return Ok(()); + } + + if target.top_level_limit > 0 { + target.top_level_limit -= 1; + return Ok(()); + } + + target.found = Some(idx); + + Ok(()) + } + + fn gather_repeated( + &self, + target: &mut Self::Target, + value: bool, + n: usize, + ) -> ParquetResult<()> { + let idx = target.num_elements_seen; + target.num_elements_seen += n; + + if !value || target.found.is_some() { + return Ok(()); + } + + if target.top_level_limit >= n { + target.top_level_limit -= n; + return Ok(()); } - rows += 1; + + target.found = Some(idx + target.top_level_limit); + target.top_level_limit = 0; + + Ok(()) } - // The errors of the FallibleIterators use in this zipped not checked yet. - // If one of them errors, the iterator returns None, and this `unwrap` will panic. - let Some((rep, def)) = page.iter.next() else { - polars_bail!(ComputeError: "cannot read rep/def levels") + // @TODO: Add specialization for other methods + } + + #[derive(Default)] + struct LevelGatherer<'a>(std::marker::PhantomData<&'a ()>); + struct LevelGathererState<'a> { + offset: usize, + slice: &'a mut [u16], + } + + impl<'a> HybridRleGatherer for LevelGatherer<'a> { + type Target = LevelGathererState<'a>; + + fn target_reserve(&self, _target: &mut Self::Target, _n: usize) {} + + fn target_num_elements(&self, target: &Self::Target) -> usize { + target.offset + } + + fn hybridrle_to_target(&self, value: u32) -> ParquetResult { + debug_assert!(value <= u16::MAX as u32); + Ok(value as u16) + } + + fn gather_one(&self, target: &mut Self::Target, value: u16) -> ParquetResult<()> { + debug_assert!(target.offset < target.slice.len()); + + target.slice[target.offset] = value; + target.offset += 1; + + Ok(()) + } + + fn gather_repeated( + &self, + target: &mut Self::Target, + value: u16, + n: usize, + ) -> ParquetResult<()> { + debug_assert!(target.offset + n <= target.slice.len()); + + for i in 0..n { + target.slice[target.offset + i] = value; + } + target.offset += n; + + Ok(()) + } + + // @TODO: Add specialization for other methods + } + + const ROW_IDX_BATCH_SIZE: usize = 1024; + let mut state = RowIdxOffsetState { + num_elements_seen: 0, + top_level_limit: additional, + found: None, + }; + + let mut row_idx_iter = rep_iter.clone(); + while row_idx_iter.len() > 0 && state.found.is_none() { + row_idx_iter.gather_n_into(&mut state, ROW_IDX_BATCH_SIZE, &RowIdxOffsetGatherer)?; + } + + debug_assert_eq!(def_iter.len(), rep_iter.len()); + + let mut def_iter = def_iter; + let mut rep_iter = rep_iter; + let mut def_values = [0u16; DECODE_BATCH_SIZE]; + let mut rep_values = [0u16; DECODE_BATCH_SIZE]; + + let max_depth = nested.len(); + + const DECODE_BATCH_SIZE: usize = 1024; + let mut limit = state.found.unwrap_or(def_iter.len()); + while def_iter.len() > 0 && limit > 0 { + let additional = usize::min(limit, DECODE_BATCH_SIZE); + + let mut def_state = LevelGathererState { + offset: 0, + slice: &mut def_values, + }; + let mut rep_state = LevelGathererState { + offset: 0, + slice: &mut rep_values, }; - let mut is_required = false; + def_iter.gather_n_into(&mut def_state, additional, &LevelGatherer::default())?; + rep_iter.gather_n_into(&mut rep_state, additional, &LevelGatherer::default())?; + + debug_assert_eq!(def_state.offset, rep_state.offset); + debug_assert_eq!(def_state.offset, additional); + + for i in 0..additional { + let def = def_values[i]; + let rep = rep_values[i]; - for depth in 0..max_depth { - // Defines whether this element is defined at `depth` - // - // e.g. [ [ [ 1 ] ] ] is defined at [ ... ], [ [ ... ] ], [ [ [ ... ] ] ] and - // [ [ [ 1 ] ] ]. - let is_defined_at_this_depth = rep <= rep_levels[depth] && def >= def_levels[depth]; + let mut is_required = false; - let length = nested - .get(depth + 1) - .map(|x| x.len() as i64) - // the last depth is the leaf, which is always increased by 1 - .unwrap_or(1); + for depth in 0..max_depth { + // Defines whether this element is defined at `depth` + // + // e.g. [ [ [ 1 ] ] ] is defined at [ ... ], [ [ ... ] ], [ [ [ ... ] ] ] and + // [ [ [ 1 ] ] ]. + let is_defined_at_this_depth = rep <= rep_levels[depth] && def >= def_levels[depth]; - let nest = &mut nested[depth]; + let length = nested + .get(depth + 1) + .map(|x| x.len() as i64) + // the last depth is the leaf, which is always increased by 1 + .unwrap_or(1); - let is_valid = !nest.is_nullable() || def > def_levels[depth]; + let nest = &mut nested[depth]; - if is_defined_at_this_depth && !is_valid { - let mut num_elements = 1; + let is_valid = !nest.is_nullable() || def > def_levels[depth]; - nest.push(length, is_valid); + if is_defined_at_this_depth && !is_valid { + let mut num_elements = 1; - for embed_depth in depth..max_depth { - let embed_length = nested - .get(embed_depth + 1) - .map(|x| x.len() as i64) - // the last depth is the leaf, which is always increased by 1 - .unwrap_or(1); + nest.push(length, is_valid); - let embed_nest = &mut nested[embed_depth]; + for embed_depth in depth..max_depth { + let embed_length = nested + .get(embed_depth + 1) + .map(|x| x.len() as i64) + // the last depth is the leaf, which is always increased by 1 + .unwrap_or(1); - if embed_depth > depth { - for _ in 0..num_elements { - embed_nest.push_default(embed_length); + let embed_nest = &mut nested[embed_depth]; + + if embed_depth > depth { + for _ in 0..num_elements { + embed_nest.push_default(embed_length); + } } - } - if embed_depth == max_depth - 1 { - for _ in 0..num_elements { - batched_collector.push_invalid(); + if embed_depth == max_depth - 1 { + for _ in 0..num_elements { + batched_collector.push_invalid(); + } + + break; } - break; - } + let embed_num_values = embed_nest.invalid_num_values(); - let embed_num_values = embed_nest.invalid_num_values(); + if embed_num_values == 0 { + break; + } - if embed_num_values == 0 { - break; + num_elements *= embed_num_values; } - num_elements *= embed_num_values; + break; } - break; - } + if is_required || is_defined_at_this_depth { + nest.push(length, is_valid); - if is_required || is_defined_at_this_depth { - nest.push(length, is_valid); + if depth == max_depth - 1 { + // the leaf / primitive + let is_valid = (def != def_levels[depth]) || !nest.is_nullable(); - if depth == max_depth - 1 { - // the leaf / primitive - let is_valid = (def != def_levels[depth]) || !nest.is_nullable(); - - if is_valid { - batched_collector.push_valid()?; - } else { - batched_collector.push_invalid(); + if is_valid { + batched_collector.push_valid()?; + } else { + batched_collector.push_invalid(); + } } } - } - is_required = - (is_required || is_defined_at_this_depth) && nest.is_required() && !is_valid; + is_required = + (is_required || is_defined_at_this_depth) && nest.is_required() && !is_valid; + } } - if page.iter.len() == 0 { - return Ok(false); - } + limit -= additional; } + + Ok(def_iter.len() != 0) } pub struct PageNestedDecoder { @@ -433,6 +567,21 @@ pub struct PageNestedDictArrayDecoder< _pd: std::marker::PhantomData, } +/// Return the definition and repetition level iterators for this page. +fn level_iters(page: &DataPage) -> ParquetResult<(HybridRleDecoder, HybridRleDecoder)> { + let split = split_buffer(page)?; + let def = split.def; + let rep = split.rep; + + let max_def_level = page.descriptor.max_def_level; + let max_rep_level = page.descriptor.max_rep_level; + + let def_iter = HybridRleDecoder::new(def, get_bit_width(max_def_level), page.num_values()); + let rep_iter = HybridRleDecoder::new(rep, get_bit_width(max_rep_level), page.num_values()); + + Ok((def_iter, rep_iter)) +} + impl PageNestedDecoder { pub fn new( mut iter: BasicDecompressor, @@ -466,23 +615,7 @@ impl PageNestedDecoder { let mut limit = limit; // Amortize the allocations. - let depth = nested_state.nested.len(); - - let mut def_levels = Vec::with_capacity(depth + 1); - let mut rep_levels = Vec::with_capacity(depth + 1); - - def_levels.push(0); - rep_levels.push(0); - - for i in 0..depth { - let nest = &nested_state.nested[i]; - - let def_delta = nest.is_nullable() as u32 + nest.is_repeated() as u32; - let rep_delta = nest.is_repeated() as u32; - - def_levels.push(def_levels[i] + def_delta); - rep_levels.push(rep_levels[i] + rep_delta); - } + let (def_levels, rep_levels) = nested_state.levels(); loop { let Some(page) = self.iter.next()? else { @@ -494,9 +627,9 @@ impl PageNestedDecoder { unreachable!(); }; - let mut values_page = utils::State::new(&self.decoder, page, self.dict.as_ref())?; - values_page.page_validity = None; - let mut page = NestedPage::try_new(page)?; + let mut values_page = + utils::State::new_nested(&self.decoder, page, self.dict.as_ref())?; + let (def_iter, rep_iter) = level_iters(page)?; let start_length = nested_state.len(); @@ -510,7 +643,8 @@ impl PageNestedDecoder { ); let is_fully_read = extend_offsets2( - &mut page, + def_iter, + rep_iter, &mut batched_collector, &mut nested_state.nested, limit, @@ -521,6 +655,7 @@ impl PageNestedDecoder { batched_collector.finalize()?; let num_done = nested_state.len() - start_length; + debug_assert!(num_done <= limit); limit -= num_done; debug_assert!(values_page.len() == 0 || limit == 0); @@ -596,23 +731,7 @@ impl let mut limit = limit; // Amortize the allocations. - let depth = nested_state.nested.len(); - - let mut def_levels = Vec::with_capacity(depth + 1); - let mut rep_levels = Vec::with_capacity(depth + 1); - - def_levels.push(0); - rep_levels.push(0); - - for i in 0..depth { - let nest = &nested_state.nested[i]; - - let def_delta = nest.is_nullable() as u32 + nest.is_repeated() as u32; - let rep_delta = nest.is_repeated() as u32; - - def_levels.push(def_levels[i] + def_delta); - rep_levels.push(rep_levels[i] + rep_delta); - } + let (def_levels, rep_levels) = nested_state.levels(); loop { let Some(page) = self.iter.next()? else { @@ -626,9 +745,8 @@ impl use utils::ExactSize; let mut dictionary_decoder = DictionaryDecoder::new(self.dict.len()); - let mut values_page = utils::State::new(&dictionary_decoder, page, Some(&()))?; - values_page.page_validity = None; - let mut page = NestedPage::try_new(page)?; + let mut values_page = utils::State::new_nested(&dictionary_decoder, page, Some(&()))?; + let (def_iter, rep_iter) = level_iters(page)?; let start_length = nested_state.len(); @@ -642,7 +760,8 @@ impl ); let is_fully_read = extend_offsets2( - &mut page, + def_iter, + rep_iter, &mut batched_collector, &mut nested_state.nested, limit, @@ -653,6 +772,7 @@ impl batched_collector.finalize()?; let num_done = nested_state.len() - start_length; + debug_assert!(num_done <= limit); limit -= num_done; debug_assert!(values_page.len() == 0 || limit == 0); diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs index c5f43f4c53fc..910c48e8528d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs @@ -180,7 +180,8 @@ where }, (Encoding::Plain, _) => { let values = split_buffer(page)?.values; - Ok(Self::Plain(ArrayChunks::new(values).unwrap())) + let chunks = ArrayChunks::new(values).unwrap(); + Ok(Self::Plain(chunks)) }, (Encoding::ByteStreamSplit, _) => { let values = split_buffer(page)?.values; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index 18f48547ed21..542cc84cd2ef 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -52,7 +52,8 @@ where }, (Encoding::Plain, _) => { let values = split_buffer(page)?.values; - Ok(Self::Plain(ArrayChunks::new(values).unwrap())) + let chunks = ArrayChunks::new(values).unwrap(); + Ok(Self::Plain(chunks)) }, (Encoding::ByteStreamSplit, _) => { let values = split_buffer(page)?.values; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs index 7e2af1d4c1b3..883db729404c 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs @@ -7,9 +7,11 @@ use arrow::array::{ use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::ArrowDataType; use arrow::pushable::Pushable; +use arrow::types::Offset; use polars_error::{polars_err, PolarsError, PolarsResult}; use self::filter::Filter; +use super::binary::utils::Binary; use super::{BasicDecompressor, CompressedPagesIter, ParquetError}; use crate::parquet::encoding::hybrid_rle::gatherer::{ HybridRleGatherer, ZeroCount, ZeroCountGatherer, @@ -72,6 +74,20 @@ impl<'a, D: Decoder> State<'a, D> { }) } + pub fn new_nested( + decoder: &D, + page: &'a DataPage, + dict: Option<&'a D::Dict>, + ) -> PolarsResult { + let translation = D::Translation::new(decoder, page, dict, None, None)?; + + Ok(Self { + translation, + page_validity: None, + filter: None, + }) + } + pub fn len(&self) -> usize { match &self.page_validity { Some(v) => v.len(), @@ -438,7 +454,7 @@ where impl<'a, 'b, 'c, O, G> BatchableCollector> for GatheredHybridRle<'a, 'b, 'c, O, G> where - O: Clone + Default, + O: Clone, G: HybridRleGatherer>, { #[inline] @@ -460,6 +476,33 @@ where } } +impl<'a, 'b, 'c, O, Out, G> BatchableCollector> + for GatheredHybridRle<'a, 'b, 'c, Out, G> +where + O: Offset, + Out: Clone, + G: HybridRleGatherer>, +{ + #[inline] + fn reserve(target: &mut Binary, n: usize) { + target.offsets.reserve(n); + target.values.reserve(n); + } + + #[inline] + fn push_n(&mut self, target: &mut Binary, n: usize) -> ParquetResult<()> { + self.decoder.gather_n_into(target, n, self.gatherer)?; + Ok(()) + } + + #[inline] + fn push_n_nulls(&mut self, target: &mut Binary, n: usize) -> ParquetResult<()> { + self.gatherer + .gather_repeated(target, self.null_value.clone(), n)?; + Ok(()) + } +} + impl<'a, 'b, 'c, T> BatchableCollector> for TranslatedHybridRle<'a, 'b, 'c, View, T> where @@ -579,8 +622,6 @@ pub(crate) trait NestedDecoder: Decoder { decoded: &mut Self::DecodedState, n: usize, ) -> ParquetResult<()> { - _ = state.page_validity.take(); - state.extend_from_state(self, decoded, n)?; Self::validity_extend(state, decoded, true, n); diff --git a/crates/polars-parquet/src/parquet/deserialize/filtered_rle.rs b/crates/polars-parquet/src/parquet/deserialize/filtered_rle.rs deleted file mode 100644 index 2aca341e67ce..000000000000 --- a/crates/polars-parquet/src/parquet/deserialize/filtered_rle.rs +++ /dev/null @@ -1,284 +0,0 @@ -use std::collections::VecDeque; - -use super::{HybridDecoderBitmapIter, HybridEncoded}; -use crate::parquet::encoding::hybrid_rle::BitmapIter; -use crate::parquet::indexes::Interval; - -/// Type definition of a [`FilteredHybridBitmapIter`] of [`HybridDecoderBitmapIter`]. -pub type FilteredHybridRleDecoderIter<'a> = - FilteredHybridBitmapIter<'a, HybridDecoderBitmapIter<'a>>; - -/// The decoding state of the hybrid-RLE decoder with a maximum definition level of 1 -/// that can supports skipped runs -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum FilteredHybridEncoded<'a> { - /// a bitmap (values, offset, length, skipped_set) - Bitmap { - values: &'a [u8], - offset: usize, - length: usize, - }, - Repeated { - is_set: bool, - length: usize, - }, - /// When the run was skipped - contains the number of set values on the skipped run - Skipped(usize), -} - -fn is_set_count(values: &[u8], offset: usize, length: usize) -> usize { - BitmapIter::new(values, offset, length) - .filter(|x| *x) - .count() -} - -impl<'a> FilteredHybridEncoded<'a> { - /// Returns the length of the run in number of items - #[inline] - pub fn len(&self) -> usize { - match self { - FilteredHybridEncoded::Bitmap { length, .. } => *length, - FilteredHybridEncoded::Repeated { length, .. } => *length, - FilteredHybridEncoded::Skipped(_) => 0, - } - } - - #[inline] - pub fn count_ones(self) -> usize { - match self { - FilteredHybridEncoded::Bitmap { - values, - offset, - length, - } => is_set_count(values, offset, length), - FilteredHybridEncoded::Repeated { is_set, length } => { - if is_set { - length - } else { - 0 - } - }, - FilteredHybridEncoded::Skipped(_) => 0, - } - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -/// An [`Iterator`] adapter over [`HybridEncoded`] that yields [`FilteredHybridEncoded`]. -/// -/// This iterator adapter is used in combination with -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct FilteredHybridBitmapIter<'a, I: Iterator>> { - iter: I, - current: Option<(HybridEncoded<'a>, usize)>, - // a run may end in the middle of an interval, in which case we must - // split the interval in parts. This tracks the current interval being computed - current_interval: Option, - selected_rows: VecDeque, - current_items_in_runs: usize, - - total_items: usize, -} - -impl<'a, I: Iterator>> FilteredHybridBitmapIter<'a, I> { - pub fn new(iter: I, selected_rows: VecDeque) -> Self { - let total_items = selected_rows.iter().map(|x| x.length).sum(); - Self { - iter, - current: None, - current_interval: None, - selected_rows, - current_items_in_runs: 0, - total_items, - } - } - - fn advance_current_interval(&mut self, length: usize) { - if let Some(interval) = &mut self.current_interval { - interval.start += length; - interval.length -= length; - self.total_items -= length; - } - } - - /// Returns the number of elements remaining. Note that each run - /// of the iterator contains more than one element - this is _not_ equivalent to size_hint. - pub fn len(&self) -> usize { - self.total_items - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl<'a, I: Iterator>> Iterator for FilteredHybridBitmapIter<'a, I> { - type Item = FilteredHybridEncoded<'a>; - - fn next(&mut self) -> Option { - let interval = if let Some(interval) = self.current_interval { - interval - } else { - self.current_interval = self.selected_rows.pop_front(); - self.current_interval?; // case where iteration finishes - return self.next(); - }; - - let (run, offset) = if let Some((run, offset)) = self.current { - (run, offset) - } else { - self.current = Some((self.iter.next()?, 0)); - // a new run - return self.next(); - }; - - // one of three things can happen: - // * the start of the interval is not aligned with the start of the run => issue a `Skipped` and advance the run / next run - // * the run contains this interval => consume the interval and keep the run - // * the run contains part of this interval => consume the run and keep the interval - - match run { - HybridEncoded::Repeated(is_set, full_run_length) => { - let run_length = full_run_length - offset; - // interval.start is from the start of the first run; discount `current_items_in_runs` - // to get the start from the current run's offset - let interval_start = interval.start - self.current_items_in_runs; - - if interval_start > 0 { - // we need to skip values from the run - let to_skip = interval_start; - - // we only skip up to a run (yield a single skip per multiple runs) - let max_skip = full_run_length - offset; - let to_skip = to_skip.min(max_skip); - - let set = if is_set { to_skip } else { 0 }; - - self.current_items_in_runs += to_skip; - - self.current = if to_skip == max_skip { - None - } else { - Some((run, offset + to_skip)) - }; - - return Some(FilteredHybridEncoded::Skipped(set)); - }; - - // slice the bitmap according to current interval - // note that interval start is from the start of the first run. - let new_offset = offset + interval_start; - - if interval_start > run_length { - let set = if is_set { run_length } else { 0 }; - - self.advance_current_interval(run_length); - self.current_items_in_runs += run_length; - self.current = None; - Some(FilteredHybridEncoded::Skipped(set)) - } else { - let length = if run_length > interval.length { - // interval is fully consumed - self.current_items_in_runs += interval.length; - - // fetch next interval - self.total_items -= interval.length; - self.current_interval = self.selected_rows.pop_front(); - - self.current = Some((run, offset + interval.length)); - - interval.length - } else { - // the run is consumed and the interval is shortened accordingly - self.current_items_in_runs += run_length; - - // the interval may cover two runs; shorten the length - // to its maximum allowed for this run - let length = run_length.min(full_run_length - new_offset); - - self.advance_current_interval(length); - - self.current = None; - length - }; - Some(FilteredHybridEncoded::Repeated { is_set, length }) - } - }, - HybridEncoded::Bitmap(values, full_run_length) => { - let run_length = full_run_length - offset; - // interval.start is from the start of the first run; discount `current_items_in_runs` - // to get the start from the current run's offset - let interval_start = interval.start - self.current_items_in_runs; - - if interval_start > 0 { - // we need to skip values from the run - let to_skip = interval_start; - - // we only skip up to a run (yield a single skip per multiple runs) - let max_skip = full_run_length - offset; - let to_skip = to_skip.min(max_skip); - - let set = is_set_count(values, offset, to_skip); - - self.current_items_in_runs += to_skip; - - self.current = if to_skip == max_skip { - None - } else { - Some((run, offset + to_skip)) - }; - - return Some(FilteredHybridEncoded::Skipped(set)); - }; - - // slice the bitmap according to current interval - // note that interval start is from the start of the first run. - let new_offset = offset + interval_start; - - if interval_start > run_length { - let set = is_set_count(values, offset, full_run_length); - - self.advance_current_interval(run_length); - self.current_items_in_runs += run_length; - self.current = None; - Some(FilteredHybridEncoded::Skipped(set)) - } else { - let length = if run_length > interval.length { - // interval is fully consumed - self.current_items_in_runs += interval.length; - - // fetch next interval - self.total_items -= interval.length; - self.current_interval = self.selected_rows.pop_front(); - - self.current = Some((run, offset + interval.length)); - - interval.length - } else { - // the run is consumed and the interval is shortened accordingly - self.current_items_in_runs += run_length; - - // the interval may cover two runs; shorten the length - // to its maximum allowed for this run - let length = run_length.min(full_run_length - new_offset); - - self.advance_current_interval(length); - - self.current = None; - length - }; - Some(FilteredHybridEncoded::Bitmap { - values, - offset: new_offset, - length, - }) - } - }, - } - } -} diff --git a/crates/polars-parquet/src/parquet/deserialize/hybrid_rle.rs b/crates/polars-parquet/src/parquet/deserialize/hybrid_rle.rs deleted file mode 100644 index ecc1b6144caa..000000000000 --- a/crates/polars-parquet/src/parquet/deserialize/hybrid_rle.rs +++ /dev/null @@ -1,208 +0,0 @@ -use crate::parquet::encoding::hybrid_rle::{self, BitmapIter}; - -/// The decoding state of the hybrid-RLE decoder with a maximum definition level of 1 -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum HybridEncoded<'a> { - /// a bitmap - Bitmap(&'a [u8], usize), - /// A repeated item. The first attribute corresponds to whether the value is set - /// the second attribute corresponds to the number of repetitions. - Repeated(bool, usize), -} - -impl<'a> HybridEncoded<'a> { - /// Returns the length of the run in number of items - #[inline] - pub fn len(&self) -> usize { - match self { - HybridEncoded::Bitmap(_, length) => *length, - HybridEncoded::Repeated(_, length) => *length, - } - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -pub trait HybridRleRunsIterator<'a>: Iterator> { - /// Number of elements remaining. This may not be the items of the iterator - an item - /// of the iterator may contain more than one element. - fn number_of_elements(&self) -> usize; -} - -/// An iterator of [`HybridEncoded`], adapter over [`hybrid_rle::HybridEncoded`]. -#[derive(Debug, Clone)] -pub struct HybridRleIter<'a, I> -where - I: Iterator>, -{ - iter: I, - length: usize, - consumed: usize, -} - -impl<'a, I> HybridRleIter<'a, I> -where - I: Iterator>, -{ - /// Returns a new [`HybridRleIter`] - #[inline] - pub fn new(iter: I, length: usize) -> Self { - Self { - iter, - length, - consumed: 0, - } - } - - /// the number of elements in the iterator. Note that this _is not_ the number of runs. - #[inline] - pub fn len(&self) -> usize { - self.length - self.consumed - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl<'a, I> HybridRleRunsIterator<'a> for HybridRleIter<'a, I> -where - I: Iterator>, -{ - fn number_of_elements(&self) -> usize { - self.len() - } -} - -impl<'a, I> Iterator for HybridRleIter<'a, I> -where - I: Iterator>, -{ - type Item = HybridEncoded<'a>; - - #[inline] - fn next(&mut self) -> Option { - if self.consumed == self.length { - return None; - }; - let run = self.iter.next(); - - run.map(|run| match run { - hybrid_rle::HybridEncoded::Bitpacked(pack) => { - // a pack has at most `pack.len() * 8` bits - let pack_size = pack.len() * 8; - - let additional = pack_size.min(self.len()); - - self.consumed += additional; - HybridEncoded::Bitmap(pack, additional) - }, - hybrid_rle::HybridEncoded::Rle(value, length) => { - let is_set = value[0] == 1; - - let additional = length.min(self.len()); - - self.consumed += additional; - HybridEncoded::Repeated(is_set, additional) - }, - }) - } - - fn size_hint(&self) -> (usize, Option) { - self.iter.size_hint() - } -} - -/// Type definition for a [`HybridRleIter`] using [`hybrid_rle::Decoder`]. -pub type HybridDecoderBitmapIter<'a> = HybridRleIter<'a, hybrid_rle::Decoder<'a>>; - -#[derive(Debug)] -enum HybridBooleanState<'a> { - /// a bitmap - Bitmap(BitmapIter<'a>), - /// A repeated item. The first attribute corresponds to whether the value is set - /// the second attribute corresponds to the number of repetitions. - Repeated(bool, usize), -} - -/// An iterator adapter that maps an iterator of [`HybridEncoded`] into an iterator -/// over [`bool`]. -#[derive(Debug)] -pub struct HybridRleBooleanIter<'a, I> -where - I: Iterator>, -{ - iter: I, - current_run: Option>, -} - -impl<'a, I> HybridRleBooleanIter<'a, I> -where - I: HybridRleRunsIterator<'a>, -{ - pub fn new(iter: I) -> Self { - Self { - iter, - current_run: None, - } - } - - fn set_new_run(&mut self, run: HybridEncoded<'a>) -> Option { - let run = match run { - HybridEncoded::Bitmap(bitmap, length) => { - HybridBooleanState::Bitmap(BitmapIter::new(bitmap, 0, length)) - }, - HybridEncoded::Repeated(value, length) => HybridBooleanState::Repeated(value, length), - }; - self.current_run = Some(run); - self.next() - } -} - -impl<'a, I> Iterator for HybridRleBooleanIter<'a, I> -where - I: HybridRleRunsIterator<'a>, -{ - type Item = bool; - - #[inline] - fn next(&mut self) -> Option { - if let Some(run) = &mut self.current_run { - match run { - HybridBooleanState::Bitmap(bitmap) => match bitmap.next() { - Some(val) => Some(val), - None => { - let run = self.iter.next()?; - self.set_new_run(run) - }, - }, - HybridBooleanState::Repeated(value, remaining) => { - if *remaining == 0 { - let run = self.iter.next()?; - self.set_new_run(run) - } else { - *remaining -= 1; - Some(*value) - } - }, - } - } else if let Some(run) = self.iter.next() { - self.set_new_run(run) - } else { - None - } - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - let exact = self.iter.number_of_elements(); - (exact, Some(exact)) - } -} - -/// Type definition for a [`HybridRleBooleanIter`] using [`hybrid_rle::Decoder`]. -pub type HybridRleDecoderIter<'a> = HybridRleBooleanIter<'a, HybridDecoderBitmapIter<'a>>; diff --git a/crates/polars-parquet/src/parquet/deserialize/mod.rs b/crates/polars-parquet/src/parquet/deserialize/mod.rs deleted file mode 100644 index 480f3534f93a..000000000000 --- a/crates/polars-parquet/src/parquet/deserialize/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -mod filtered_rle; -mod hybrid_rle; -mod utils; - -pub use filtered_rle::*; -pub use hybrid_rle::*; -pub use utils::SliceFilteredIter; diff --git a/crates/polars-parquet/src/parquet/deserialize/utils.rs b/crates/polars-parquet/src/parquet/deserialize/utils.rs deleted file mode 100644 index 43a5a235cb71..000000000000 --- a/crates/polars-parquet/src/parquet/deserialize/utils.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::collections::VecDeque; - -use crate::parquet::indexes::Interval; - -/// An iterator adapter that converts an iterator over items into an iterator over slices of -/// those N items. -/// -/// This iterator is best used with iterators that implement `nth` since skipping items -/// allows this iterator to skip sequences of items without having to call each of them. -#[derive(Debug, Clone)] -pub struct SliceFilteredIter { - pub(crate) iter: I, - selected_rows: VecDeque, - current_remaining: usize, - current: usize, // position in the slice - total_length: usize, -} - -impl SliceFilteredIter { - /// Return a new [`SliceFilteredIter`] - pub fn new(iter: I, selected_rows: VecDeque) -> Self { - let total_length = selected_rows.iter().map(|i| i.length).sum(); - Self { - iter, - selected_rows, - current_remaining: 0, - current: 0, - total_length, - } - } -} - -impl> Iterator for SliceFilteredIter { - type Item = T; - - #[inline] - fn next(&mut self) -> Option { - if self.current_remaining == 0 { - if let Some(interval) = self.selected_rows.pop_front() { - // skip the hole between the previous start and this start - // (start + length) - start - let item = self.iter.nth(interval.start - self.current); - self.current = interval.start + interval.length; - self.current_remaining = interval.length - 1; - self.total_length -= 1; - item - } else { - None - } - } else { - self.current_remaining -= 1; - self.total_length -= 1; - self.iter.next() - } - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - (self.total_length, Some(self.total_length)) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn basic() { - let iter = 0..=100; - - let intervals = vec![ - Interval::new(0, 2), - Interval::new(20, 11), - Interval::new(31, 1), - ]; - - let a: VecDeque = intervals.clone().into_iter().collect(); - let mut a = SliceFilteredIter::new(iter, a); - - let expected: Vec = intervals - .into_iter() - .flat_map(|interval| interval.start..(interval.start + interval.length)) - .collect(); - - assert_eq!(expected, a.by_ref().collect::>()); - assert_eq!((0, Some(0)), a.size_hint()); - } -} diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs deleted file mode 100644 index f7adaa0ffb33..000000000000 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs +++ /dev/null @@ -1,134 +0,0 @@ -use polars_utils::slice::GetSaferUnchecked; - -use super::super::{ceil8, uleb128}; -use super::HybridEncoded; - -/// An [`Iterator`] of [`HybridEncoded`]. -#[derive(Debug, Clone)] -pub struct Decoder<'a> { - values: &'a [u8], - num_bits: usize, -} - -impl<'a> Decoder<'a> { - /// Returns a new [`Decoder`] - pub fn new(values: &'a [u8], num_bits: usize) -> Self { - Self { values, num_bits } - } - - /// Returns the number of bits being used by this decoder. - #[inline] - pub fn num_bits(&self) -> usize { - self.num_bits - } -} - -impl<'a> Iterator for Decoder<'a> { - type Item = HybridEncoded<'a>; - - #[inline] // -18% improvement in bench - fn next(&mut self) -> Option { - let (indicator, consumed) = uleb128::decode(self.values); - self.values = unsafe { self.values.get_unchecked_release(consumed..) }; - - // We want to early return if consumed == 0 OR num_bits == 0, so combine into a single branch. - if (consumed * self.num_bits) == 0 { - return None; - } - - if indicator & 1 == 1 { - // is bitpacking - let bytes = (indicator as usize >> 1) * self.num_bits; - let bytes = std::cmp::min(bytes, self.values.len()); - let (result, remaining) = self.values.split_at(bytes); - self.values = remaining; - Some(HybridEncoded::Bitpacked(result)) - } else { - // is rle - let run_length = indicator as usize >> 1; - // repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width) - let rle_bytes = ceil8(self.num_bits); - let (result, remaining) = self.values.split_at(rle_bytes); - self.values = remaining; - Some(HybridEncoded::Rle(result, run_length)) - } - } -} - -#[cfg(test)] -mod tests { - use super::super::super::bitpacked; - use super::*; - - #[test] - fn basics_1() { - let bit_width = 1usize; - let length = 5; - let values = [ - 2, 0, 0, 0, // length - 0b00000011, 0b00001011, // data - ]; - - let mut decoder = Decoder::new(&values[4..6], bit_width); - - let run = decoder.next().unwrap(); - - if let HybridEncoded::Bitpacked(values) = run { - assert_eq!(values, &[0b00001011]); - let result = bitpacked::Decoder::::try_new(values, bit_width, length) - .unwrap() - .collect(); - assert_eq!(result, &[1, 1, 0, 1, 0]); - } else { - panic!() - }; - } - - #[test] - fn basics_2() { - // This test was validated by the result of what pyarrow3 outputs when - // the bitmap is used. - let bit_width = 1; - let values = [ - 3, 0, 0, 0, // length - 0b00000101, 0b11101011, 0b00000010, // data - ]; - let expected = &[1, 1, 0, 1, 0, 1, 1, 1, 0, 1]; - - let mut decoder = Decoder::new(&values[4..4 + 3], bit_width); - - let run = decoder.next().unwrap(); - - if let HybridEncoded::Bitpacked(values) = run { - assert_eq!(values, &[0b11101011, 0b00000010]); - let result = bitpacked::Decoder::::try_new(values, bit_width, 10) - .unwrap() - .collect(); - assert_eq!(result, expected); - } else { - panic!() - }; - } - - #[test] - fn basics_3() { - let bit_width = 1; - let length = 8; - let values = [ - 2, 0, 0, 0, // length - 0b00010000, // data - 0b00000001, - ]; - - let mut decoder = Decoder::new(&values[4..4 + 2], bit_width); - - let run = decoder.next().unwrap(); - - if let HybridEncoded::Rle(values, items) = run { - assert_eq!(values, &[0b00000001]); - assert_eq!(items, length); - } else { - panic!() - }; - } -} diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs index 1e2c42d985f5..23983104d1f2 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs @@ -329,31 +329,3 @@ fn small_fuzz() -> ParquetResult<()> { fn large_fuzz() -> ParquetResult<()> { fuzz_loops(1_000_000) } - -#[test] -fn found_cases() -> ParquetResult<()> { - let mut encoded = Vec::with_capacity(1024); - let mut decoded = Vec::with_capacity(1024); - - let num_bits = 7; - - let bs: [u32; 1024] = std::array::from_fn(|i| (i / 10) as u32); - - encoder::encode(&mut encoded, bs.iter().copied(), num_bits).unwrap(); - let mut decoder = HybridRleDecoder::new(&encoded[..], num_bits, bs.len()); - - while decoder.len() != 0 { - let n = decoder.next().unwrap(); - decoded.push(n); - } - - for _ in 0..1 { - _ = decoder.next(); - } - - decoder.get_result()?; - - assert_eq!(&decoded, &bs); - - Ok(()) -} diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs index 138b05815dd2..e5e935ab525f 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs @@ -1,7 +1,6 @@ // See https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3 mod bitmap; mod buffered; -mod decoder; mod encoder; pub mod gatherer; @@ -10,12 +9,10 @@ mod fuzz; pub use bitmap::{encode_bool as bitpacked_encode, BitmapIter}; pub use buffered::BufferedBitpacked; -pub use decoder::Decoder; pub use encoder::encode; pub use gatherer::{ DictionaryTranslator, FnTranslator, Translator, TryFromUsizeTranslator, UnitTranslator, }; -use polars_utils::iter::FallibleIterator; use polars_utils::slice::GetSaferUnchecked; use self::buffered::HybridRleBuffered; @@ -52,63 +49,8 @@ pub struct HybridRleDecoder<'a> { num_values: usize, buffered: Option>, - /// The result after iterating. - /// - /// This is only needed because we iterate over individual elements. - result: Option, } -impl<'a> FallibleIterator for HybridRleDecoder<'a> { - fn get_result(&mut self) -> Result<(), ParquetError> { - match self.result.take() { - None => Ok(()), - Some(err) => Err(err), - } - } -} - -impl<'a> Iterator for HybridRleDecoder<'a> { - type Item = u32; - - fn next(&mut self) -> Option { - if self.num_values == 0 { - return None; - } - - if self.num_bits == 0 { - self.num_values -= 1; - return Some(0); - } - - if let Some(buffered) = self.buffered.as_mut() { - match buffered.next() { - None => self.buffered = None, - Some(value) => { - self.num_values -= 1; - return Some(value); - }, - } - } - - let mut buffer = Vec::with_capacity(1); - let result = self.gather_limited_once(&mut buffer, Some(1), &UnitTranslator); - - match result { - Ok(_) => Some(buffer[0]), - Err(err) => { - self.result = Some(err); - None - }, - } - } - - fn size_hint(&self) -> (usize, Option) { - (self.num_values, Some(self.num_values)) - } -} - -impl<'a> ExactSizeIterator for HybridRleDecoder<'a> {} - impl<'a> HybridRleDecoder<'a> { /// Returns a new [`HybridRleDecoder`] pub fn new(data: &'a [u8], num_bits: u32, num_values: usize) -> Self { @@ -118,10 +60,13 @@ impl<'a> HybridRleDecoder<'a> { num_values, buffered: None, - result: None, } } + pub fn len(&self) -> usize { + self.num_values + } + fn gather_limited_once>( &mut self, target: &mut G::Target, @@ -268,6 +213,8 @@ impl<'a> HybridRleDecoder<'a> { } if self.num_bits == 0 { + let n = usize::min(n, self.num_values); + let value = gatherer.hybridrle_to_target(0)?; gatherer.gather_repeated(target, value, n)?; self.num_values -= n; @@ -561,11 +508,7 @@ mod tests { let decoder = HybridRleDecoder::new(&data, num_bits, 1000); let result = decoder.collect()?; - let mut decoder = HybridRleDecoder::new(&data, num_bits, 1000); - let iterator_result: Vec<_> = Iterator::collect(&mut decoder); - assert_eq!(result, (0..1000).collect::>()); - assert_eq!(iterator_result, (0..1000).collect::>()); Ok(()) } diff --git a/crates/polars-parquet/src/parquet/mod.rs b/crates/polars-parquet/src/parquet/mod.rs index 2a746400843b..f40b21ea0e04 100644 --- a/crates/polars-parquet/src/parquet/mod.rs +++ b/crates/polars-parquet/src/parquet/mod.rs @@ -3,7 +3,6 @@ pub mod error; #[cfg(feature = "bloom_filter")] pub mod bloom_filter; pub mod compression; -pub mod deserialize; pub mod encoding; pub mod indexes; pub mod metadata; diff --git a/crates/polars/tests/it/io/parquet/read/binary.rs b/crates/polars/tests/it/io/parquet/read/binary.rs index 724e7d791c42..4fcf27e173b7 100644 --- a/crates/polars/tests/it/io/parquet/read/binary.rs +++ b/crates/polars/tests/it/io/parquet/read/binary.rs @@ -4,6 +4,7 @@ use polars_parquet::parquet::page::DataPage; use super::dictionary::BinaryPageDict; use super::utils::deserialize_optional; +use crate::io::parquet::read::hybrid_rle_iter; use crate::io::parquet::read::utils::FixedLenBinaryPageState; pub fn page_to_vec( @@ -29,8 +30,7 @@ pub fn page_to_vec( dict.indexes.translate_and_collect(&dictionary) }, FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { - let values = dict - .indexes + let values = hybrid_rle_iter(dict.indexes)? .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec())); deserialize_optional(validity, values) }, diff --git a/crates/polars/tests/it/io/parquet/read/deserialize.rs b/crates/polars/tests/it/io/parquet/read/deserialize.rs index 90e16a24683a..1cd906b2c0e8 100644 --- a/crates/polars/tests/it/io/parquet/read/deserialize.rs +++ b/crates/polars/tests/it/io/parquet/read/deserialize.rs @@ -1,6 +1,3 @@ -use polars_parquet::parquet::deserialize::{ - FilteredHybridBitmapIter, FilteredHybridEncoded, HybridEncoded, -}; use polars_parquet::parquet::indexes::Interval; #[test] diff --git a/crates/polars/tests/it/io/parquet/read/fixed_binary.rs b/crates/polars/tests/it/io/parquet/read/fixed_binary.rs index 7158864e21bf..5f484fbe05a0 100644 --- a/crates/polars/tests/it/io/parquet/read/fixed_binary.rs +++ b/crates/polars/tests/it/io/parquet/read/fixed_binary.rs @@ -3,6 +3,7 @@ use polars_parquet::parquet::page::DataPage; use super::dictionary::FixedLenByteArrayPageDict; use super::utils::{deserialize_optional, FixedLenBinaryPageState}; +use crate::io::parquet::read::hybrid_rle_iter; pub fn page_to_vec( page: &DataPage, @@ -19,13 +20,11 @@ pub fn page_to_vec( FixedLenBinaryPageState::Required(values) => { Ok(values.map(|x| x.to_vec()).map(Some).collect()) }, - FixedLenBinaryPageState::RequiredDictionary(dict) => dict - .indexes + FixedLenBinaryPageState::RequiredDictionary(dict) => hybrid_rle_iter(dict.indexes)? .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec()).map(Some)) .collect(), FixedLenBinaryPageState::OptionalDictionary(validity, dict) => { - let values = dict - .indexes + let values = hybrid_rle_iter(dict.indexes)? .map(|x| dict.dict.value(x as usize).map(|x| x.to_vec())); deserialize_optional(validity, values) }, diff --git a/crates/polars/tests/it/io/parquet/read/mod.rs b/crates/polars/tests/it/io/parquet/read/mod.rs index 175ed27c723b..95ab13936d19 100644 --- a/crates/polars/tests/it/io/parquet/read/mod.rs +++ b/crates/polars/tests/it/io/parquet/read/mod.rs @@ -3,7 +3,6 @@ mod binary; /// In comparison to Arrow, this in-memory format does not leverage logical types nor SIMD operations, /// but OTOH it has no external dependencies and is very familiar to Rust developers. mod boolean; -mod deserialize; mod dictionary; mod fixed_binary; mod indexes; @@ -17,6 +16,7 @@ use std::fs::File; use dictionary::{deserialize as deserialize_dict, DecodedDictPage}; #[cfg(feature = "async")] use futures::StreamExt; +use polars_parquet::parquet::encoding::hybrid_rle::HybridRleDecoder; use polars_parquet::parquet::error::{ParquetError, ParquetResult}; use polars_parquet::parquet::metadata::ColumnChunkMetaData; use polars_parquet::parquet::page::{CompressedPage, DataPage, Page}; @@ -36,6 +36,10 @@ use polars_utils::mmap::MemReader; use super::*; +pub fn hybrid_rle_iter(d: HybridRleDecoder) -> ParquetResult> { + Ok(d.collect()?.into_iter()) +} + pub fn get_path() -> PathBuf { let dir = env!("CARGO_MANIFEST_DIR"); PathBuf::from(dir).join("../../docs/data") diff --git a/crates/polars/tests/it/io/parquet/read/primitive.rs b/crates/polars/tests/it/io/parquet/read/primitive.rs index f9c47ace5679..d9665f353c53 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive.rs @@ -1,28 +1,12 @@ -use polars_parquet::parquet::deserialize::{ - HybridRleDecoderIter, HybridRleIter, SliceFilteredIter, -}; -use polars_parquet::parquet::encoding::hybrid_rle::{Decoder, FnTranslator}; -use polars_parquet::parquet::encoding::Encoding; +use polars_parquet::parquet::encoding::hybrid_rle::FnTranslator; use polars_parquet::parquet::error::ParquetResult; -use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; -use polars_parquet::parquet::schema::Repetition; +use polars_parquet::parquet::page::DataPage; use polars_parquet::parquet::types::NativeType; use polars_parquet::read::ParquetError; use super::dictionary::PrimitivePageDict; -use super::utils::{deserialize_optional, native_cast, Casted, NativePageState, OptionalValues}; - -/// The deserialization state of a `DataPage` of `Primitive` parquet primitive type -#[derive(Debug)] -pub enum FilteredPageState<'a, T> -where - T: NativeType, -{ - /// A page of optional values - Optional(SliceFilteredIter, Casted<'a, T>>>), - /// A page of required values - Required(SliceFilteredIter>), -} +use super::hybrid_rle_iter; +use super::utils::{deserialize_optional, NativePageState}; /// The deserialization state of a `DataPage` of `Primitive` parquet primitive type #[derive(Debug)] @@ -32,7 +16,6 @@ where T: NativeType, { Nominal(NativePageState<'a, T, &'a PrimitivePageDict>), - Filtered(FilteredPageState<'a, T>), } impl<'a, T: NativeType> PageState<'a, T> { @@ -43,48 +26,8 @@ impl<'a, T: NativeType> PageState<'a, T> { page: &'a DataPage, dict: Option<&'a PrimitivePageDict>, ) -> Result { - if let Some(selected_rows) = page.selected_rows() { - let is_optional = - page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; - - match (page.encoding(), dict, is_optional) { - (Encoding::Plain, _, true) => { - let EncodedSplitBuffer { - rep: _, - def: def_levels, - values: _, - } = split_buffer(page)?; - - let validity = HybridRleDecoderIter::new(HybridRleIter::new( - Decoder::new(def_levels, 1), - page.num_values(), - )); - let values = native_cast(page)?; - - // validity and values interleaved. - let values = OptionalValues::new(validity, values); - - let values = - SliceFilteredIter::new(values, selected_rows.iter().copied().collect()); - - Ok(Self::Filtered(FilteredPageState::Optional(values))) - }, - (Encoding::Plain, _, false) => { - let values = SliceFilteredIter::new( - native_cast(page)?, - selected_rows.iter().copied().collect(), - ); - Ok(Self::Filtered(FilteredPageState::Required(values))) - }, - _ => Err(ParquetError::FeatureNotSupported(format!( - "Viewing page for encoding {:?} for native type {}", - page.encoding(), - std::any::type_name::() - ))), - } - } else { - NativePageState::try_new(page, dict).map(Self::Nominal) - } + assert!(page.selected_rows().is_none()); + NativePageState::try_new(page, dict).map(Self::Nominal) } } @@ -106,13 +49,10 @@ pub fn page_to_vec( dict.indexes.translate_and_collect(&dictionary) }, NativePageState::OptionalDictionary(validity, dict) => { - let values = dict.indexes.map(|x| dict.dict.value(x as usize).copied()); + let values = + hybrid_rle_iter(dict.indexes)?.map(|x| dict.dict.value(x as usize).copied()); deserialize_optional(validity, values) }, }, - PageState::Filtered(state) => match state { - FilteredPageState::Optional(values) => Ok(values.collect()), - FilteredPageState::Required(values) => Ok(values.map(Some).collect()), - }, } } diff --git a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs index e3fead47187c..e4abd2046432 100644 --- a/crates/polars/tests/it/io/parquet/read/primitive_nested.rs +++ b/crates/polars/tests/it/io/parquet/read/primitive_nested.rs @@ -6,7 +6,7 @@ use polars_parquet::parquet::read::levels::get_bit_width; use polars_parquet::parquet::types::NativeType; use super::dictionary::PrimitivePageDict; -use super::Array; +use super::{hybrid_rle_iter, Array}; fn read_buffer(values: &[u8]) -> impl Iterator + '_ { let chunks = values.chunks_exact(std::mem::size_of::()); @@ -88,7 +88,7 @@ fn read_array_impl>( let num_bits = get_bit_width(rep_level_encoding.1); let rep_levels = HybridRleDecoder::new(rep_levels, num_bits, length); compose_array( - rep_levels, + hybrid_rle_iter(rep_levels)?, std::iter::repeat(0).take(length), max_rep_level, max_def_level, @@ -100,7 +100,7 @@ fn read_array_impl>( let def_levels = HybridRleDecoder::new(def_levels, num_bits, length); compose_array( std::iter::repeat(0).take(length), - def_levels, + hybrid_rle_iter(def_levels)?, max_rep_level, max_def_level, values, @@ -111,7 +111,13 @@ fn read_array_impl>( HybridRleDecoder::new(rep_levels, get_bit_width(rep_level_encoding.1), length); let def_levels = HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length); - compose_array(rep_levels, def_levels, max_rep_level, max_def_level, values) + compose_array( + hybrid_rle_iter(rep_levels)?, + hybrid_rle_iter(def_levels)?, + max_rep_level, + max_def_level, + values, + ) }, _ => todo!(), } diff --git a/crates/polars/tests/it/io/parquet/read/struct_.rs b/crates/polars/tests/it/io/parquet/read/struct_.rs index 1acf0cf834ac..b74765ce721b 100644 --- a/crates/polars/tests/it/io/parquet/read/struct_.rs +++ b/crates/polars/tests/it/io/parquet/read/struct_.rs @@ -3,6 +3,8 @@ use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; use polars_parquet::parquet::read::levels::get_bit_width; +use super::hybrid_rle_iter; + pub fn extend_validity(val: &mut Vec, page: &DataPage) -> ParquetResult<()> { let EncodedSplitBuffer { rep: _, @@ -20,12 +22,11 @@ pub fn extend_validity(val: &mut Vec, page: &DataPage) -> ParquetResult<() page.descriptor.max_def_level, ); - let mut def_levels = - HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length); + let def_levels = HybridRleDecoder::new(def_levels, get_bit_width(def_level_encoding.1), length); val.reserve(length); - def_levels.try_for_each(|x| { + hybrid_rle_iter(def_levels)?.for_each(|x| { val.push(x != 0); - Ok(()) - }) + }); + Ok(()) } diff --git a/crates/polars/tests/it/io/parquet/read/utils.rs b/crates/polars/tests/it/io/parquet/read/utils.rs index 64564f473b2f..19feaee29534 100644 --- a/crates/polars/tests/it/io/parquet/read/utils.rs +++ b/crates/polars/tests/it/io/parquet/read/utils.rs @@ -1,4 +1,3 @@ -use polars_parquet::parquet::deserialize::{HybridDecoderBitmapIter, HybridEncoded, HybridRleIter}; use polars_parquet::parquet::encoding::hybrid_rle::{self, BitmapIter, HybridRleDecoder}; use polars_parquet::parquet::error::{ParquetError, ParquetResult}; use polars_parquet::parquet::page::{split_buffer, DataPage, EncodedSplitBuffer}; @@ -34,10 +33,6 @@ pub(super) fn dict_indices_decoder(page: &DataPage) -> ParquetResult { - /// When the maximum definition level is 1, the definition levels are RLE-encoded and - /// the bitpacked runs are bitmaps. This variant contains [`HybridDecoderBitmapIter`] - /// that decodes the runs, but not the individual values - Bitmap(HybridDecoderBitmapIter<'a>), /// When the maximum definition level is larger than 1 Levels(HybridRleDecoder<'a>, u32), } @@ -51,11 +46,7 @@ impl<'a> DefLevelsDecoder<'a> { } = split_buffer(page)?; let max_def_level = page.descriptor.max_def_level; - Ok(if max_def_level == 1 { - let iter = hybrid_rle::Decoder::new(def_levels, 1); - let iter = HybridRleIter::new(iter, page.num_values()); - Self::Bitmap(iter) - } else { + Ok({ let iter = HybridRleDecoder::new(def_levels, get_bit_width(max_def_level), page.num_values()); Self::Levels(iter, max_def_level as u32) @@ -63,86 +54,24 @@ impl<'a> DefLevelsDecoder<'a> { } } -/// Iterator adapter to convert an iterator of non-null values and an iterator over validity -/// into an iterator of optional values. -#[derive(Debug, Clone)] -pub struct OptionalValues, I: Iterator> { - validity: V, - values: I, -} - -impl, I: Iterator> OptionalValues { - pub fn new(validity: V, values: I) -> Self { - Self { validity, values } - } -} - -impl, I: Iterator> Iterator for OptionalValues { - type Item = Option; - - #[inline] - fn next(&mut self) -> Option { - self.validity - .next() - .map(|x| if x { self.values.next() } else { None }) - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - self.validity.size_hint() - } -} - pub fn deserialize_optional>>( validity: DefLevelsDecoder, values: I, ) -> ParquetResult>> { match validity { - DefLevelsDecoder::Bitmap(bitmap) => deserialize_bitmap(bitmap, values), DefLevelsDecoder::Levels(levels, max_level) => { deserialize_levels(levels, max_level, values) }, } } -fn deserialize_bitmap>>( - mut validity: HybridDecoderBitmapIter, - mut values: I, -) -> Result>, ParquetError> { - let mut deserialized = Vec::with_capacity(validity.len()); - - validity.try_for_each(|run| match run { - HybridEncoded::Bitmap(bitmap, length) => { - BitmapIter::new(bitmap, 0, length).try_for_each(|x| { - if x { - deserialized.push(values.next().transpose()?); - } else { - deserialized.push(None); - } - Result::<_, ParquetError>::Ok(()) - }) - }, - HybridEncoded::Repeated(is_set, length) => { - if is_set { - deserialized.reserve(length); - for x in values.by_ref().take(length) { - deserialized.push(Some(x?)) - } - } else { - deserialized.extend(std::iter::repeat(None).take(length)) - } - Ok(()) - }, - })?; - Ok(deserialized) -} - fn deserialize_levels>>( levels: HybridRleDecoder, max: u32, mut values: I, ) -> Result>, ParquetError> { levels + .collect()? .into_iter() .map(|x| { if x == max { diff --git a/crates/polars/tests/it/io/parquet/write/indexes.rs b/crates/polars/tests/it/io/parquet/write/indexes.rs index 4ddff73b5fb1..3f5f15c92828 100644 --- a/crates/polars/tests/it/io/parquet/write/indexes.rs +++ b/crates/polars/tests/it/io/parquet/write/indexes.rs @@ -3,21 +3,16 @@ use std::io::Cursor; use polars_parquet::parquet::compression::CompressionOptions; use polars_parquet::parquet::error::ParquetResult; use polars_parquet::parquet::indexes::{ - select_pages, BoundaryOrder, Index, Interval, NativeIndex, PageIndex, PageLocation, + BoundaryOrder, Index, NativeIndex, PageIndex, PageLocation, }; use polars_parquet::parquet::metadata::SchemaDescriptor; -use polars_parquet::parquet::read::{ - read_columns_indexes, read_metadata, read_pages_locations, BasicDecompressor, IndexedPageReader, -}; +use polars_parquet::parquet::read::{read_columns_indexes, read_metadata, read_pages_locations}; use polars_parquet::parquet::schema::types::{ParquetType, PhysicalType, PrimitiveType}; use polars_parquet::parquet::write::{ Compressor, DynIter, DynStreamingIterator, FileWriter, Version, WriteOptions, }; -use polars_utils::mmap::MemReader; -use super::super::read::collect; use super::primitive::array_to_page_v1; -use super::Array; fn write_file() -> ParquetResult> { let page1 = vec![Some(0), Some(1), None, Some(3), Some(4), Some(5), Some(6)]; @@ -57,35 +52,6 @@ fn write_file() -> ParquetResult> { Ok(writer.into_inner().into_inner()) } -#[test] -fn read_indexed_page() -> ParquetResult<()> { - let data = write_file()?; - let mut reader = MemReader::from_vec(data); - - let metadata = read_metadata(&mut reader)?; - - let column = 0; - let columns = &metadata.row_groups[0].columns(); - - // selected the rows - let intervals = &[Interval::new(2, 2)]; - - let pages = read_pages_locations(&mut reader, columns)?; - - let pages = select_pages(intervals, &pages[column], metadata.row_groups[0].num_rows())?; - - let pages = IndexedPageReader::new(reader, &columns[column], pages, vec![], vec![]); - - let pages = BasicDecompressor::new(pages, vec![]); - - let arrays = collect(pages, columns[column].physical_type())?; - - // the second item and length 2 - assert_eq!(arrays, vec![Array::Int32(vec![None, Some(3)])]); - - Ok(()) -} - #[test] fn read_indexes_and_locations() -> ParquetResult<()> { let data = write_file()?;