Skip to content

Commit

Permalink
perf: Remove temporary allocations in Parquet (#18013)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Aug 4, 2024
1 parent a2c8fd1 commit 6259ea4
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,11 @@ impl<'a> BinaryStateTranslation<'a> {
}

pub(crate) fn deserialize_plain(values: &[u8], num_values: usize) -> BinaryDict {
let all = BinaryIter::new(values, num_values).collect::<Vec<_>>();
let values_size = all.iter().map(|v| v.len()).sum::<usize>();
let mut dict_values = MutableBinaryValuesArray::<i64>::with_capacities(all.len(), values_size);
for v in all {
// Each value is prepended by the length which is 4 bytes.
let num_bytes = values.len() - 4 * num_values;

let mut dict_values = MutableBinaryValuesArray::<i64>::with_capacities(num_values, num_bytes);
for v in BinaryIter::new(values, num_values) {
dict_values.push(v)
}

Expand Down
44 changes: 21 additions & 23 deletions crates/polars-parquet/src/parquet/read/page/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::parquet::page::{
CompressedDataPage, CompressedDictPage, CompressedPage, DataPageHeader, PageType,
ParquetPageHeader,
};
use crate::parquet::parquet_bridge::Encoding;
use crate::parquet::CowBuffer;

/// This meta is a small part of [`ColumnChunkMetaData`].
Expand Down Expand Up @@ -230,9 +229,7 @@ fn next_page(reader: &mut PageReader) -> ParquetResult<Option<CompressedPage>> {
pub(super) fn build_page(reader: &mut PageReader) -> ParquetResult<Option<CompressedPage>> {
let page_header = read_page_header(&mut reader.reader, reader.max_page_size)?;

reader.seen_num_values += get_page_header(&page_header)?
.map(|x| x.num_values() as i64)
.unwrap_or_default();
reader.seen_num_values += get_page_num_values(&page_header)? as i64;

let read_size: usize = page_header.compressed_page_size.try_into()?;

Expand Down Expand Up @@ -345,30 +342,31 @@ pub(super) fn finish_page(
}
}

pub(super) fn get_page_header(header: &ParquetPageHeader) -> ParquetResult<Option<DataPageHeader>> {
pub(super) fn get_page_num_values(header: &ParquetPageHeader) -> ParquetResult<i32> {
let type_ = header.type_.try_into()?;
Ok(match type_ {
PageType::DataPage => {
let header = header.data_page_header.clone().ok_or_else(|| {
ParquetError::oos(
"The page header type is a v1 data page but the v1 header is empty",
)
})?;
let _: Encoding = header.encoding.try_into()?;
let _: Encoding = header.repetition_level_encoding.try_into()?;
let _: Encoding = header.definition_level_encoding.try_into()?;

Some(DataPageHeader::V1(header))
header
.data_page_header
.as_ref()
.ok_or_else(|| {
ParquetError::oos(
"The page header type is a v1 data page but the v1 header is empty",
)
})?
.num_values
},
PageType::DataPageV2 => {
let header = header.data_page_header_v2.clone().ok_or_else(|| {
ParquetError::oos(
"The page header type is a v1 data page but the v1 header is empty",
)
})?;
let _: Encoding = header.encoding.try_into()?;
Some(DataPageHeader::V2(header))
header
.data_page_header_v2
.as_ref()
.ok_or_else(|| {
ParquetError::oos(
"The page header type is a v1 data page but the v1 header is empty",
)
})?
.num_values
},
_ => None,
_ => 0,
})
}
33 changes: 31 additions & 2 deletions crates/polars-parquet/src/parquet/read/page/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, Stream};
use parquet_format_safe::thrift::protocol::TCompactInputStreamProtocol;
use polars_utils::mmap::MemSlice;

use super::reader::{finish_page, get_page_header, PageMetaData};
use super::reader::{finish_page, PageMetaData};
use super::PageFilter;
use crate::parquet::compression::Compression;
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::metadata::{ColumnChunkMetaData, Descriptor};
use crate::parquet::page::{CompressedPage, ParquetPageHeader};
use crate::parquet::page::{CompressedPage, DataPageHeader, ParquetPageHeader};
use crate::parquet::parquet_bridge::{Encoding, PageType};

/// Returns a stream of compressed data pages
pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
Expand Down Expand Up @@ -137,3 +138,31 @@ async fn read_page_header<R: AsyncRead + Unpin + Send>(
let page_header = ParquetPageHeader::stream_from_in_protocol(&mut prot).await?;
Ok(page_header)
}

pub(super) fn get_page_header(header: &ParquetPageHeader) -> ParquetResult<Option<DataPageHeader>> {
let type_ = header.type_.try_into()?;
Ok(match type_ {
PageType::DataPage => {
let header = header.data_page_header.clone().ok_or_else(|| {
ParquetError::oos(
"The page header type is a v1 data page but the v1 header is empty",
)
})?;
let _: Encoding = header.encoding.try_into()?;
let _: Encoding = header.repetition_level_encoding.try_into()?;
let _: Encoding = header.definition_level_encoding.try_into()?;

Some(DataPageHeader::V1(header))
},
PageType::DataPageV2 => {
let header = header.data_page_header_v2.clone().ok_or_else(|| {
ParquetError::oos(
"The page header type is a v1 data page but the v1 header is empty",
)
})?;
let _: Encoding = header.encoding.try_into()?;
Some(DataPageHeader::V2(header))
},
_ => None,
})
}

0 comments on commit 6259ea4

Please sign in to comment.