diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 541c9810976d..3f14114e3c60 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -77,7 +77,7 @@ use std::collections::VecDeque; use std::fmt::Formatter; -use std::io::SeekFrom; +use std::io::{Cursor, SeekFrom}; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; @@ -86,6 +86,7 @@ use std::task::{Context, Poll}; use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::stream::Stream; +use parquet_format::PageType; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; use arrow::datatypes::SchemaRef; @@ -96,11 +97,13 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader; use crate::arrow::schema::parquet_to_arrow_schema; use crate::arrow::ProjectionMask; use crate::basic::Compression; -use crate::column::page::{PageIterator, PageReader}; +use crate::column::page::{Page, PageIterator, PageReader}; +use crate::compression::{create_codec, Codec}; use crate::errors::{ParquetError, Result}; use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::ParquetMetaData; use crate::file::reader::SerializedPageReader; +use crate::file::serialized_reader::{decode_page, read_page_header}; use crate::file::FOOTER_SIZE; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor}; @@ -433,6 +436,7 @@ where } } +/// An in-memory collection of column chunks struct InMemoryRowGroup { schema: SchemaDescPtr, column_chunks: Vec>, @@ -459,6 +463,7 @@ impl RowGroupCollection for InMemoryRowGroup { } } +/// Data for a single column chunk #[derive(Clone)] struct InMemoryColumnChunk { num_values: i64, @@ -480,6 +485,82 @@ impl InMemoryColumnChunk { } } +// A serialized implementation for Parquet [`PageReader`]. +struct InMemoryColumnChunkReader { + chunk: InMemoryColumnChunk, + decompressor: Option>, + offset: usize, + seen_num_values: i64, +} + +impl InMemoryColumnChunkReader { + /// Creates a new serialized page reader from file source. + pub fn new(chunk: InMemoryColumnChunk) -> Result { + let decompressor = create_codec(chunk.compression)?; + let result = Self { + chunk, + decompressor, + offset: 0, + seen_num_values: 0, + }; + Ok(result) + } +} + +impl Iterator for InMemoryColumnChunkReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.get_next_page().transpose() + } +} + +impl PageReader for InMemoryColumnChunkReader { + fn get_next_page(&mut self) -> Result> { + while self.seen_num_values < self.chunk.num_values { + let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]); + let page_header = read_page_header(&mut cursor)?; + let compressed_size = page_header.compressed_page_size as usize; + + self.offset += cursor.position() as usize; + let start_offset = self.offset; + let end_offset = self.offset + compressed_size; + self.offset = end_offset; + + let buffer = self.chunk.data.slice(start_offset..end_offset); + + let result = match page_header.type_ { + PageType::DataPage | PageType::DataPageV2 => { + let decoded = decode_page( + page_header, + buffer.into(), + self.chunk.physical_type, + self.decompressor.as_mut(), + )?; + self.seen_num_values += decoded.num_values() as i64; + decoded + } + PageType::DictionaryPage => decode_page( + page_header, + buffer.into(), + self.chunk.physical_type, + self.decompressor.as_mut(), + )?, + _ => { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + } + }; + + return Ok(Some(result)); + } + + // We are at the end of this column chunk and no more page left. Return None. + Ok(None) + } +} + +/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`] struct ColumnChunkIterator { schema: SchemaDescPtr, column_schema: ColumnDescPtr, diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 1194292354cf..6ff73e041e88 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -358,6 +358,108 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<' } } +/// Reads a [`PageHeader`] from the provided [`Read`] +pub(crate) fn read_page_header(input: &mut T) -> Result { + let mut prot = TCompactInputProtocol::new(input); + let page_header = PageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) +} + +/// Decodes a [`Page`] from the provided `buffer` +pub(crate) fn decode_page( + page_header: PageHeader, + buffer: ByteBufferPtr, + physical_type: Type, + decompressor: Option<&mut Box>, +) -> Result { + // When processing data page v2, depending on enabled compression for the + // page, we should account for uncompressed data ('offset') of + // repetition and definition levels. + // + // We always use 0 offset for other pages other than v2, `true` flag means + // that compression will be applied if decompressor is defined + let mut offset: usize = 0; + let mut can_decompress = true; + + if let Some(ref header_v2) = page_header.data_page_header_v2 { + offset = (header_v2.definition_levels_byte_length + + header_v2.repetition_levels_byte_length) as usize; + // When is_compressed flag is missing the page is considered compressed + can_decompress = header_v2.is_compressed.unwrap_or(true); + } + + // TODO: page header could be huge because of statistics. We should set a + // maximum page header size and abort if that is exceeded. + let buffer = match decompressor { + Some(decompressor) if can_decompress => { + let uncompressed_size = page_header.uncompressed_page_size as usize; + let mut decompressed = Vec::with_capacity(uncompressed_size); + let compressed = &buffer.as_ref()[offset..]; + decompressed.extend_from_slice(&buffer.as_ref()[..offset]); + decompressor.decompress(compressed, &mut decompressed)?; + + if decompressed.len() != uncompressed_size { + return Err(general_err!( + "Actual decompressed size doesn't match the expected one ({} vs {})", + decompressed.len(), + uncompressed_size + )); + } + + ByteBufferPtr::new(decompressed) + } + _ => buffer, + }; + + let result = match page_header.type_ { + PageType::DictionaryPage => { + assert!(page_header.dictionary_page_header.is_some()); + let dict_header = page_header.dictionary_page_header.as_ref().unwrap(); + let is_sorted = dict_header.is_sorted.unwrap_or(false); + Page::DictionaryPage { + buf: buffer, + num_values: dict_header.num_values as u32, + encoding: Encoding::from(dict_header.encoding), + is_sorted, + } + } + PageType::DataPage => { + assert!(page_header.data_page_header.is_some()); + let header = page_header.data_page_header.unwrap(); + Page::DataPage { + buf: buffer, + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + def_level_encoding: Encoding::from(header.definition_level_encoding), + rep_level_encoding: Encoding::from(header.repetition_level_encoding), + statistics: statistics::from_thrift(physical_type, header.statistics), + } + } + PageType::DataPageV2 => { + assert!(page_header.data_page_header_v2.is_some()); + let header = page_header.data_page_header_v2.unwrap(); + let is_compressed = header.is_compressed.unwrap_or(true); + Page::DataPageV2 { + buf: buffer, + num_values: header.num_values as u32, + encoding: Encoding::from(header.encoding), + num_nulls: header.num_nulls as u32, + num_rows: header.num_rows as u32, + def_levels_byte_len: header.definition_levels_byte_length as u32, + rep_levels_byte_len: header.repetition_levels_byte_length as u32, + is_compressed, + statistics: statistics::from_thrift(physical_type, header.statistics), + } + } + _ => { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + unimplemented!("Page type {:?} is not supported", page_header.type_) + } + }; + + Ok(result) +} + /// A serialized implementation for Parquet [`PageReader`]. pub struct SerializedPageReader { // The file source buffer which references exactly the bytes for the column trunk @@ -395,13 +497,6 @@ impl SerializedPageReader { }; Ok(result) } - - /// Reads Page header from Thrift. - fn read_page_header(&mut self) -> Result { - let mut prot = TCompactInputProtocol::new(&mut self.buf); - let page_header = PageHeader::read_from_in_protocol(&mut prot)?; - Ok(page_header) - } } impl Iterator for SerializedPageReader { @@ -415,108 +510,40 @@ impl Iterator for SerializedPageReader { impl PageReader for SerializedPageReader { fn get_next_page(&mut self) -> Result> { while self.seen_num_values < self.total_num_values { - let page_header = self.read_page_header()?; - - // When processing data page v2, depending on enabled compression for the - // page, we should account for uncompressed data ('offset') of - // repetition and definition levels. - // - // We always use 0 offset for other pages other than v2, `true` flag means - // that compression will be applied if decompressor is defined - let mut offset: usize = 0; - let mut can_decompress = true; - - if let Some(ref header_v2) = page_header.data_page_header_v2 { - offset = (header_v2.definition_levels_byte_length - + header_v2.repetition_levels_byte_length) - as usize; - // When is_compressed flag is missing the page is considered compressed - can_decompress = header_v2.is_compressed.unwrap_or(true); - } - - let compressed_len = page_header.compressed_page_size as usize - offset; - let uncompressed_len = page_header.uncompressed_page_size as usize - offset; - // We still need to read all bytes from buffered stream - let mut buffer = vec![0; offset + compressed_len]; - self.buf.read_exact(&mut buffer)?; - - // TODO: page header could be huge because of statistics. We should set a - // maximum page header size and abort if that is exceeded. - if let Some(decompressor) = self.decompressor.as_mut() { - if can_decompress { - let mut decompressed_buffer = Vec::with_capacity(uncompressed_len); - let decompressed_size = decompressor - .decompress(&buffer[offset..], &mut decompressed_buffer)?; - if decompressed_size != uncompressed_len { - return Err(general_err!( - "Actual decompressed size doesn't match the expected one ({} vs {})", - decompressed_size, - uncompressed_len - )); - } - if offset == 0 { - buffer = decompressed_buffer; - } else { - // Prepend saved offsets to the buffer - buffer.truncate(offset); - buffer.append(&mut decompressed_buffer); - } - } + let page_header = read_page_header(&mut self.buf)?; + + let to_read = page_header.compressed_page_size as usize; + let mut buffer = Vec::with_capacity(to_read); + let read = (&mut self.buf) + .take(to_read as u64) + .read_to_end(&mut buffer)?; + + if read != to_read { + return Err(eof_err!( + "Expected to read {} bytes of page, read only {}", + to_read, + read + )); } + let buffer = ByteBufferPtr::new(buffer); let result = match page_header.type_ { - PageType::DictionaryPage => { - assert!(page_header.dictionary_page_header.is_some()); - let dict_header = - page_header.dictionary_page_header.as_ref().unwrap(); - let is_sorted = dict_header.is_sorted.unwrap_or(false); - Page::DictionaryPage { - buf: ByteBufferPtr::new(buffer), - num_values: dict_header.num_values as u32, - encoding: Encoding::from(dict_header.encoding), - is_sorted, - } - } - PageType::DataPage => { - assert!(page_header.data_page_header.is_some()); - let header = page_header.data_page_header.unwrap(); - self.seen_num_values += header.num_values as i64; - Page::DataPage { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - def_level_encoding: Encoding::from( - header.definition_level_encoding, - ), - rep_level_encoding: Encoding::from( - header.repetition_level_encoding, - ), - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } - } - PageType::DataPageV2 => { - assert!(page_header.data_page_header_v2.is_some()); - let header = page_header.data_page_header_v2.unwrap(); - let is_compressed = header.is_compressed.unwrap_or(true); - self.seen_num_values += header.num_values as i64; - Page::DataPageV2 { - buf: ByteBufferPtr::new(buffer), - num_values: header.num_values as u32, - encoding: Encoding::from(header.encoding), - num_nulls: header.num_nulls as u32, - num_rows: header.num_rows as u32, - def_levels_byte_len: header.definition_levels_byte_length as u32, - rep_levels_byte_len: header.repetition_levels_byte_length as u32, - is_compressed, - statistics: statistics::from_thrift( - self.physical_type, - header.statistics, - ), - } + PageType::DataPage | PageType::DataPageV2 => { + let decoded = decode_page( + page_header, + buffer, + self.physical_type, + self.decompressor.as_mut(), + )?; + self.seen_num_values += decoded.num_values() as i64; + decoded } + PageType::DictionaryPage => decode_page( + page_header, + buffer, + self.physical_type, + self.decompressor.as_mut(), + )?, _ => { // For unknown page type (e.g., INDEX_PAGE), skip and read next. continue;