Support peek_next_page() and skip_next_page in serialized_reader. #2044

Merged · 6 commits · Jul 14, 2022
Changes from 1 commit
2 changes: 1 addition & 1 deletion parquet/src/file/metadata.rs
@@ -267,7 +267,7 @@ impl RowGroupMetaData {
self.columns.iter().map(|c| c.total_compressed_size).sum()
}

/// Returns reference of page offset index.
/// Returns a reference to the page offset index of all columns in this row group.
pub fn page_offset_index(&self) -> &Option<Vec<Vec<PageLocation>>> {
&self.page_offset_index
}
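
As a usage sketch (not part of this diff): the hypothetical helper below dumps every page location in a row group, assuming the offset index was populated when the file was opened.

use parquet::file::metadata::RowGroupMetaData;

fn dump_page_locations(row_group_meta: &RowGroupMetaData) {
    if let Some(index) = row_group_meta.page_offset_index() {
        for (col, pages) in index.iter().enumerate() {
            for loc in pages {
                // Each `PageLocation` records a page's file offset,
                // compressed size, and the index of its first row.
                println!(
                    "column {}: offset {}, {} bytes, first row {}",
                    col, loc.offset, loc.compressed_page_size, loc.first_row_index
                );
            }
        }
    }
}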
119 changes: 84 additions & 35 deletions parquet/src/file/serialized_reader.rs
@@ -19,7 +19,9 @@
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)

use bytes::{Buf, Bytes};
use std::{convert::TryFrom, fs::File, io, io::Read, path::Path, sync::Arc};
use std::borrow::BorrowMut;
use std::collections::VecDeque;
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};

use parquet_format::{PageHeader, PageLocation, PageType};
use thrift::protocol::TCompactInputProtocol;
@@ -37,7 +39,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr};

// export `SliceableCursor` and `FileSource` publically so clients can
// re-use the logic in their own ParquetFileWriter wrappers
use crate::util::page_util::calculate_row_count;
use crate::util::page_util::{calculate_row_count, get_pages_readable_slices};
#[allow(deprecated)]
pub use crate::util::{cursor::SliceableCursor, io::FileSource};

@@ -357,12 +359,24 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
self.metadata.num_rows(),
)?;
if let Some(offset_index) = self.metadata.page_offset_index() {
let first_data_page_offset = offset_index[i][0].offset;
let has_dictionary_page = first_data_page_offset != col.file_offset();
page_reader.with_page_offset_index_and_has_dictionary_to_read(
offset_index[i].clone(),
has_dictionary_page,
);
let col_chunk_offset_index = &offset_index[i];
let (page_bufs, has_dict) = get_pages_readable_slices(
col_chunk_offset_index,
col_start,
self.chunk_reader.clone(),
)?;
let file_chunk =
self.chunk_reader.get_read(col_start, col_length as usize)?;
page_reader = SerializedPageReader::new_with_page_offsets(
file_chunk,
col.num_values(),
col.compression(),
col.column_descr().physical_type(),
self.metadata.num_rows(),
col_chunk_offset_index.clone(),
has_dict,
page_bufs,
)?;
}
Ok(Box::new(page_reader))
}
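
A sketch of the read path that exercises this new branch (not part of the diff); `with_page_index` on `ReadOptionsBuilder` is assumed to be the switch that populates the offset index, and `data.parquet` is a placeholder path.

use std::fs::File;
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn main() -> parquet::errors::Result<()> {
    let file = File::open("data.parquet")?;
    // Ask for the page index so `page_offset_index()` returns `Some`.
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options)?;
    let row_group = reader.get_row_group(0)?;
    // With an offset index present, this branch now builds the page reader
    // through `new_with_page_offsets`, one readable slice per page.
    let _pages = row_group.get_column_page_reader(0)?;
    Ok(())
}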
@@ -504,6 +518,9 @@ pub struct SerializedPageReader<T: Read> {

// A flag to check whether a dictionary page should read first
has_dictionary_page_to_read: bool,

// A list of readable slices in `SerializedPageReader`, used to skip pages via the offset index.
page_bufs: VecDeque<T>,
}

impl<T: Read> SerializedPageReader<T> {
@@ -526,6 +543,34 @@ impl<T: Read> SerializedPageReader<T> {
page_offset_index: None,
seen_num_data_pages: 0,
has_dictionary_page_to_read: false,
page_bufs: Default::default(),
};
Ok(result)
}

/// Creates a new serialized page reader from a file source, using the page offset index.
pub fn new_with_page_offsets(
buf: T,
total_num_values: i64,
compression: Compression,
physical_type: Type,
num_rows: i64,
offset_index: Vec<PageLocation>,
has_dictionary_page_to_read: bool,
page_bufs: VecDeque<T>,
) -> Result<Self> {
let decompressor = create_codec(compression)?;
let result = Self {
buf,
total_num_values,
seen_num_values: 0,
decompressor,
physical_type,
num_rows,
page_offset_index: Some(offset_index),
seen_num_data_pages: 0,
has_dictionary_page_to_read,
page_bufs,
};
Ok(result)
}
@@ -550,22 +595,29 @@ impl<T: Read + Send> Iterator for SerializedPageReader<T> {

impl<T: Read + Send> PageReader for SerializedPageReader<T> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
let mut cursor = &mut self.buf;
let mut dictionary_cursor;
while self.seen_num_values < self.total_num_values {
// For now we can not update `seen_num_values` in `skip_next_page`,
// so we need add this check.
if let Some(indexes) = &self.page_offset_index {
if indexes.len() == self.seen_num_data_pages {
// For now we cannot update `seen_num_values` in `skip_next_page`,
// so we need to add this check.
if indexes.len() <= self.seen_num_data_pages {
return Ok(None);
} else if self.seen_num_data_pages == 0
&& self.has_dictionary_page_to_read
{
dictionary_cursor = self.page_bufs.pop_front().unwrap();
cursor = dictionary_cursor.borrow_mut();
} else {
cursor = self.page_bufs.get_mut(self.seen_num_data_pages).unwrap();
}
}

let page_header = read_page_header(&mut self.buf)?;
let page_header = read_page_header(cursor)?;

let to_read = page_header.compressed_page_size as usize;
let mut buffer = Vec::with_capacity(to_read);
let read = (&mut self.buf)
.take(to_read as u64)
.read_to_end(&mut buffer)?;
let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?;

if read != to_read {
return Err(eof_err!(
@@ -588,12 +640,15 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
self.seen_num_data_pages += 1;
decoded
}
PageType::DictionaryPage => decode_page(
page_header,
buffer,
self.physical_type,
self.decompressor.as_mut(),
)?,
PageType::DictionaryPage => {
self.has_dictionary_page_to_read = false;
decode_page(
page_header,
buffer,
self.physical_type,
self.decompressor.as_mut(),
)?
}
_ => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
@@ -611,7 +666,8 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
if self.seen_num_data_pages == page_offset_index.len() {
Ok(None)
} else if self.seen_num_data_pages == 0 && self.has_dictionary_page_to_read {
self.has_dictionary_page_to_read = false;
// `has_dictionary_page_to_read` will be set to false in `get_next_page`;
// we assume the dictionary page must always be read and cannot be skipped.
Ok(Some(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
@@ -634,21 +690,14 @@

fn skip_next_page(&mut self) -> Result<()> {
if let Some(page_offset_index) = &self.page_offset_index {
let location = &page_offset_index[self.seen_num_data_pages];
let compressed_page_size = location.compressed_page_size;
//skip page bytes
let skip_size = io::copy(
self.buf.by_ref().take(compressed_page_size as u64).by_ref(),
&mut io::sink(),
)?;
if skip_size == compressed_page_size as u64 {
self.seen_num_data_pages += 1;
// Notice: 'self.seen_num_values += xxx', for now we can not get skip values in skip_next_page.
Ok(())
} else {
if page_offset_index.len() <= self.seen_num_data_pages {
Err(general_err!(
"skip_next_page size is not equal compressed_page_size"
"seen_num_data_pages is out of bound in SerializedPageReader."
))
} else {
self.seen_num_data_pages += 1;
// Note: this may need `self.seen_num_values += xxx`; for now we cannot get the number of skipped values in `skip_next_page`.
Ok(())
}
} else {
Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader."))
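
A sketch of the peek/skip protocol these methods enable (not part of the diff); `read_selected_pages` and its `rows_needed` predicate are hypothetical stand-ins for a caller's row-selection logic.

use parquet::column::page::PageReader;
use parquet::errors::Result;

fn read_selected_pages(
    pages: &mut dyn PageReader,
    rows_needed: impl Fn(usize) -> bool,
) -> Result<()> {
    while let Some(meta) = pages.peek_next_page()? {
        if meta.is_dict {
            // A dictionary page cannot be skipped: later data pages may
            // reference it, so it must be decoded first.
            let _dict = pages.get_next_page()?;
        } else if !rows_needed(meta.num_rows) {
            // No selected rows in this page: drop it without
            // decompressing or decoding.
            pages.skip_next_page()?;
        } else {
            let _page = pages.get_next_page()?;
            // ... decode values from the page ...
        }
    }
    Ok(())
}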
27 changes: 27 additions & 0 deletions parquet/src/util/page_util.rs
@@ -15,13 +15,40 @@
// specific language governing permissions and limitations
// under the License.

use std::collections::VecDeque;
use std::io::Read;
use std::sync::Arc;
use crate::errors::Result;
use parquet_format::PageLocation;
use crate::file::reader::ChunkReader;

/// Use column chunk's offset index to get the `page_num` page row count.
pub(crate) fn calculate_row_count(indexes: &[PageLocation], page_num: usize, total_row_count: i64) -> Result<usize> {
if page_num == indexes.len() - 1 {
Ok((total_row_count - indexes[page_num].first_row_index + 1) as usize)
} else {
Ok((indexes[page_num + 1].first_row_index - indexes[page_num].first_row_index) as usize)
}
}
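
A crate-internal worked example of the arithmetic above (not part of the diff), with hypothetical page locations.

use parquet_format::PageLocation;

// Hypothetical index: three pages beginning at rows 0, 100, and 250.
let idx = vec![
    PageLocation { offset: 4, compressed_page_size: 100, first_row_index: 0 },
    PageLocation { offset: 104, compressed_page_size: 100, first_row_index: 100 },
    PageLocation { offset: 204, compressed_page_size: 50, first_row_index: 250 },
];
// Every page except the last is sized by the gap to its neighbour:
assert_eq!(calculate_row_count(&idx, 0, 300).unwrap(), 100); // 100 - 0
assert_eq!(calculate_row_count(&idx, 1, 300).unwrap(), 150); // 250 - 100
// The last page's count is derived from `total_row_count` instead.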

/// Use the column chunk's offset index to get a serially readable slice for each page,
/// and a flag indicating whether this column chunk contains a dictionary page.
pub(crate) fn get_pages_readable_slices<T: Read + Send, R: ChunkReader + ChunkReader<T=T>>(col_chunk_offset_index: &[PageLocation], col_start: u64, chunk_reader: Arc<R>) -> Result<(VecDeque<T>, bool)> {
let first_data_page_offset = col_chunk_offset_index[0].offset as u64;
let has_dictionary_page = first_data_page_offset != col_start;
let mut page_readers = VecDeque::with_capacity(col_chunk_offset_index.len() + 1);

if has_dictionary_page {
let length = (first_data_page_offset - col_start) as usize;
let reader: T = chunk_reader.get_read(col_start, length)?;
page_readers.push_back(reader);
}

for index in col_chunk_offset_index {
let start = index.offset as u64;
let length = index.compressed_page_size as usize;
let reader: T = chunk_reader.get_read(start, length)?;
page_readers.push_back(reader)
}
Ok((page_readers, has_dictionary_page))
}