diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index a65c7585327e..97611d0ec300 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata( value: Some(encoded), }; - let mut meta = props.key_value_metadata.clone().unwrap_or_default(); + let meta = props + .key_value_metadata + .get_or_insert_with(Default::default); + // check if ARROW:schema exists, and overwrite it let schema_meta = meta .iter() @@ -167,7 +170,6 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata( meta.push(schema_kv); } } - props.key_value_metadata = Some(meta); } /// Convert arrow schema to parquet schema diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index d589aef5a507..5def721353a1 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -17,7 +17,7 @@ //! Contains column writer API. use parquet_format::{ColumnIndex, OffsetIndex}; -use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData}; +use std::{collections::VecDeque, convert::TryFrom, marker::PhantomData}; use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; @@ -31,12 +31,13 @@ use crate::encodings::{ }; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder}; +use crate::file::properties::EnabledStatistics; use crate::file::statistics::Statistics; use crate::file::{ metadata::ColumnChunkMetaData, properties::{WriterProperties, WriterPropertiesPtr, WriterVersion}, }; -use crate::schema::types::ColumnDescPtr; +use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; use crate::util::bit_util::FromBytes; use crate::util::memory::ByteBufferPtr; @@ -177,6 +178,8 @@ pub struct ColumnWriterImpl<'a, T: DataType> { // Column writer properties descr: ColumnDescPtr, props: WriterPropertiesPtr, + statistics_enabled: EnabledStatistics, + page_writer: Box, has_dictionary: bool, dict_encoder: Option>, @@ -243,9 +246,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { ) .unwrap(); + let statistics_enabled = props.statistics_enabled(descr.path()); + Self { descr, props, + statistics_enabled, page_writer, has_dictionary, dict_encoder, @@ -302,53 +308,48 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { // Find out the minimal length to prevent index out of bound errors. let mut min_len = values.len(); if let Some(levels) = def_levels { - min_len = cmp::min(min_len, levels.len()); + min_len = min_len.min(levels.len()); } if let Some(levels) = rep_levels { - min_len = cmp::min(min_len, levels.len()); + min_len = min_len.min(levels.len()); } // Find out number of batches to process. let write_batch_size = self.props.write_batch_size(); let num_batches = min_len / write_batch_size; - // Process pre-calculated statistics - match (min, max) { - (Some(min), Some(max)) => { - if self - .min_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(v, min)) - { - self.min_column_value = Some(min.clone()); + // If only computing chunk-level statistics compute them here, page-level statistics + // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in + // [`Self::add_data_page`] + if self.statistics_enabled == EnabledStatistics::Chunk { + match (min, max) { + (Some(min), Some(max)) => { + Self::update_min(&self.descr, min, &mut self.min_column_value); + Self::update_max(&self.descr, max, &mut self.max_column_value); } - if self - .max_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(max, v)) - { - self.max_column_value = Some(max.clone()); + (None, Some(_)) | (Some(_), None) => { + panic!("min/max should be both set or both None") } - } - (None, Some(_)) | (Some(_), None) => { - panic!("min/max should be both set or both None") - } - (None, None) => {} + (None, None) => { + for val in values { + Self::update_min(&self.descr, val, &mut self.min_column_value); + Self::update_max(&self.descr, val, &mut self.max_column_value); + } + } + }; } - if let Some(distinct) = distinct_count { - self.column_distinct_count = - Some(self.column_distinct_count.unwrap_or(0) + distinct); + // We can only set the distinct count if there are no other writes + if self.num_buffered_values == 0 && self.num_page_nulls == 0 { + self.column_distinct_count = distinct_count; + } else { + self.column_distinct_count = None; } if let Some(nulls) = null_count { self.num_column_nulls += nulls; } - let calculate_page_stats = (min.is_none() || max.is_none()) - && null_count.is_none() - && distinct_count.is_none(); - let mut values_offset = 0; let mut levels_offset = 0; for _ in 0..num_batches { @@ -356,7 +357,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { &values[values_offset..values_offset + write_batch_size], def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), - calculate_page_stats, )?; levels_offset += write_batch_size; } @@ -365,7 +365,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { &values[values_offset..], def_levels.map(|lv| &lv[levels_offset..]), rep_levels.map(|lv| &lv[levels_offset..]), - calculate_page_stats, )?; // Return total number of values processed. @@ -393,9 +392,13 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None) } - /// Writer may optionally provide pre-calculated statistics for this batch, in which case we do - /// not calculate page level statistics as this will defeat the purpose of speeding up the write - /// process with pre-calculated statistics. + /// Writer may optionally provide pre-calculated statistics for use when computing + /// chunk-level statistics + /// + /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`] + /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored, + /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the + /// computed page statistics pub fn write_batch_with_statistics( &mut self, values: &[T::T], @@ -466,7 +469,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { values: &[T::T], def_levels: Option<&[i16]>, rep_levels: Option<&[i16]>, - calculate_page_stats: bool, ) -> Result { let mut values_to_write = 0; @@ -494,7 +496,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { for &level in levels { if level == self.descr.max_def_level() { values_to_write += 1; - } else if calculate_page_stats { + } else if self.statistics_enabled == EnabledStatistics::Page { self.num_page_nulls += 1 } } @@ -537,7 +539,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { ) })?; - if calculate_page_stats { + if self.statistics_enabled == EnabledStatistics::Page { for val in values_to_write { self.update_page_min_max(val); } @@ -549,7 +551,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap(); if self.should_add_data_page() { - self.add_data_page(calculate_page_stats)?; + self.add_data_page()?; } if self.should_dict_fallback() { @@ -661,7 +663,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { /// Adds data page. /// Data page is either buffered in case of dictionary encoding or written directly. - fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> { + fn add_data_page(&mut self) -> Result<()> { // Extract encoded values let value_bytes = match self.dict_encoder { Some(ref mut encoder) => encoder.write_indices()?, @@ -678,14 +680,15 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { let max_def_level = self.descr.max_def_level(); let max_rep_level = self.descr.max_rep_level(); - // always update column NULL count, no matter if page stats are used self.num_column_nulls += self.num_page_nulls; - let page_statistics = if calculate_page_stat { - self.update_column_min_max(); - Some(self.make_page_statistics()) - } else { - None + let has_min_max = self.min_page_value.is_some() && self.max_page_value.is_some(); + let page_statistics = match self.statistics_enabled { + EnabledStatistics::Page if has_min_max => { + self.update_column_min_max(); + Some(self.make_page_statistics()) + } + _ => None, }; // update column and offset index @@ -810,10 +813,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { #[inline] fn flush_data_pages(&mut self) -> Result<()> { // Write all outstanding data to a new page. - let calculate_page_stats = - self.min_page_value.is_some() && self.max_page_value.is_some(); if self.num_buffered_values > 0 { - self.add_data_page(calculate_page_stats)?; + self.add_data_page()?; } while let Some(page) = self.data_pages.pop_front() { @@ -1029,8 +1030,36 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { } } - #[allow(clippy::eq_op)] fn update_page_min_max(&mut self, val: &T::T) { + Self::update_min(&self.descr, val, &mut self.min_page_value); + Self::update_max(&self.descr, val, &mut self.max_page_value); + } + + fn update_column_min_max(&mut self) { + let min = self.min_page_value.as_ref().unwrap(); + Self::update_min(&self.descr, min, &mut self.min_column_value); + + let max = self.max_page_value.as_ref().unwrap(); + Self::update_max(&self.descr, max, &mut self.max_column_value); + } + + fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option) { + Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val)) + } + + fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option) { + Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur)) + } + + /// Perform a conditional update of `cur`, skipping any NaN values + /// + /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with + /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true` + #[allow(clippy::eq_op)] + fn update_stat(val: &T::T, cur: &mut Option, should_update: F) + where + F: Fn(&T::T) -> bool, + { if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() { // Skip NaN values if val != val { @@ -1038,50 +1067,21 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { } } - if self - .min_page_value - .as_ref() - .map_or(true, |min| self.compare_greater(min, val)) - { - self.min_page_value = Some(val.clone()); - } - if self - .max_page_value - .as_ref() - .map_or(true, |max| self.compare_greater(val, max)) - { - self.max_page_value = Some(val.clone()); - } - } - - fn update_column_min_max(&mut self) { - let update_min = self.min_column_value.as_ref().map_or(true, |min| { - let page_value = self.min_page_value.as_ref().unwrap(); - self.compare_greater(min, page_value) - }); - if update_min { - self.min_column_value = self.min_page_value.clone(); - } - - let update_max = self.max_column_value.as_ref().map_or(true, |max| { - let page_value = self.max_page_value.as_ref().unwrap(); - self.compare_greater(page_value, max) - }); - if update_max { - self.max_column_value = self.max_page_value.clone(); + if cur.as_ref().map_or(true, should_update) { + *cur = Some(val.clone()); } } /// Evaluate `a > b` according to underlying logical type. - fn compare_greater(&self, a: &T::T, b: &T::T) -> bool { - if let Some(LogicalType::Integer { is_signed, .. }) = self.descr.logical_type() { + fn compare_greater(descr: &ColumnDescriptor, a: &T::T, b: &T::T) -> bool { + if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() { if !is_signed { // need to compare unsigned return a.as_u64().unwrap() > b.as_u64().unwrap(); } } - match self.descr.converted_type() { + match descr.converted_type() { ConvertedType::UINT_8 | ConvertedType::UINT_16 | ConvertedType::UINT_32 @@ -1091,8 +1091,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { _ => {} }; - if let Some(LogicalType::Decimal { .. }) = self.descr.logical_type() { - match self.descr.physical_type() { + if let Some(LogicalType::Decimal { .. }) = descr.logical_type() { + match T::get_physical_type() { Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => { return compare_greater_byte_array_decimals( a.as_bytes(), @@ -1103,8 +1103,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { }; } - if self.descr.converted_type() == ConvertedType::DECIMAL { - match self.descr.physical_type() { + if descr.converted_type() == ConvertedType::DECIMAL { + match T::get_physical_type() { Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => { return compare_greater_byte_array_decimals( a.as_bytes(), @@ -1713,7 +1713,11 @@ mod tests { #[test] fn test_column_writer_precalculated_statistics() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Chunk) + .build(), + ); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer .write_batch_with_statistics( @@ -1754,6 +1758,56 @@ mod tests { } } + #[test] + fn test_mixed_precomputed_statistics() { + let mut buf = Vec::with_capacity(100); + let mut write = TrackedWrite::new(&mut buf); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new(WriterProperties::builder().build()); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + + writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); + writer + .write_batch_with_statistics( + &[5, 6, 7], + None, + None, + Some(&5), + Some(&7), + Some(0), + Some(3), + ) + .unwrap(); + + let (_, _, metadata, _, _) = writer.close().unwrap(); + + let stats = metadata.statistics().unwrap(); + assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes()); + assert_eq!(stats.max_bytes(), 7_i32.to_le_bytes()); + assert_eq!(stats.null_count(), 0); + assert!(stats.distinct_count().is_none()); + + let reader = SerializedPageReader::new( + std::io::Cursor::new(buf), + 7, + Compression::UNCOMPRESSED, + Type::INT32, + ) + .unwrap(); + + let pages = reader.collect::>>().unwrap(); + assert_eq!(pages.len(), 2); + + assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE); + assert_eq!(pages[1].page_type(), PageType::DATA_PAGE); + + let page_statistics = pages[1].statistics().unwrap(); + assert_eq!(page_statistics.min_bytes(), 1_i32.to_le_bytes()); + assert_eq!(page_statistics.max_bytes(), 7_i32.to_le_bytes()); + assert_eq!(page_statistics.null_count(), 0); + assert!(page_statistics.distinct_count().is_none()); + } + #[test] fn test_column_writer_empty_column_roundtrip() { let props = WriterProperties::builder().build(); @@ -2279,10 +2333,10 @@ mod tests { let mut max_batch_size = values.len(); if let Some(levels) = def_levels { - max_batch_size = cmp::max(max_batch_size, levels.len()); + max_batch_size = max_batch_size.max(levels.len()); } if let Some(levels) = rep_levels { - max_batch_size = cmp::max(max_batch_size, levels.len()); + max_batch_size = max_batch_size.max(levels.len()); } let mut writer = get_test_column_writer::( diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 2baf93933cbc..9ca7c4daa597 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -60,7 +60,7 @@ const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0; const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED; const DEFAULT_DICTIONARY_ENABLED: bool = true; const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE; -const DEFAULT_STATISTICS_ENABLED: bool = true; +const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page; const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY"); @@ -198,7 +198,7 @@ impl WriterProperties { } /// Returns `true` if statistics are enabled for a column. - pub fn statistics_enabled(&self, col: &ColumnPath) -> bool { + pub fn statistics_enabled(&self, col: &ColumnPath) -> EnabledStatistics { self.column_properties .get(col) .and_then(|c| c.statistics_enabled()) @@ -339,7 +339,7 @@ impl WriterPropertiesBuilder { } /// Sets flag to enable/disable statistics for any column. - pub fn set_statistics_enabled(mut self, value: bool) -> Self { + pub fn set_statistics_enabled(mut self, value: EnabledStatistics) -> Self { self.default_column_properties.set_statistics_enabled(value); self } @@ -394,7 +394,11 @@ impl WriterPropertiesBuilder { /// Sets flag to enable/disable statistics for a column. /// Takes precedence over globally defined settings. - pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self { + pub fn set_column_statistics_enabled( + mut self, + col: ColumnPath, + value: EnabledStatistics, + ) -> Self { self.get_mut_props(col).set_statistics_enabled(value); self } @@ -411,6 +415,23 @@ impl WriterPropertiesBuilder { } } +/// Controls the level of statistics to be computed by the writer +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum EnabledStatistics { + /// Compute no statistics + None, + /// Compute chunk-level statistics but not page-level + Chunk, + /// Compute page-level and chunk-level statistics + Page, +} + +impl Default for EnabledStatistics { + fn default() -> Self { + DEFAULT_STATISTICS_ENABLED + } +} + /// Container for column properties that can be changed as part of writer. /// /// If a field is `None`, it means that no specific value has been set for this column, @@ -420,7 +441,7 @@ struct ColumnProperties { encoding: Option, codec: Option, dictionary_enabled: Option, - statistics_enabled: Option, + statistics_enabled: Option, max_statistics_size: Option, } @@ -463,7 +484,7 @@ impl ColumnProperties { } /// Sets whether or not statistics are enabled for this column. - fn set_statistics_enabled(&mut self, enabled: bool) { + fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) { self.statistics_enabled = Some(enabled); } @@ -491,7 +512,7 @@ impl ColumnProperties { /// Returns `Some(true)` if statistics are enabled for this column, if disabled then /// returns `Some(false)`. If result is `None`, then no setting has been provided. - fn statistics_enabled(&self) -> Option { + fn statistics_enabled(&self) -> Option { self.statistics_enabled } @@ -613,13 +634,16 @@ mod tests { .set_encoding(Encoding::DELTA_BINARY_PACKED) .set_compression(Compression::GZIP) .set_dictionary_enabled(false) - .set_statistics_enabled(false) + .set_statistics_enabled(EnabledStatistics::None) .set_max_statistics_size(50) // specific column settings .set_column_encoding(ColumnPath::from("col"), Encoding::RLE) .set_column_compression(ColumnPath::from("col"), Compression::SNAPPY) .set_column_dictionary_enabled(ColumnPath::from("col"), true) - .set_column_statistics_enabled(ColumnPath::from("col"), true) + .set_column_statistics_enabled( + ColumnPath::from("col"), + EnabledStatistics::Chunk, + ) .set_column_max_statistics_size(ColumnPath::from("col"), 123) .build(); @@ -642,7 +666,10 @@ mod tests { ); assert_eq!(props.compression(&ColumnPath::from("a")), Compression::GZIP); assert!(!props.dictionary_enabled(&ColumnPath::from("a"))); - assert!(!props.statistics_enabled(&ColumnPath::from("a"))); + assert_eq!( + props.statistics_enabled(&ColumnPath::from("a")), + EnabledStatistics::None + ); assert_eq!(props.max_statistics_size(&ColumnPath::from("a")), 50); assert_eq!( @@ -654,7 +681,10 @@ mod tests { Compression::SNAPPY ); assert!(props.dictionary_enabled(&ColumnPath::from("col"))); - assert!(props.statistics_enabled(&ColumnPath::from("col"))); + assert_eq!( + props.statistics_enabled(&ColumnPath::from("col")), + EnabledStatistics::Chunk + ); assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123); }