From 6259ea41df673a1d1cf23fee03caafd4e0f615f8 Mon Sep 17 00:00:00 2001
From: Gijs Burghoorn
Date: Sun, 4 Aug 2024 10:05:13 +0200
Subject: [PATCH] perf: Remove temporary allocations in Parquet (#18013)

---
 .../arrow/read/deserialize/binary/decoders.rs |  9 ++--
 .../src/parquet/read/page/reader.rs           | 44 +++++++++----------
 .../src/parquet/read/page/stream.rs           | 33 +++++++++++++-
 3 files changed, 57 insertions(+), 29 deletions(-)

diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs
index bb1e77b870dd..646eac58d2ec 100644
--- a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs
+++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs
@@ -201,10 +201,11 @@ impl<'a> BinaryStateTranslation<'a> {
 }
 
 pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict {
-    let all = BinaryIter::new(values, num_values).collect::<Vec<_>>();
-    let values_size = all.iter().map(|v| v.len()).sum::<usize>();
-    let mut dict_values = MutableBinaryValuesArray::<i64>::with_capacities(all.len(), values_size);
-    for v in all {
+    // Each value is prepended by the length which is 4 bytes.
+    let num_bytes = values.len() - 4 * num_values;
+
+    let mut dict_values = MutableBinaryValuesArray::<i64>::with_capacities(num_values, num_bytes);
+    for v in BinaryIter::new(values, num_values) {
         dict_values.push(v)
     }
 
diff --git a/crates/polars-parquet/src/parquet/read/page/reader.rs b/crates/polars-parquet/src/parquet/read/page/reader.rs
index 80b51603f6fd..cf01a25d7e07 100644
--- a/crates/polars-parquet/src/parquet/read/page/reader.rs
+++ b/crates/polars-parquet/src/parquet/read/page/reader.rs
@@ -13,7 +13,6 @@ use crate::parquet::page::{
     CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, PageType,
     ParquetPageHeader,
 };
-use crate::parquet::parquet_bridge::Encoding;
 use crate::parquet::CowBuffer;
 
 /// This meta is a small part of [`ColumnChunkMetaData`].
@@ -230,9 +229,7 @@ fn next_page(reader: &mut PageReader) -> ParquetResult<Option<CompressedPage>> {
 pub(super) fn build_page(reader: &mut PageReader) -> ParquetResult<Option<CompressedPage>> {
     let page_header = read_page_header(&mut reader.reader, reader.max_page_size)?;
 
-    reader.seen_num_values += get_page_header(&page_header)?
-        .map(|x| x.num_values() as i64)
-        .unwrap_or_default();
+    reader.seen_num_values += get_page_num_values(&page_header)? as i64;
 
     let read_size: usize = page_header.compressed_page_size.try_into()?;
 
@@ -345,30 +342,31 @@ pub(super) fn finish_page(
     }
 }
 
-pub(super) fn get_page_header(header: &ParquetPageHeader) -> ParquetResult<Option<DataPageHeader>> {
+pub(super) fn get_page_num_values(header: &ParquetPageHeader) -> ParquetResult<i32> {
     let type_ = header.type_.try_into()?;
     Ok(match type_ {
         PageType::DataPage => {
-            let header = header.data_page_header.clone().ok_or_else(|| {
-                ParquetError::oos(
-                    "The page header type is a v1 data page but the v1 header is empty",
-                )
-            })?;
-            let _: Encoding = header.encoding.try_into()?;
-            let _: Encoding = header.repetition_level_encoding.try_into()?;
-            let _: Encoding = header.definition_level_encoding.try_into()?;
-
-            Some(DataPageHeader::V1(header))
+            header
+                .data_page_header
+                .as_ref()
+                .ok_or_else(|| {
+                    ParquetError::oos(
+                        "The page header type is a v1 data page but the v1 header is empty",
+                    )
+                })?
+                .num_values
         },
         PageType::DataPageV2 => {
-            let header = header.data_page_header_v2.clone().ok_or_else(|| {
-                ParquetError::oos(
-                    "The page header type is a v1 data page but the v1 header is empty",
-                )
-            })?;
-            let _: Encoding = header.encoding.try_into()?;
-            Some(DataPageHeader::V2(header))
+            header
+                .data_page_header_v2
+                .as_ref()
+                .ok_or_else(|| {
+                    ParquetError::oos(
+                        "The page header type is a v1 data page but the v1 header is empty",
+                    )
+                })?
+                .num_values
         },
-        _ => None,
+        _ => 0,
     })
 }
diff --git a/crates/polars-parquet/src/parquet/read/page/stream.rs b/crates/polars-parquet/src/parquet/read/page/stream.rs
index 86939e20be07..7b89dc3937cd 100644
--- a/crates/polars-parquet/src/parquet/read/page/stream.rs
+++ b/crates/polars-parquet/src/parquet/read/page/stream.rs
@@ -6,12 +6,13 @@ use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream};
 use parquet_format_safe::thrift::protocol::TCompactInputStreamProtocol;
 use polars_utils::mmap::MemSlice;
 
-use super::reader::{finish_page, get_page_header, PageMetaData};
+use super::reader::{finish_page, PageMetaData};
 use super::PageFilter;
 use crate::parquet::compression::Compression;
 use crate::parquet::error::{ParquetError, ParquetResult};
 use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor};
-use crate::parquet::page::{CompressedPage, ParquetPageHeader};
+use crate::parquet::page::{CompressedPage, DataPageHeader, ParquetPageHeader};
+use crate::parquet::parquet_bridge::{Encoding, PageType};
 
 /// Returns a stream of compressed data pages
 pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
@@ -137,3 +138,31 @@ async fn read_page_header(
     let page_header = ParquetPageHeader::stream_from_in_protocol(&mut prot).await?;
     Ok(page_header)
 }
+
+pub(super) fn get_page_header(header: &ParquetPageHeader) -> ParquetResult<Option<DataPageHeader>> {
+    let type_ = header.type_.try_into()?;
+    Ok(match type_ {
+        PageType::DataPage => {
+            let header = header.data_page_header.clone().ok_or_else(|| {
+                ParquetError::oos(
+                    "The page header type is a v1 data page but the v1 header is empty",
+                )
+            })?;
+            let _: Encoding = header.encoding.try_into()?;
+            let _: Encoding = header.repetition_level_encoding.try_into()?;
+            let _: Encoding = header.definition_level_encoding.try_into()?;
+
+            Some(DataPageHeader::V1(header))
+        },
+        PageType::DataPageV2 => {
+            let header = header.data_page_header_v2.clone().ok_or_else(|| {
+                ParquetError::oos(
+                    "The page header type is a v1 data page but the v1 header is empty",
+                )
+            })?;
+            let _: Encoding = header.encoding.try_into()?;
+            Some(DataPageHeader::V2(header))
+        },
+        _ => None,
+    })
+}
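
Note appended for context (not part of the patch): the decoders.rs change works because PLAIN-encoded BYTE_ARRAY values are each stored with a 4-byte little-endian length prefix, so the total payload size is simply `values.len() - 4 * num_values` and the destination buffer can be sized exactly up front instead of first collecting all slices into a temporary Vec. Below is a minimal, standalone sketch of that pattern; `LenPrefixedIter` and `collect_values` are hypothetical stand-ins for Polars' `BinaryIter` and `MutableBinaryValuesArray`, not the library's actual API.

```rust
/// Hypothetical stand-in for `BinaryIter`: iterates over 4-byte-length-prefixed values.
struct LenPrefixedIter<'a> {
    buf: &'a [u8],
    remaining: usize,
}

impl<'a> Iterator for LenPrefixedIter<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        // Read the 4-byte little-endian length prefix, then slice out the value.
        let (len_bytes, rest) = self.buf.split_at(4);
        let len = u32::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
        let (value, rest) = rest.split_at(len);
        self.buf = rest;
        self.remaining -= 1;
        Some(value)
    }
}

/// Hypothetical sink: packs values into one contiguous buffer plus offsets,
/// sizing both allocations exactly instead of collecting into a temporary Vec of slices.
fn collect_values(values: &[u8], num_values: usize) -> (Vec<u8>, Vec<usize>) {
    // Each value is prepended by its length (4 bytes), so the payload size is known up front.
    let num_bytes = values.len() - 4 * num_values;

    let mut data = Vec::with_capacity(num_bytes);
    let mut offsets = Vec::with_capacity(num_values + 1);
    offsets.push(0);

    for v in (LenPrefixedIter { buf: values, remaining: num_values }) {
        data.extend_from_slice(v);
        offsets.push(data.len());
    }
    (data, offsets)
}

fn main() {
    // Two values, "abc" and "de", each with a 4-byte little-endian length prefix.
    let raw = [3, 0, 0, 0, b'a', b'b', b'c', 2, 0, 0, 0, b'd', b'e'];
    let (data, offsets) = collect_values(&raw, 2);
    assert_eq!(data.as_slice(), b"abcde".as_slice());
    assert_eq!(offsets, vec![0, 3, 5]);
}
```

The reader.rs change follows the same idea: instead of cloning the whole page header just to read its value count, `get_page_num_values` borrows it with `as_ref()` and returns `num_values` directly, while the header-cloning `get_page_header` moves to stream.rs, its only remaining caller.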