diff --git a/crates/polars-arrow/src/bitmap/bitmap_ops.rs b/crates/polars-arrow/src/bitmap/bitmap_ops.rs index 9e5ac502e6b5..a3edb658be4e 100644 --- a/crates/polars-arrow/src/bitmap/bitmap_ops.rs +++ b/crates/polars-arrow/src/bitmap/bitmap_ops.rs @@ -300,6 +300,22 @@ pub fn intersects_with_mut(lhs: &MutableBitmap, rhs: &MutableBitmap) -> bool { ) } +pub fn num_edges(lhs: &Bitmap) -> usize { + if lhs.is_empty() { + return 0; + } + + // @TODO: If is probably quite inefficient to do it like this because now either one is not + // aligned. Maybe, we can implement a smarter way to do this. + binary_fold( + &unsafe { lhs.clone().sliced_unchecked(0, lhs.len() - 1) }, + &unsafe { lhs.clone().sliced_unchecked(1, lhs.len() - 1) }, + |l, r| (l ^ r).count_ones() as usize, + 0, + |acc, v| acc + v, + ) +} + /// Compute `out[i] = if selector[i] { truthy[i] } else { falsy }`. pub fn select_constant(selector: &Bitmap, truthy: &Bitmap, falsy: bool) -> Bitmap { let falsy_mask: u64 = if falsy { diff --git a/crates/polars-arrow/src/bitmap/immutable.rs b/crates/polars-arrow/src/bitmap/immutable.rs index 4b52045afa9f..6ad76a07b639 100644 --- a/crates/polars-arrow/src/bitmap/immutable.rs +++ b/crates/polars-arrow/src/bitmap/immutable.rs @@ -555,6 +555,11 @@ impl Bitmap { pub fn select_constant(&self, truthy: &Self, falsy: bool) -> Self { super::bitmap_ops::select_constant(self, truthy, falsy) } + + /// Calculates the number of edges from `0 -> 1` and `1 -> 0`. + pub fn num_edges(&self) -> usize { + super::bitmap_ops::num_edges(self) + } } impl> From

for Bitmap { diff --git a/crates/polars-arrow/src/datatypes/mod.rs b/crates/polars-arrow/src/datatypes/mod.rs index c232c985d8a0..832886509eb4 100644 --- a/crates/polars-arrow/src/datatypes/mod.rs +++ b/crates/polars-arrow/src/datatypes/mod.rs @@ -545,6 +545,22 @@ impl ArrowDataType { } } + pub fn is_nested(&self) -> bool { + use ArrowDataType as D; + + matches!( + self, + D::List(_) + | D::LargeList(_) + | D::FixedSizeList(_, _) + | D::Struct(_) + | D::Union(_, _, _) + | D::Map(_, _) + | D::Dictionary(_, _, _) + | D::Extension(_, _, _) + ) + } + pub fn is_view(&self) -> bool { matches!(self, ArrowDataType::Utf8View | ArrowDataType::BinaryView) } diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 1f8dfbd65295..ce514a9e8b0e 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -2,6 +2,7 @@ use std::borrow::Cow; use std::collections::VecDeque; use std::ops::{Deref, Range}; +use arrow::array::BooleanArray; use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::ArrowSchemaRef; use polars_core::prelude::*; @@ -313,6 +314,20 @@ fn rg_to_dfs_prefiltered( debug_assert_eq!(live_idx_to_col_idx.len(), num_live_columns); debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns); + enum MaskSetting { + Auto, + Pre, + Post, + } + + let mask_setting = + std::env::var("POLARS_PQ_PREFILTERED_MASK").map_or(MaskSetting::Auto, |v| match &v[..] { + "auto" => MaskSetting::Auto, + "pre" => MaskSetting::Pre, + "post" => MaskSetting::Post, + _ => panic!("Invalid `POLARS_PQ_PREFILTERED_MASK` value '{v}'."), + }); + POOL.install(|| { // Set partitioned fields to prevent quadratic behavior. // Ensure all row groups are partitioned. @@ -394,38 +409,34 @@ fn rg_to_dfs_prefiltered( return Ok(dfs.into_iter().map(|(_, df)| df).collect()); } - // @TODO: Incorporate this if we how we can properly use it. The problem here is that - // different columns really have a different cost when it comes to collecting them. We - // would need a cost model to properly estimate this. - // - // // For bitmasks that are seemingly random (i.e. not clustered or biased towards 0 or 1), - // // filtering with a bitmask in the Parquet reader is actually around 1.5 - 2.2 times slower - // // than collecting everything and filtering afterwards. This is because stopping and - // // starting decoding is not free. - // // - // // To combat this we try to detect here how biased our data is. We do this with a bithack - // // that estimates the amount of switches from 0 to 1 and from 1 to 0. This can be SIMD-ed - // // very well and gives us quite good estimate of how random our bitmask is. Then, we select - // // the filter if the bitmask is not that random. - // let do_filter_rg = dfs - // .par_iter() - // .map(|(mask, _)| { - // let iter = mask.fast_iter_u64(); - // - // // The iter is TrustedLen so the size_hint is exact. - // let num_items = iter.size_hint().0; - // let num_switches = iter - // .map(|v| (v ^ v.rotate_right(1)).count_ones() as u64) - // .sum::(); - // - // // We ignore the iter remainder since we only really care about the average. - // let avg_num_switches_per_element = num_switches / num_items as u64; - // - // // We select the filter if the average amount of switches per 64 elements is less - // // than or equal to 2. - // avg_num_switches_per_element <= 2 - // }) - // .collect::>(); + let rg_prefilter_costs = matches!(mask_setting, MaskSetting::Auto) + .then(|| { + dfs.par_iter() + .map(|(mask, _)| { + let num_edges = mask.num_edges() as f64; + let rg_len = mask.len() as f64; + + // @GB: I did quite some analysis on this. + // + // Pre-filtered and Post-filtered can both be faster in certain scenarios. + // + // - Pre-filtered is faster when there is some amount of clustering or + // sorting involved or if the number of values selected is small. + // - Post-filtering is faster when the predicate selects a somewhat random + // elements throughout the row group. + // + // The following is a heuristic value to try and estimate which one is + // faster. Essentially, it sees how many times it needs to switch between + // skipping items and collecting items and compares it against the number + // of values that it will collect. + // + // Closer to 0: post-filtering is probably better. + // Closer to 1: pre-filtering is probably better. + (num_edges / rg_len).clamp(0.0, 1.0) + }) + .collect::>() + }) + .unwrap_or_default(); let mut rg_columns = (0..dfs.len() * num_dead_columns) .into_par_iter() @@ -444,20 +455,58 @@ fn rg_to_dfs_prefiltered( } let field_md = part_mds[rg_idx as usize].get_partitions(name).unwrap(); - column_idx_to_series( - col_idx, - field_md.as_slice(), - Some(Filter::new_masked(mask.clone())), - schema, - store, - ) + let pre = || { + column_idx_to_series( + col_idx, + field_md.as_slice(), + Some(Filter::new_masked(mask.clone())), + schema, + store, + ) + }; + let post = || { + let array = + column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)?; + + debug_assert_eq!(array.len(), mask.len()); + + let mask_arr = BooleanArray::new(ArrowDataType::Boolean, mask.clone(), None); + let mask_arr = BooleanChunked::from(mask_arr); + array.filter(&mask_arr) + }; + + let array = match mask_setting { + MaskSetting::Auto => { + // Prefiltering is more expensive for nested types so we make the cut-off + // higher. + let is_nested = schema.fields[col_idx].data_type.is_nested(); + let prefilter_cost = rg_prefilter_costs[i / num_dead_columns]; + + // We empirically selected these numbers. + let do_prefilter = (is_nested && prefilter_cost <= 0.01) + || (!is_nested && prefilter_cost <= 0.02); + + if do_prefilter { + pre()? + } else { + post()? + } + }, + MaskSetting::Pre => pre()?, + MaskSetting::Post => post()?, + }; + + debug_assert_eq!(array.len(), mask.set_bits()); + + Ok(array) }) .collect::>>()?; let Some(df) = dfs.first().map(|(_, df)| df) else { return Ok(Vec::new()); }; - let rearranged_schema = df.schema(); + let mut rearranged_schema = df.schema(); + rearranged_schema.merge(Schema::from(schema)); rg_columns .par_chunks_exact_mut(num_dead_columns) @@ -465,6 +514,8 @@ fn rg_to_dfs_prefiltered( .map(|(rg_cols, (_, mut df))| { let rg_cols = rg_cols.iter_mut().map(std::mem::take).collect::>(); + debug_assert!(rg_cols.iter().all(|v| v.len() == df.height())); + // We first add the columns with the live columns at the start. Then, we do a // projections that puts the columns at the right spot. df._add_columns(rg_cols, &rearranged_schema)?; diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs index af7868517673..a1f3054bd852 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs @@ -41,11 +41,8 @@ impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> { Ok(Self::Plain(values)) }, - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(_)) => { let values = dict_indices_decoder(page)?; - if is_string { - arrow::array::validate_utf8_view(dict.0.as_ref(), dict.1.as_ref())?; - } Ok(Self::Dictionary(values)) }, (Encoding::DeltaLengthByteArray, _) => { @@ -93,6 +90,7 @@ impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> { &mut self, decoder: &mut BinViewDecoder, decoded: &mut ::DecodedState, + is_optional: bool, page_validity: &mut Option>, dict: Option<&'a ::Dict>, additional: usize, @@ -107,6 +105,7 @@ impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> { decoder.decode_plain_encoded( decoded, page_values, + is_optional, page_validity.as_mut(), additional, )?; @@ -120,6 +119,7 @@ impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> { decoder.decode_dictionary_encoded( decoded, page, + is_optional, page_validity.as_mut(), dict, additional, @@ -138,7 +138,13 @@ impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> { }; match page_validity { - None => (&mut collector).push_n(values, additional)?, + None => { + (&mut collector).push_n(values, additional)?; + + if is_optional { + validity.extend_constant(additional, true); + } + }, Some(page_validity) => extend_from_decoder( validity, page_validity, @@ -158,7 +164,13 @@ impl<'a> utils::StateTranslation<'a, BinViewDecoder> for StateTranslation<'a> { }; match page_validity { - None => collector.push_n(values, additional)?, + None => { + collector.push_n(values, additional)?; + + if is_optional { + validity.extend_constant(additional, true); + } + }, Some(page_validity) => extend_from_decoder( validity, page_validity, @@ -536,7 +548,7 @@ impl utils::Decoder for BinViewDecoder { Ok(()) } - fn deserialize_dict(&self, page: DictPage) -> Self::Dict { + fn deserialize_dict(&self, page: DictPage) -> ParquetResult { let values = &page.buffer; let num_values = page.num_values; @@ -549,9 +561,12 @@ impl utils::Decoder for BinViewDecoder { let mut buffers = Vec::with_capacity(1); let mut offset = 0; - for v in BinaryIter::new(values, num_values) { - if v.len() <= View::MAX_INLINE_SIZE as usize { - views.push(View::new_inline(v)); + let mut max_length = 0; + views.extend(BinaryIter::new(values, num_values).map(|v| { + let length = v.len(); + max_length = usize::max(length, max_length); + if length <= View::MAX_INLINE_SIZE as usize { + View::new_inline(v) } else { if offset >= u32::MAX as usize { let full_buffer = std::mem::take(&mut buffer); @@ -562,20 +577,39 @@ impl utils::Decoder for BinViewDecoder { } buffer.extend_from_slice(v); - views.push(View::new_from_bytes(v, buffers.len() as u32, offset as u32)); + let view = View::new_from_bytes(v, buffers.len() as u32, offset as u32); offset += v.len(); + view } - } + })); buffers.push(Buffer::from(buffer)); - (views, buffers) + if self.check_utf8.load(Ordering::Relaxed) { + // This is a small trick that allows us to check the Parquet buffer instead of the view + // buffer. Batching the UTF-8 verification is more performant. For this to be allowed, + // all the interleaved lengths need to be valid UTF-8. + // + // Every strings prepended by 4 bytes (L, 0, 0, 0), since we check here L < 128. L is + // only a valid first byte of a UTF-8 code-point and (L, 0, 0, 0) is valid UTF-8. + // Consequently, it is valid to just check the whole buffer. + if max_length < 128 { + simdutf8::basic::from_utf8(values) + .map_err(|_| ParquetError::oos("String data contained invalid UTF-8"))?; + } else { + arrow::array::validate_utf8_view(&views, &buffers) + .map_err(|_| ParquetError::oos("String data contained invalid UTF-8"))?; + } + } + + Ok((views, buffers)) } fn decode_plain_encoded<'a>( &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut as utils::StateTranslation<'a, Self>>::PlainDecoder, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, limit: usize, ) -> ParquetResult<()> { @@ -629,7 +663,13 @@ impl utils::Decoder for BinViewDecoder { }; match page_validity { - None => collector.push_n(values, limit)?, + None => { + collector.push_n(values, limit)?; + + if is_optional { + validity.extend_constant(limit, true); + } + }, Some(page_validity) => { extend_from_decoder(validity, page_validity, Some(limit), values, collector)? }, @@ -641,6 +681,10 @@ impl utils::Decoder for BinViewDecoder { // This is a small trick that allows us to check the Parquet buffer instead of the view // buffer. Batching the UTF-8 verification is more performant. For this to be allowed, // all the interleaved lengths need to be valid UTF-8. + // + // Every strings prepended by 4 bytes (L, 0, 0, 0), since we check here L < 128. L is + // only a valid first byte of a UTF-8 code-point and (L, 0, 0, 0) is valid UTF-8. + // Consequently, it is valid to just check the whole buffer. if max_length < 128 { simdutf8::basic::from_utf8(buffer) .map_err(|_| ParquetError::oos("String data contained invalid UTF-8"))?; @@ -658,6 +702,7 @@ impl utils::Decoder for BinViewDecoder { &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut hybrid_rle::HybridRleDecoder<'a>, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, dict: &Self::Dict, limit: usize, @@ -747,6 +792,10 @@ impl utils::Decoder for BinViewDecoder { match page_validity { None => { page_values.gather_n_into(values, limit, &translator)?; + + if is_optional { + validity.extend_constant(limit, true); + } }, Some(page_validity) => { struct Collector<'a, 'b> { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs index 79808d2b388d..e99e7a5ed56c 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs @@ -89,6 +89,7 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> { &mut self, decoder: &mut BooleanDecoder, decoded: &mut ::DecodedState, + is_optional: bool, page_validity: &mut Option>, _: Option<&'a ::Dict>, additional: usize, @@ -97,13 +98,20 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> { Self::Plain(page_values) => decoder.decode_plain_encoded( decoded, page_values, + is_optional, page_validity.as_mut(), additional, )?, Self::Rle(page_values) => { let (values, validity) = decoded; match page_validity { - None => page_values.gather_n_into(values, additional, &BitmapGatherer)?, + None => { + page_values.gather_n_into(values, additional, &BitmapGatherer)?; + + if is_optional { + validity.extend_constant(additional, true); + } + }, Some(page_validity) => utils::extend_from_decoder( validity, page_validity, @@ -199,17 +207,26 @@ impl Decoder for BooleanDecoder { ) } - fn deserialize_dict(&self, _: DictPage) -> Self::Dict {} + fn deserialize_dict(&self, _: DictPage) -> ParquetResult { + Ok(()) + } fn decode_plain_encoded<'a>( &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut as utils::StateTranslation<'a, Self>>::PlainDecoder, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, limit: usize, ) -> ParquetResult<()> { match page_validity { - None => page_values.collect_n_into(values, limit), + None => { + page_values.collect_n_into(values, limit); + + if is_optional { + validity.extend_constant(limit, true); + } + }, Some(page_validity) => { extend_from_decoder(validity, page_validity, Some(limit), values, page_values)? }, @@ -222,6 +239,7 @@ impl Decoder for BooleanDecoder { &mut self, _decoded: &mut Self::DecodedState, _page_values: &mut HybridRleDecoder<'a>, + _is_optional: bool, _page_validity: Option<&mut PageValidity<'a>>, _dict: &Self::Dict, _limit: usize, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs index 09f9807ae5ac..db718ed9c330 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs @@ -47,6 +47,7 @@ impl<'a, K: DictionaryKey, D: utils::DictDecodable> StateTranslation<'a, Diction &mut self, decoder: &mut DictionaryDecoder, decoded: &mut as Decoder>::DecodedState, + is_optional: bool, page_validity: &mut Option>, _: Option<&'a as Decoder>::Dict>, additional: usize, @@ -65,7 +66,13 @@ impl<'a, K: DictionaryKey, D: utils::DictDecodable> StateTranslation<'a, Diction }; match page_validity { - None => collector.push_n(&mut decoded.0, additional)?, + None => { + collector.push_n(&mut decoded.0, additional)?; + + if is_optional { + validity.extend_constant(additional, true); + } + }, Some(page_validity) => { extend_from_decoder(validity, page_validity, Some(additional), values, collector)? }, @@ -105,11 +112,11 @@ impl utils::Decoder for DictionaryDec ) } - fn deserialize_dict(&self, page: DictPage) -> Self::Dict { - let dict = self.decoder.deserialize_dict(page); + fn deserialize_dict(&self, page: DictPage) -> ParquetResult { + let dict = self.decoder.deserialize_dict(page)?; self.dict_size .store(dict.len(), std::sync::atomic::Ordering::Relaxed); - dict + Ok(dict) } fn finalize( @@ -129,6 +136,7 @@ impl utils::Decoder for DictionaryDec &mut self, _decoded: &mut Self::DecodedState, _page_values: &mut as StateTranslation<'a, Self>>::PlainDecoder, + _is_optional: bool, _page_validity: Option<&mut PageValidity<'a>>, _limit: usize, ) -> ParquetResult<()> { @@ -139,6 +147,7 @@ impl utils::Decoder for DictionaryDec &mut self, _decoded: &mut Self::DecodedState, _page_values: &mut HybridRleDecoder<'a>, + _is_optional: bool, _page_validity: Option<&mut PageValidity<'a>>, _dict: &Self::Dict, _limit: usize, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs index 8473faa9c56f..b4fcd2c38e7d 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs @@ -75,6 +75,7 @@ impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> { &mut self, decoder: &mut BinaryDecoder, decoded: &mut ::DecodedState, + is_optional: bool, page_validity: &mut Option>, dict: Option<&'a ::Dict>, additional: usize, @@ -84,12 +85,14 @@ impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> { T::Plain(page_values, _) => decoder.decode_plain_encoded( decoded, page_values, + is_optional, page_validity.as_mut(), additional, )?, T::Dictionary(page_values) => decoder.decode_dictionary_encoded( decoded, page_values, + is_optional, page_validity.as_mut(), dict.unwrap(), additional, @@ -134,14 +137,15 @@ impl Decoder for BinaryDecoder { ) } - fn deserialize_dict(&self, page: DictPage) -> Self::Dict { - page.buffer.into_vec() + fn deserialize_dict(&self, page: DictPage) -> ParquetResult { + Ok(page.buffer.into_vec()) } fn decode_plain_encoded<'a>( &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut as utils::StateTranslation<'a, Self>>::PlainDecoder, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, limit: usize, ) -> ParquetResult<()> { @@ -180,7 +184,13 @@ impl Decoder for BinaryDecoder { }; match page_validity { - None => collector.push_n(&mut values.values, self.size)?, + None => { + collector.push_n(&mut values.values, limit)?; + + if is_optional { + validity.extend_constant(limit, true); + } + }, Some(page_validity) => extend_from_decoder( validity, page_validity, @@ -197,6 +207,7 @@ impl Decoder for BinaryDecoder { &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut hybrid_rle::HybridRleDecoder<'a>, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, dict: &Self::Dict, limit: usize, @@ -274,6 +285,10 @@ impl Decoder for BinaryDecoder { match page_validity { None => { page_values.gather_n_into(&mut values.values, limit, &gatherer)?; + + if is_optional { + validity.extend_constant(limit, true); + } }, Some(page_validity) => { let collector = GatheredHybridRle::new(page_values, &gatherer, null_value); diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs index 9c79e1083585..a2076014a966 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested.rs @@ -162,7 +162,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::unit(), + primitive::FloatDecoder::::unit(), init, )? .collect_n(filter) @@ -174,7 +174,7 @@ pub fn columns_to_iter_recursive( PageNestedDecoder::new( columns.pop().unwrap(), field.data_type().clone(), - primitive::PrimitiveDecoder::::unit(), + primitive::FloatDecoder::::unit(), init, )? .collect_n(filter) @@ -524,14 +524,14 @@ fn dict_read( Float32 => PageNestedDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::::unit()), + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::unit()), init, )? .collect_n(filter)?, Float64 => PageNestedDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::::unit()), + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::unit()), init, )? .collect_n(filter)?, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs index dbf41ee7579c..42e321a2f570 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/nested_utils.rs @@ -742,7 +742,7 @@ impl PageNestedDecoder { init: Vec, ) -> ParquetResult { let dict_page = iter.read_dict_page()?; - let dict = dict_page.map(|d| decoder.deserialize_dict(d)); + let dict = dict_page.map(|d| decoder.deserialize_dict(d)).transpose()?; Ok(Self { iter, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/null.rs b/crates/polars-parquet/src/arrow/read/deserialize/null.rs index 5a4c68d0acd7..8066c1d73af3 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/null.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/null.rs @@ -47,6 +47,7 @@ impl<'a> utils::StateTranslation<'a, NullDecoder> for () { &mut self, _decoder: &mut NullDecoder, decoded: &mut ::DecodedState, + _is_optional: bool, _page_validity: &mut Option>, _: Option<&'a ::Dict>, additional: usize, @@ -67,12 +68,15 @@ impl utils::Decoder for NullDecoder { NullArrayLength { length: 0 } } - fn deserialize_dict(&self, _: DictPage) -> Self::Dict {} + fn deserialize_dict(&self, _: DictPage) -> ParquetResult { + Ok(()) + } fn decode_plain_encoded<'a>( &mut self, _decoded: &mut Self::DecodedState, _page_values: &mut as utils::StateTranslation<'a, Self>>::PlainDecoder, + _is_optional: bool, _page_validity: Option<&mut utils::PageValidity<'a>>, _limit: usize, ) -> ParquetResult<()> { @@ -83,6 +87,7 @@ impl utils::Decoder for NullDecoder { &mut self, _decoded: &mut Self::DecodedState, _page_values: &mut hybrid_rle::HybridRleDecoder<'a>, + _is_optional: bool, _page_validity: Option<&mut utils::PageValidity<'a>>, _dict: &Self::Dict, _limit: usize, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/float.rs similarity index 63% rename from crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs rename to crates/polars-parquet/src/arrow/read/deserialize/primitive/float.rs index 552f89d9b7c3..1c09ea3f7e87 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/float.rs @@ -4,6 +4,10 @@ use arrow::datatypes::ArrowDataType; use arrow::types::NativeType; use super::super::utils; +use super::{ + deserialize_plain, AsDecoderFunction, ClosureDecoderFunction, DecoderFunction, + PlainDecoderFnCollector, PrimitiveDecoder, UnitDecoderFunction, +}; use crate::parquet::encoding::hybrid_rle::DictionaryTranslator; use crate::parquet::encoding::{byte_stream_split, hybrid_rle, Encoding}; use crate::parquet::error::ParquetResult; @@ -15,126 +19,6 @@ use crate::read::deserialize::utils::{ TranslatedHybridRle, }; -/// A function that defines how to decode from the -/// [`parquet::types::NativeType`][ParquetNativeType] to the [`arrow::types::NativeType`]. -/// -/// This should almost always be inlined. -pub(crate) trait DecoderFunction: Copy -where - T: NativeType, - P: ParquetNativeType, -{ - fn decode(self, x: P) -> T; -} - -#[derive(Default, Clone, Copy)] -pub(crate) struct UnitDecoderFunction(std::marker::PhantomData); -impl DecoderFunction for UnitDecoderFunction { - #[inline(always)] - fn decode(self, x: T) -> T { - x - } -} - -#[derive(Default, Clone, Copy)] -pub(crate) struct AsDecoderFunction(std::marker::PhantomData<(P, T)>); -macro_rules! as_decoder_impl { - ($($p:ty => $t:ty,)+) => { - $( - impl DecoderFunction<$p, $t> for AsDecoderFunction<$p, $t> { - #[inline(always)] - fn decode(self, x : $p) -> $t { - x as $t - } - } - )+ - }; -} - -as_decoder_impl![ - i32 => i8, - i32 => i16, - i32 => u8, - i32 => u16, - i32 => u32, - i64 => i32, - i64 => u32, - i64 => u64, -]; - -#[derive(Default, Clone, Copy)] -pub(crate) struct IntoDecoderFunction(std::marker::PhantomData<(P, T)>); -impl DecoderFunction for IntoDecoderFunction -where - P: ParquetNativeType + Into, - T: NativeType, -{ - #[inline(always)] - fn decode(self, x: P) -> T { - x.into() - } -} - -#[derive(Clone, Copy)] -pub(crate) struct ClosureDecoderFunction(F, std::marker::PhantomData<(P, T)>); -impl DecoderFunction for ClosureDecoderFunction -where - P: ParquetNativeType, - T: NativeType, - F: Copy + Fn(P) -> T, -{ - #[inline(always)] - fn decode(self, x: P) -> T { - (self.0)(x) - } -} - -pub(crate) struct PlainDecoderFnCollector<'a, 'b, P, T, D> -where - T: NativeType, - P: ParquetNativeType, - D: DecoderFunction, -{ - pub(crate) chunks: &'b mut ArrayChunks<'a, P>, - pub(crate) decoder: D, - pub(crate) _pd: std::marker::PhantomData, -} - -impl<'a, 'b, P, T, D: DecoderFunction> BatchableCollector<(), Vec> - for PlainDecoderFnCollector<'a, 'b, P, T, D> -where - T: NativeType, - P: ParquetNativeType, - D: DecoderFunction, -{ - fn reserve(target: &mut Vec, n: usize) { - target.reserve(n); - } - - fn push_n(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { - let n = usize::min(self.chunks.len(), n); - let (items, remainder) = self.chunks.bytes.split_at(n); - let decoder = self.decoder; - target.extend( - items - .iter() - .map(|chunk| decoder.decode(P::from_le_bytes(*chunk))), - ); - self.chunks.bytes = remainder; - Ok(()) - } - - fn push_n_nulls(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { - target.resize(target.len() + n, T::default()); - Ok(()) - } - - fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> { - self.chunks.skip_in_place(n); - Ok(()) - } -} - #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub(crate) enum StateTranslation<'a, P: ParquetNativeType> { @@ -143,7 +27,7 @@ pub(crate) enum StateTranslation<'a, P: ParquetNativeType> { ByteStreamSplit(byte_stream_split::Decoder<'a>), } -impl<'a, P, T, D> utils::StateTranslation<'a, PrimitiveDecoder> for StateTranslation<'a, P> +impl<'a, P, T, D> utils::StateTranslation<'a, FloatDecoder> for StateTranslation<'a, P> where T: NativeType, P: ParquetNativeType, @@ -152,9 +36,9 @@ where type PlainDecoder = ArrayChunks<'a, P>; fn new( - _decoder: &PrimitiveDecoder, + _decoder: &FloatDecoder, page: &'a DataPage, - dict: Option<&'a as utils::Decoder>::Dict>, + dict: Option<&'a as utils::Decoder>::Dict>, _page_validity: Option<&PageValidity<'a>>, ) -> ParquetResult { match (page.encoding(), dict) { @@ -202,22 +86,25 @@ where fn extend_from_state( &mut self, - decoder: &mut PrimitiveDecoder, - decoded: &mut as utils::Decoder>::DecodedState, + decoder: &mut FloatDecoder, + decoded: &mut as utils::Decoder>::DecodedState, + is_optional: bool, page_validity: &mut Option>, - dict: Option<&'a as utils::Decoder>::Dict>, + dict: Option<&'a as utils::Decoder>::Dict>, additional: usize, ) -> ParquetResult<()> { match self { Self::Plain(page_values) => decoder.decode_plain_encoded( decoded, page_values, + is_optional, page_validity.as_mut(), additional, )?, Self::Dictionary(ref mut page) => decoder.decode_dictionary_encoded( decoded, page, + is_optional, page_validity.as_mut(), dict.unwrap(), additional, @@ -229,16 +116,20 @@ where None => { values.extend( page_values - .iter_converted(|v| decoder.decoder.decode(decode(v))) + .iter_converted(|v| decoder.0.decoder.decode(decode(v))) .take(additional), ); + + if is_optional { + validity.extend_constant(additional, true); + } }, Some(page_validity) => utils::extend_from_decoder( validity, page_validity, Some(additional), values, - &mut page_values.iter_converted(|v| decoder.decoder.decode(decode(v))), + &mut page_values.iter_converted(|v| decoder.0.decoder.decode(decode(v))), )?, } }, @@ -249,17 +140,13 @@ where } #[derive(Debug)] -pub(crate) struct PrimitiveDecoder +pub(crate) struct FloatDecoder(PrimitiveDecoder) where P: ParquetNativeType, T: NativeType, - D: DecoderFunction, -{ - pub(crate) decoder: D, - _pd: std::marker::PhantomData<(P, T)>, -} + D: DecoderFunction; -impl PrimitiveDecoder +impl FloatDecoder where P: ParquetNativeType, T: NativeType, @@ -267,14 +154,11 @@ where { #[inline] fn new(decoder: D) -> Self { - Self { - decoder, - _pd: std::marker::PhantomData, - } + Self(PrimitiveDecoder::new(decoder)) } } -impl PrimitiveDecoder> +impl FloatDecoder> where T: NativeType + ParquetNativeType, UnitDecoderFunction: Default + DecoderFunction, @@ -284,7 +168,7 @@ where } } -impl PrimitiveDecoder> +impl FloatDecoder> where P: ParquetNativeType, T: NativeType, @@ -295,18 +179,7 @@ where } } -impl PrimitiveDecoder> -where - P: ParquetNativeType, - T: NativeType, - IntoDecoderFunction: Default + DecoderFunction, -{ - pub(crate) fn cast_into() -> Self { - Self::new(IntoDecoderFunction::::default()) - } -} - -impl PrimitiveDecoder> +impl FloatDecoder> where P: ParquetNativeType, T: NativeType, @@ -323,7 +196,7 @@ impl utils::ExactSize for (Vec, MutableBitmap) { } } -impl utils::Decoder for PrimitiveDecoder +impl utils::Decoder for FloatDecoder where T: NativeType, P: ParquetNativeType, @@ -341,14 +214,15 @@ where ) } - fn deserialize_dict(&self, page: DictPage) -> Self::Dict { - deserialize_plain::(&page.buffer, self.decoder) + fn deserialize_dict(&self, page: DictPage) -> ParquetResult { + Ok(deserialize_plain::(&page.buffer, self.0.decoder)) } fn decode_plain_encoded<'a>( &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut as utils::StateTranslation<'a, Self>>::PlainDecoder, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, limit: usize, ) -> ParquetResult<()> { @@ -356,15 +230,19 @@ where None => { PlainDecoderFnCollector { chunks: page_values, - decoder: self.decoder, + decoder: self.0.decoder, _pd: std::marker::PhantomData, } .push_n(values, limit)?; + + if is_optional { + validity.extend_constant(limit, true); + } }, Some(page_validity) => { let collector = PlainDecoderFnCollector { chunks: page_values, - decoder: self.decoder, + decoder: self.0.decoder, _pd: std::marker::PhantomData, }; @@ -385,6 +263,7 @@ where &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut hybrid_rle::HybridRleDecoder<'a>, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, dict: &Self::Dict, limit: usize, @@ -394,6 +273,10 @@ where match page_validity { None => { page_values.translate_and_collect_n_into(values, limit, &translator)?; + + if is_optional { + validity.extend_constant(limit, true); + } }, Some(page_validity) => { let translated_hybridrle = TranslatedHybridRle::new(page_values, &translator); @@ -422,7 +305,7 @@ where } } -impl utils::DictDecodable for PrimitiveDecoder +impl utils::DictDecodable for FloatDecoder where T: NativeType, P: ParquetNativeType, @@ -445,7 +328,7 @@ where } } -impl utils::NestedDecoder for PrimitiveDecoder +impl utils::NestedDecoder for FloatDecoder where T: NativeType, P: ParquetNativeType, @@ -468,16 +351,3 @@ where values.resize(values.len() + n, T::default()); } } - -pub(super) fn deserialize_plain(values: &[u8], decoder: D) -> Vec -where - T: NativeType, - P: ParquetNativeType, - D: DecoderFunction, -{ - values - .chunks_exact(std::mem::size_of::

()) - .map(decode) - .map(|v| decoder.decode(v)) - .collect::>() -} diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index bc729cb0ea9e..dfe3f2f09cd6 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -4,11 +4,11 @@ use arrow::datatypes::ArrowDataType; use arrow::types::NativeType; use super::super::utils; -use super::basic::{ - AsDecoderFunction, ClosureDecoderFunction, DecoderFunction, IntoDecoderFunction, - PlainDecoderFnCollector, PrimitiveDecoder, UnitDecoderFunction, +use super::{ + deserialize_plain, AsDecoderFunction, ClosureDecoderFunction, DecoderFunction, DeltaCollector, + DeltaTranslator, IntoDecoderFunction, PlainDecoderFnCollector, PrimitiveDecoder, + UnitDecoderFunction, }; -use super::{DeltaCollector, DeltaTranslator}; use crate::parquet::encoding::hybrid_rle::{self, DictionaryTranslator}; use crate::parquet::encoding::{byte_stream_split, delta_bitpacked, Encoding}; use crate::parquet::error::ParquetResult; @@ -99,6 +99,7 @@ where &mut self, decoder: &mut IntDecoder, decoded: &mut as utils::Decoder>::DecodedState, + is_optional: bool, page_validity: &mut Option>, dict: Option<&'a as utils::Decoder>::Dict>, additional: usize, @@ -107,12 +108,14 @@ where Self::Plain(page_values) => decoder.decode_plain_encoded( decoded, page_values, + is_optional, page_validity.as_mut(), additional, )?, Self::Dictionary(ref mut page) => decoder.decode_dictionary_encoded( decoded, page, + is_optional, page_validity.as_mut(), dict.unwrap(), additional, @@ -127,6 +130,10 @@ where .iter_converted(|v| decoder.0.decoder.decode(decode(v))) .take(additional), ); + + if is_optional { + validity.extend_constant(additional, true); + } }, Some(page_validity) => { utils::extend_from_decoder( @@ -149,7 +156,13 @@ where }; match page_validity { - None => page_values.gather_n_into(values, additional, &mut gatherer)?, + None => { + page_values.gather_n_into(values, additional, &mut gatherer)?; + + if is_optional { + validity.extend_constant(additional, true); + } + }, Some(page_validity) => utils::extend_from_decoder( validity, page_validity, @@ -185,8 +198,8 @@ where D: DecoderFunction, { #[inline] - fn new(decoder: PrimitiveDecoder) -> Self { - Self(decoder) + fn new(decoder: D) -> Self { + Self(PrimitiveDecoder::new(decoder)) } } @@ -197,7 +210,7 @@ where UnitDecoderFunction: Default + DecoderFunction, { pub(crate) fn unit() -> Self { - Self::new(PrimitiveDecoder::unit()) + Self::new(UnitDecoderFunction::::default()) } } @@ -209,7 +222,7 @@ where AsDecoderFunction: Default + DecoderFunction, { pub(crate) fn cast_as() -> Self { - Self::new(PrimitiveDecoder::cast_as()) + Self::new(AsDecoderFunction::::default()) } } @@ -221,7 +234,7 @@ where IntoDecoderFunction: Default + DecoderFunction, { pub(crate) fn cast_into() -> Self { - Self::new(PrimitiveDecoder::cast_into()) + Self::new(IntoDecoderFunction::::default()) } } @@ -233,7 +246,7 @@ where F: Copy + Fn(P) -> T, { pub(crate) fn closure(f: F) -> Self { - Self::new(PrimitiveDecoder::closure(f)) + Self::new(ClosureDecoderFunction(f, std::marker::PhantomData)) } } @@ -250,17 +263,21 @@ where type Output = PrimitiveArray; fn with_capacity(&self, capacity: usize) -> Self::DecodedState { - self.0.with_capacity(capacity) + ( + Vec::::with_capacity(capacity), + MutableBitmap::with_capacity(capacity), + ) } - fn deserialize_dict(&self, page: DictPage) -> Self::Dict { - self.0.deserialize_dict(page) + fn deserialize_dict(&self, page: DictPage) -> ParquetResult { + Ok(deserialize_plain::(&page.buffer, self.0.decoder)) } fn decode_plain_encoded<'a>( &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut as utils::StateTranslation<'a, Self>>::PlainDecoder, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, limit: usize, ) -> ParquetResult<()> { @@ -272,6 +289,10 @@ where _pd: Default::default(), } .push_n(values, limit)?; + + if is_optional { + validity.extend_constant(limit, true); + } }, Some(page_validity) => { let collector = PlainDecoderFnCollector { @@ -297,11 +318,20 @@ where &mut self, (values, validity): &mut Self::DecodedState, page_values: &mut hybrid_rle::HybridRleDecoder<'a>, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, dict: &Self::Dict, limit: usize, ) -> ParquetResult<()> { match page_validity { + None => { + let translator = DictionaryTranslator(dict); + page_values.translate_and_collect_n_into(values, limit, &translator)?; + + if is_optional { + validity.extend_constant(limit, true); + } + }, Some(page_validity) => { let translator = DictionaryTranslator(dict); let translated_hybridrle = TranslatedHybridRle::new(page_values, &translator); @@ -314,10 +344,6 @@ where translated_hybridrle, )?; }, - None => { - let translator = DictionaryTranslator(dict); - page_values.translate_and_collect_n_into(values, limit, &translator)?; - }, } Ok(()) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs index 22da6ff14895..1a9d50a66d31 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/mod.rs @@ -1,19 +1,178 @@ use arrow::types::NativeType; use num_traits::AsPrimitive; -use crate::parquet::types::NativeType as ParquetNativeType; +use crate::parquet::types::{decode, NativeType as ParquetNativeType}; -mod basic; +mod float; mod integer; -pub(crate) use basic::PrimitiveDecoder; +pub(crate) use float::FloatDecoder; pub(crate) use integer::IntDecoder; -use self::basic::DecoderFunction; +use super::utils::array_chunks::ArrayChunks; use super::utils::BatchableCollector; use super::ParquetResult; use crate::parquet::encoding::delta_bitpacked::{self, DeltaGatherer}; +#[derive(Debug)] +pub(crate) struct PrimitiveDecoder +where + P: ParquetNativeType, + T: NativeType, + D: DecoderFunction, +{ + pub(crate) decoder: D, + _pd: std::marker::PhantomData<(P, T)>, +} + +impl PrimitiveDecoder +where + P: ParquetNativeType, + T: NativeType, + D: DecoderFunction, +{ + #[inline] + pub(crate) fn new(decoder: D) -> Self { + Self { + decoder, + _pd: std::marker::PhantomData, + } + } +} + +/// A function that defines how to decode from the +/// [`parquet::types::NativeType`][ParquetNativeType] to the [`arrow::types::NativeType`]. +/// +/// This should almost always be inlined. +pub(crate) trait DecoderFunction: Copy +where + T: NativeType, + P: ParquetNativeType, +{ + fn decode(self, x: P) -> T; +} + +#[derive(Default, Clone, Copy)] +pub(crate) struct UnitDecoderFunction(std::marker::PhantomData); +impl DecoderFunction for UnitDecoderFunction { + #[inline(always)] + fn decode(self, x: T) -> T { + x + } +} + +#[derive(Default, Clone, Copy)] +pub(crate) struct AsDecoderFunction(std::marker::PhantomData<(P, T)>); +macro_rules! as_decoder_impl { + ($($p:ty => $t:ty,)+) => { + $( + impl DecoderFunction<$p, $t> for AsDecoderFunction<$p, $t> { + #[inline(always)] + fn decode(self, x : $p) -> $t { + x as $t + } + } + )+ + }; +} + +as_decoder_impl![ + i32 => i8, + i32 => i16, + i32 => u8, + i32 => u16, + i32 => u32, + i64 => i32, + i64 => u32, + i64 => u64, +]; + +#[derive(Default, Clone, Copy)] +pub(crate) struct IntoDecoderFunction(std::marker::PhantomData<(P, T)>); +impl DecoderFunction for IntoDecoderFunction +where + P: ParquetNativeType + Into, + T: NativeType, +{ + #[inline(always)] + fn decode(self, x: P) -> T { + x.into() + } +} + +#[derive(Clone, Copy)] +pub(crate) struct ClosureDecoderFunction(F, std::marker::PhantomData<(P, T)>); +impl DecoderFunction for ClosureDecoderFunction +where + P: ParquetNativeType, + T: NativeType, + F: Copy + Fn(P) -> T, +{ + #[inline(always)] + fn decode(self, x: P) -> T { + (self.0)(x) + } +} + +pub(crate) struct PlainDecoderFnCollector<'a, 'b, P, T, D> +where + T: NativeType, + P: ParquetNativeType, + D: DecoderFunction, +{ + pub(crate) chunks: &'b mut ArrayChunks<'a, P>, + pub(crate) decoder: D, + pub(crate) _pd: std::marker::PhantomData, +} + +impl<'a, 'b, P, T, D: DecoderFunction> BatchableCollector<(), Vec> + for PlainDecoderFnCollector<'a, 'b, P, T, D> +where + T: NativeType, + P: ParquetNativeType, + D: DecoderFunction, +{ + fn reserve(target: &mut Vec, n: usize) { + target.reserve(n); + } + + fn push_n(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { + let n = usize::min(self.chunks.len(), n); + let (items, remainder) = self.chunks.bytes.split_at(n); + let decoder = self.decoder; + target.extend( + items + .iter() + .map(|chunk| decoder.decode(P::from_le_bytes(*chunk))), + ); + self.chunks.bytes = remainder; + Ok(()) + } + + fn push_n_nulls(&mut self, target: &mut Vec, n: usize) -> ParquetResult<()> { + target.resize(target.len() + n, T::default()); + Ok(()) + } + + fn skip_in_place(&mut self, n: usize) -> ParquetResult<()> { + self.chunks.skip_in_place(n); + Ok(()) + } +} + +fn deserialize_plain(values: &[u8], decoder: D) -> Vec +where + T: NativeType, + P: ParquetNativeType, + D: DecoderFunction, +{ + values + .chunks_exact(std::mem::size_of::

()) + .map(decode) + .map(|v| decoder.decode(v)) + .collect::>() +} + struct DeltaTranslator where T: NativeType, diff --git a/crates/polars-parquet/src/arrow/read/deserialize/simple.rs b/crates/polars-parquet/src/arrow/read/deserialize/simple.rs index ba4a15814006..9d512d834ebf 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/simple.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/simple.rs @@ -278,13 +278,13 @@ pub fn page_iter_to_array( (PhysicalType::Float, Float32) => Box::new(PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::::unit(), + primitive::FloatDecoder::::unit(), )? .collect_n(filter)?), (PhysicalType::Double, Float64) => Box::new(PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::::unit(), + primitive::FloatDecoder::::unit(), )? .collect_n(filter)?), // Don't compile this code with `i32` as we don't use this in polars @@ -393,7 +393,7 @@ fn timestamp( PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)), + primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ns(x)), )? .collect_n(filter)?, )), @@ -401,7 +401,7 @@ fn timestamp( PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)), + primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_us(x)), )? .collect_n(filter)?, )), @@ -409,7 +409,7 @@ fn timestamp( PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)), + primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_ms(x)), )? .collect_n(filter)?, )), @@ -417,7 +417,7 @@ fn timestamp( PageDecoder::new( pages, data_type, - primitive::PrimitiveDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)), + primitive::FloatDecoder::closure(|x: [u32; 3]| int96_to_i64_s(x)), )? .collect_n(filter)?, )), @@ -473,7 +473,7 @@ fn timestamp_dict( (a, true) => PageDecoder::new( pages, ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), - dictionary::DictionaryDecoder::::new(primitive::PrimitiveDecoder::closure( + dictionary::DictionaryDecoder::::new(primitive::FloatDecoder::closure( |x: [u32; 3]| int96_to_i64_ns(x) * a, )), )? @@ -481,7 +481,7 @@ fn timestamp_dict( (a, false) => PageDecoder::new( pages, ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), - dictionary::DictionaryDecoder::::new(primitive::PrimitiveDecoder::closure( + dictionary::DictionaryDecoder::::new(primitive::FloatDecoder::closure( |x: [u32; 3]| int96_to_i64_ns(x) / a, )), )? @@ -494,17 +494,13 @@ fn timestamp_dict( (a, true) => PageDecoder::new( pages, data_type, - dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::closure(|x: i64| { - x * a - })), + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::closure(|x: i64| x * a)), )? .collect_n(filter), (a, false) => PageDecoder::new( pages, data_type, - dictionary::DictionaryDecoder::new(primitive::PrimitiveDecoder::closure(|x: i64| { - x / a - })), + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::closure(|x: i64| x / a)), )? .collect_n(filter), } @@ -524,132 +520,99 @@ fn dict_read( panic!() }; - Ok( - match (physical_type, values_data_type.to_logical_type()) { - (PhysicalType::Int32, UInt8) => PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), - )? - .collect_n(filter)?, - (PhysicalType::Int32, UInt16) => PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), - )? - .collect_n(filter)?, - (PhysicalType::Int32, UInt32) => PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), - )? - .collect_n(filter)?, - (PhysicalType::Int64, UInt64) => PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), - )? - .collect_n(filter)?, - (PhysicalType::Int32, Int8) => PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), - )? - .collect_n(filter)?, - (PhysicalType::Int32, Int16) => PageDecoder::new( + Ok(match (physical_type, values_data_type.to_logical_type()) { + (PhysicalType::Int32, UInt8) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::cast_as()), + )? + .collect_n(filter)?, + (PhysicalType::Int32, UInt16) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::cast_as()), + )? + .collect_n(filter)?, + (PhysicalType::Int32, UInt32) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::cast_as()), + )? + .collect_n(filter)?, + (PhysicalType::Int64, UInt64) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::cast_as()), + )? + .collect_n(filter)?, + (PhysicalType::Int32, Int8) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::cast_as()), + )? + .collect_n(filter)?, + (PhysicalType::Int32, Int16) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::cast_as()), + )? + .collect_n(filter)?, + (PhysicalType::Int32, Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth)) => { + PageDecoder::new( iter, data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::cast_as(), - ), + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::unit()), )? - .collect_n(filter)?, - ( - PhysicalType::Int32, - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth), - ) => { - PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::unit(), - ), - )? - .collect_n(filter)? - }, - - (PhysicalType::Int64, Timestamp(time_unit, _)) => { - let time_unit = *time_unit; - return timestamp_dict::( - iter, - physical_type, - logical_type, - data_type, - filter, - time_unit, - ); - }, + .collect_n(filter)? + }, - (PhysicalType::Int64, Int64 | Date64 | Time64(_) | Duration(_)) => { - PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::unit(), - ), - )? - .collect_n(filter)? - }, - (PhysicalType::Float, Float32) => { - PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::unit(), - ), - )? - .collect_n(filter)? - }, - (PhysicalType::Double, Float64) => { - PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new( - primitive::PrimitiveDecoder::::unit(), - ), - )? - .collect_n(filter)? - }, - (_, LargeUtf8 | LargeBinary | Utf8 | Binary) => unreachable!(), - (PhysicalType::ByteArray, Utf8View | BinaryView) => PageDecoder::new( - iter, - data_type, - dictionary::DictionaryDecoder::new(BinViewDecoder::default()), - )? - .collect_n(filter)?, - (PhysicalType::FixedLenByteArray(size), FixedSizeBinary(_)) => PageDecoder::new( + (PhysicalType::Int64, Timestamp(time_unit, _)) => { + let time_unit = *time_unit; + return timestamp_dict::( iter, + physical_type, + logical_type, data_type, - dictionary::DictionaryDecoder::new(fixed_size_binary::BinaryDecoder { - size: *size, - }), - )? - .collect_n(filter)?, - other => { - return Err(ParquetError::FeatureNotSupported(format!( - "Reading dictionaries of type {other:?}" - ))); - }, + filter, + time_unit, + ); }, - ) + + (PhysicalType::Int64, Int64 | Date64 | Time64(_) | Duration(_)) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::unit()), + )? + .collect_n(filter)?, + (PhysicalType::Float, Float32) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::unit()), + )? + .collect_n(filter)?, + (PhysicalType::Double, Float64) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(primitive::FloatDecoder::::unit()), + )? + .collect_n(filter)?, + (_, LargeUtf8 | LargeBinary | Utf8 | Binary) => unreachable!(), + (PhysicalType::ByteArray, Utf8View | BinaryView) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(BinViewDecoder::default()), + )? + .collect_n(filter)?, + (PhysicalType::FixedLenByteArray(size), FixedSizeBinary(_)) => PageDecoder::new( + iter, + data_type, + dictionary::DictionaryDecoder::new(fixed_size_binary::BinaryDecoder { size: *size }), + )? + .collect_n(filter)?, + other => { + return Err(ParquetError::FeatureNotSupported(format!( + "Reading dictionaries of type {other:?}" + ))); + }, + }) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs index 7a18a0c16a85..c1dc1324bb27 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs @@ -19,6 +19,7 @@ use crate::parquet::schema::Repetition; #[derive(Debug)] pub(crate) struct State<'a, D: Decoder> { pub(crate) dict: Option<&'a D::Dict>, + pub(crate) is_optional: bool, pub(crate) page_validity: Option>, pub(crate) translation: D::Translation<'a>, } @@ -41,6 +42,7 @@ pub(crate) trait StateTranslation<'a, D: Decoder>: Sized { &mut self, decoder: &mut D, decoded: &mut D::DecodedState, + is_optional: bool, page_validity: &mut Option>, dict: Option<&'a D::Dict>, additional: usize, @@ -52,14 +54,25 @@ impl<'a, D: Decoder> State<'a, D> { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; - let page_validity = is_optional + let mut page_validity = is_optional .then(|| page_validity_decoder(page)) .transpose()?; + // Make the page_validity None if there are no nulls in the page + let null_count = page + .null_count() + .map(Ok) + .or_else(|| page_validity.as_ref().map(hybrid_rle_count_zeros)) + .transpose()?; + if null_count == Some(0) { + page_validity = None; + } + let translation = D::Translation::new(decoder, page, dict, page_validity.as_ref())?; Ok(Self { dict, + is_optional, page_validity, translation, }) @@ -75,6 +88,9 @@ impl<'a, D: Decoder> State<'a, D> { Ok(Self { dict, translation, + + // Nested values may be optional, but all that is handled elsewhere. + is_optional: false, page_validity: None, }) } @@ -120,6 +136,7 @@ impl<'a, D: Decoder> State<'a, D> { self.translation.extend_from_state( decoder, decoded, + self.is_optional, &mut self.page_validity, self.dict, num_rows, @@ -137,6 +154,7 @@ impl<'a, D: Decoder> State<'a, D> { self.translation.extend_from_state( decoder, decoded, + self.is_optional, &mut self.page_validity, self.dict, end - start, @@ -158,6 +176,7 @@ impl<'a, D: Decoder> State<'a, D> { self.translation.extend_from_state( decoder, decoded, + self.is_optional, &mut self.page_validity, self.dict, num_ones, @@ -586,7 +605,7 @@ pub(super) trait Decoder: Sized { fn with_capacity(&self, capacity: usize) -> Self::DecodedState; /// Deserializes a [`DictPage`] into [`Self::Dict`]. - fn deserialize_dict(&self, page: DictPage) -> Self::Dict; + fn deserialize_dict(&self, page: DictPage) -> ParquetResult; fn apply_dictionary( &mut self, @@ -600,6 +619,7 @@ pub(super) trait Decoder: Sized { &mut self, decoded: &mut Self::DecodedState, page_values: &mut as StateTranslation<'a, Self>>::PlainDecoder, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, limit: usize, ) -> ParquetResult<()>; @@ -607,6 +627,7 @@ pub(super) trait Decoder: Sized { &mut self, decoded: &mut Self::DecodedState, page_values: &mut HybridRleDecoder<'a>, + is_optional: bool, page_validity: Option<&mut PageValidity<'a>>, dict: &Self::Dict, limit: usize, @@ -675,7 +696,7 @@ impl PageDecoder { decoder: D, ) -> ParquetResult { let dict_page = iter.read_dict_page()?; - let dict = dict_page.map(|d| decoder.deserialize_dict(d)); + let dict = dict_page.map(|d| decoder.deserialize_dict(d)).transpose()?; Ok(Self { iter, diff --git a/crates/polars-parquet/src/parquet/page/mod.rs b/crates/polars-parquet/src/parquet/page/mod.rs index 128f1af03c14..eb4dc4b5955d 100644 --- a/crates/polars-parquet/src/parquet/page/mod.rs +++ b/crates/polars-parquet/src/parquet/page/mod.rs @@ -123,6 +123,13 @@ impl DataPageHeader { DataPageHeader::V2(d) => d.num_values as usize, } } + + pub fn null_count(&self) -> Option { + match &self { + DataPageHeader::V1(_) => None, + DataPageHeader::V2(d) => Some(d.num_nulls as usize), + } + } } /// A [`DataPage`] is an uncompressed, encoded representation of a Parquet data page. It holds actual data @@ -181,6 +188,10 @@ impl DataPage { self.header.num_values() } + pub fn null_count(&self) -> Option { + self.header.null_count() + } + pub fn num_rows(&self) -> Option { self.num_rows } diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index c61b07ce88c4..2fd7d2ab4193 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -1715,6 +1715,35 @@ def test_parametric_small_page_mask_filtering( assert_frame_equal(result, df.filter(expr)) +@pytest.mark.parametrize( + "value", + [ + "abcd", + 0, + 0.0, + False, + ], +) +def test_different_page_validity_across_pages(value: str | int | float | bool) -> None: + df = pl.DataFrame( + { + "a": [None] + [value] * 4000, + } + ) + + f = io.BytesIO() + pq.write_table( + df.to_arrow(), + f, + use_dictionary=False, + data_page_size=1024, + column_encoding={"a": "PLAIN"}, + ) + + f.seek(0) + assert_frame_equal(df, pl.read_parquet(f)) + + @given( df=dataframes( min_size=0,