diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 8b4641d68141..b64d1bfbfaaf 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -416,7 +416,7 @@ mod tests { FixedLenByteArrayType, Int32Type, Int64Type, }; use crate::errors::Result; - use crate::file::properties::{WriterProperties, WriterVersion}; + use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion}; use crate::file::reader::{FileReader, SerializedFileReader}; use crate::file::writer::SerializedFileWriter; use crate::schema::parser::parse_message_type; @@ -787,6 +787,8 @@ mod tests { max_dict_page_size: usize, /// Writer version writer_version: WriterVersion, + /// Enabled statistics + enabled_statistics: EnabledStatistics, /// Encoding encoding: Encoding, } @@ -802,6 +804,7 @@ mod tests { max_data_page_size: 1024 * 1024, max_dict_page_size: 1024 * 1024, writer_version: WriterVersion::PARQUET_1_0, + enabled_statistics: EnabledStatistics::Page, encoding: Encoding::PLAIN, } } @@ -838,11 +841,19 @@ mod tests { } } + fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self { + Self { + enabled_statistics, + ..self + } + } + fn writer_props(&self) -> WriterProperties { let builder = WriterProperties::builder() .set_data_pagesize_limit(self.max_data_page_size) .set_write_batch_size(self.write_batch_size) - .set_writer_version(self.writer_version); + .set_writer_version(self.writer_version) + .set_statistics_enabled(self.enabled_statistics); let builder = match self.encoding { Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder @@ -896,6 +907,14 @@ mod tests { TestOptions::new(2, 256, 127).with_null_percent(0), // Test optional with nulls TestOptions::new(2, 256, 93).with_null_percent(25), + // Test with no page-level statistics + TestOptions::new(2, 256, 91) + .with_null_percent(25) + .with_enabled_statistics(EnabledStatistics::Chunk), + // Test with no statistics + TestOptions::new(2, 256, 91) + .with_null_percent(25) + .with_enabled_statistics(EnabledStatistics::None), ]; all_options.into_iter().for_each(|opts| { diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index bc31aedf4e55..54003732a06a 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -24,7 +24,7 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::DataType; use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; use crate::errors::{ParquetError, Result}; -use crate::file::properties::WriterProperties; +use crate::file::properties::{EnabledStatistics, WriterProperties}; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; use crate::util::memory::ByteBufferPtr; @@ -105,6 +105,7 @@ pub struct ColumnValueEncoderImpl { dict_encoder: Option>, descr: ColumnDescPtr, num_values: usize, + statistics_enabled: EnabledStatistics, min_value: Option, max_value: Option, } @@ -127,11 +128,14 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)), )?; + let statistics_enabled = props.statistics_enabled(descr.path()); + Ok(Self { encoder, dict_encoder, descr: descr.clone(), num_values: 0, + statistics_enabled, min_value: None, max_value: None, }) @@ -148,9 +152,11 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { ) })?; - if let Some((min, max)) = slice.min_max(&self.descr) { - update_min(&self.descr, min, &mut self.min_value); - update_max(&self.descr, max, &mut self.max_value); + if self.statistics_enabled == EnabledStatistics::Page { + if let Some((min, max)) = slice.min_max(&self.descr) { + update_min(&self.descr, min, &mut self.min_value); + update_max(&self.descr, max, &mut self.max_value); + } } match &mut self.dict_encoder { diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index ff6c098980a2..9a371bc27106 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -456,7 +456,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { for &level in levels { if level == self.descr.max_def_level() { values_to_write += 1; - } else if self.statistics_enabled == EnabledStatistics::Page { + } else { + // We must always compute this as it is used to populate v2 pages self.num_page_nulls += 1 } } @@ -746,15 +747,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { None => data_page_offset + total_compressed_size, }; - let statistics = Statistics::new( - self.min_column_value.clone(), - self.max_column_value.clone(), - self.column_distinct_count, - self.num_column_nulls, - false, - ); - - let metadata = ColumnChunkMetaData::builder(self.descr.clone()) + let mut builder = ColumnChunkMetaData::builder(self.descr.clone()) .set_compression(self.codec) .set_encodings(self.encodings.iter().cloned().collect()) .set_file_offset(file_offset) @@ -762,10 +755,20 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { .set_total_uncompressed_size(total_uncompressed_size) .set_num_values(num_values) .set_data_page_offset(data_page_offset) - .set_dictionary_page_offset(dict_page_offset) - .set_statistics(statistics) - .build()?; + .set_dictionary_page_offset(dict_page_offset); + + if self.statistics_enabled != EnabledStatistics::None { + let statistics = Statistics::new( + self.min_column_value.clone(), + self.max_column_value.clone(), + self.column_distinct_count, + self.num_column_nulls, + false, + ); + builder = builder.set_statistics(statistics); + } + let metadata = builder.build()?; self.page_writer.write_metadata(&metadata)?; Ok(metadata) @@ -1639,6 +1642,56 @@ mod tests { assert!(page_statistics.distinct_count().is_none()); } + #[test] + fn test_disabled_statistics() { + let mut buf = Vec::with_capacity(100); + let mut write = TrackedWrite::new(&mut buf); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::None) + .set_writer_version(WriterVersion::PARQUET_2_0) + .build(); + let props = Arc::new(props); + + let mut writer = get_test_column_writer::(page_writer, 1, 0, props); + writer + .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None) + .unwrap(); + + let (_, _, metadata, _, _) = writer.close().unwrap(); + assert!(metadata.statistics().is_none()); + + let reader = SerializedPageReader::new( + std::io::Cursor::new(buf), + 6, + Compression::UNCOMPRESSED, + Type::INT32, + ) + .unwrap(); + + let pages = reader.collect::>>().unwrap(); + assert_eq!(pages.len(), 2); + + assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE); + assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2); + + match &pages[1] { + Page::DataPageV2 { + num_values, + num_nulls, + num_rows, + statistics, + .. + } => { + assert_eq!(*num_values, 6); + assert_eq!(*num_nulls, 2); + assert_eq!(*num_rows, 6); + assert!(statistics.is_none()); + } + _ => unreachable!(), + } + } + #[test] fn test_column_writer_empty_column_roundtrip() { let props = WriterProperties::builder().build();