Skip to content

Commit

Permalink
Merge dfedade into 1d31d30
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Jun 8, 2022
2 parents 1d31d30 + dfedade commit 17412bf
Show file tree
Hide file tree
Showing 2 changed files with 215 additions and 107 deletions.
85 changes: 83 additions & 2 deletions parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
use std::collections::VecDeque;
use std::fmt::Formatter;
use std::io::SeekFrom;
use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
Expand All @@ -86,6 +86,7 @@ use std::task::{Context, Poll};
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use parquet_format::PageType;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow::datatypes::SchemaRef;
Expand All @@ -96,11 +97,13 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
use crate::column::page::{PageIterator, PageReader};
use crate::column::page::{Page, PageIterator, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::reader::SerializedPageReader;
use crate::file::serialized_reader::{decode_page, read_page_header};
use crate::file::FOOTER_SIZE;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};

Expand Down Expand Up @@ -433,6 +436,7 @@ where
}
}

/// An in-memory collection of column chunks
struct InMemoryRowGroup {
schema: SchemaDescPtr,
column_chunks: Vec<Option<InMemoryColumnChunk>>,
Expand All @@ -459,6 +463,7 @@ impl RowGroupCollection for InMemoryRowGroup {
}
}

/// Data for a single column chunk
#[derive(Clone)]
struct InMemoryColumnChunk {
num_values: i64,
Expand All @@ -480,6 +485,82 @@ impl InMemoryColumnChunk {
}
}

// A serialized implementation for Parquet [`PageReader`].
struct InMemoryColumnChunkReader {
chunk: InMemoryColumnChunk,
decompressor: Option<Box<dyn Codec>>,
offset: usize,
seen_num_values: i64,
}

impl InMemoryColumnChunkReader {
/// Creates a new serialized page reader from file source.
pub fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
let decompressor = create_codec(chunk.compression)?;
let result = Self {
chunk,
decompressor,
offset: 0,
seen_num_values: 0,
};
Ok(result)
}
}

impl Iterator for InMemoryColumnChunkReader {
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
}

impl PageReader for InMemoryColumnChunkReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.chunk.num_values {
let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
let page_header = read_page_header(&mut cursor)?;
let compressed_size = page_header.compressed_page_size as usize;

self.offset += cursor.position() as usize;
let start_offset = self.offset;
let end_offset = self.offset + compressed_size;
self.offset = end_offset;

let buffer = self.chunk.data.slice(start_offset..end_offset);

let result = match page_header.type_ {
PageType::DataPage | PageType::DataPageV2 => {
let decoded = decode_page(
page_header,
buffer.into(),
self.chunk.physical_type,
self.decompressor.as_mut(),
)?;
self.seen_num_values += decoded.num_values() as i64;
decoded
}
PageType::DictionaryPage => decode_page(
page_header,
buffer.into(),
self.chunk.physical_type,
self.decompressor.as_mut(),
)?,
_ => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
}
};

return Ok(Some(result));
}

// We are at the end of this column chunk and no more page left. Return None.
Ok(None)
}
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
struct ColumnChunkIterator {
schema: SchemaDescPtr,
column_schema: ColumnDescPtr,
Expand Down
237 changes: 132 additions & 105 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,108 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
}
}

/// Reads a [`PageHeader`] from the provided [`Read`]
pub(crate) fn read_page_header<T: Read>(input: &mut T) -> Result<PageHeader> {
let mut prot = TCompactInputProtocol::new(input);
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
Ok(page_header)
}

/// Decodes a [`Page`] from the provided `buffer`
pub(crate) fn decode_page(
page_header: PageHeader,
buffer: ByteBufferPtr,
physical_type: Type,
decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
// When processing data page v2, depending on enabled compression for the
// page, we should account for uncompressed data ('offset') of
// repetition and definition levels.
//
// We always use 0 offset for other pages other than v2, `true` flag means
// that compression will be applied if decompressor is defined
let mut offset: usize = 0;
let mut can_decompress = true;

if let Some(ref header_v2) = page_header.data_page_header_v2 {
offset = (header_v2.definition_levels_byte_length
+ header_v2.repetition_levels_byte_length) as usize;
// When is_compressed flag is missing the page is considered compressed
can_decompress = header_v2.is_compressed.unwrap_or(true);
}

// TODO: page header could be huge because of statistics. We should set a
// maximum page header size and abort if that is exceeded.
let buffer = match decompressor {
Some(decompressor) if can_decompress => {
let uncompressed_size = page_header.uncompressed_page_size as usize;
let mut decompressed = Vec::with_capacity(uncompressed_size);
let compressed = &buffer.as_ref()[offset..];
decompressed.extend_from_slice(&buffer.as_ref()[..offset]);
decompressor.decompress(compressed, &mut decompressed)?;

if decompressed.len() != uncompressed_size {
return Err(general_err!(
"Actual decompressed size doesn't match the expected one ({} vs {})",
decompressed.len(),
uncompressed_size
));
}

ByteBufferPtr::new(decompressed)
}
_ => buffer,
};

let result = match page_header.type_ {
PageType::DictionaryPage => {
assert!(page_header.dictionary_page_header.is_some());
let dict_header = page_header.dictionary_page_header.as_ref().unwrap();
let is_sorted = dict_header.is_sorted.unwrap_or(false);
Page::DictionaryPage {
buf: buffer,
num_values: dict_header.num_values as u32,
encoding: Encoding::from(dict_header.encoding),
is_sorted,
}
}
PageType::DataPage => {
assert!(page_header.data_page_header.is_some());
let header = page_header.data_page_header.unwrap();
Page::DataPage {
buf: buffer,
num_values: header.num_values as u32,
encoding: Encoding::from(header.encoding),
def_level_encoding: Encoding::from(header.definition_level_encoding),
rep_level_encoding: Encoding::from(header.repetition_level_encoding),
statistics: statistics::from_thrift(physical_type, header.statistics),
}
}
PageType::DataPageV2 => {
assert!(page_header.data_page_header_v2.is_some());
let header = page_header.data_page_header_v2.unwrap();
let is_compressed = header.is_compressed.unwrap_or(true);
Page::DataPageV2 {
buf: buffer,
num_values: header.num_values as u32,
encoding: Encoding::from(header.encoding),
num_nulls: header.num_nulls as u32,
num_rows: header.num_rows as u32,
def_levels_byte_len: header.definition_levels_byte_length as u32,
rep_levels_byte_len: header.repetition_levels_byte_length as u32,
is_compressed,
statistics: statistics::from_thrift(physical_type, header.statistics),
}
}
_ => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
unimplemented!("Page type {:?} is not supported", page_header.type_)
}
};

Ok(result)
}

/// A serialized implementation for Parquet [`PageReader`].
pub struct SerializedPageReader<T: Read> {
// The file source buffer which references exactly the bytes for the column trunk
Expand Down Expand Up @@ -395,13 +497,6 @@ impl<T: Read> SerializedPageReader<T> {
};
Ok(result)
}

/// Reads Page header from Thrift.
fn read_page_header(&mut self) -> Result<PageHeader> {
let mut prot = TCompactInputProtocol::new(&mut self.buf);
let page_header = PageHeader::read_from_in_protocol(&mut prot)?;
Ok(page_header)
}
}

impl<T: Read + Send> Iterator for SerializedPageReader<T> {
Expand All @@ -415,108 +510,40 @@ impl<T: Read + Send> Iterator for SerializedPageReader<T> {
impl<T: Read + Send> PageReader for SerializedPageReader<T> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.total_num_values {
let page_header = self.read_page_header()?;

// When processing data page v2, depending on enabled compression for the
// page, we should account for uncompressed data ('offset') of
// repetition and definition levels.
//
// We always use 0 offset for other pages other than v2, `true` flag means
// that compression will be applied if decompressor is defined
let mut offset: usize = 0;
let mut can_decompress = true;

if let Some(ref header_v2) = page_header.data_page_header_v2 {
offset = (header_v2.definition_levels_byte_length
+ header_v2.repetition_levels_byte_length)
as usize;
// When is_compressed flag is missing the page is considered compressed
can_decompress = header_v2.is_compressed.unwrap_or(true);
}

let compressed_len = page_header.compressed_page_size as usize - offset;
let uncompressed_len = page_header.uncompressed_page_size as usize - offset;
// We still need to read all bytes from buffered stream
let mut buffer = vec![0; offset + compressed_len];
self.buf.read_exact(&mut buffer)?;

// TODO: page header could be huge because of statistics. We should set a
// maximum page header size and abort if that is exceeded.
if let Some(decompressor) = self.decompressor.as_mut() {
if can_decompress {
let mut decompressed_buffer = Vec::with_capacity(uncompressed_len);
let decompressed_size = decompressor
.decompress(&buffer[offset..], &mut decompressed_buffer)?;
if decompressed_size != uncompressed_len {
return Err(general_err!(
"Actual decompressed size doesn't match the expected one ({} vs {})",
decompressed_size,
uncompressed_len
));
}
if offset == 0 {
buffer = decompressed_buffer;
} else {
// Prepend saved offsets to the buffer
buffer.truncate(offset);
buffer.append(&mut decompressed_buffer);
}
}
let page_header = read_page_header(&mut self.buf)?;

let to_read = page_header.compressed_page_size as usize;
let mut buffer = Vec::with_capacity(to_read);
let read = (&mut self.buf)
.take(to_read as u64)
.read_to_end(&mut buffer)?;

if read != to_read {
return Err(eof_err!(
"Expected to read {} bytes of page, read only {}",
to_read,
read
));
}

let buffer = ByteBufferPtr::new(buffer);
let result = match page_header.type_ {
PageType::DictionaryPage => {
assert!(page_header.dictionary_page_header.is_some());
let dict_header =
page_header.dictionary_page_header.as_ref().unwrap();
let is_sorted = dict_header.is_sorted.unwrap_or(false);
Page::DictionaryPage {
buf: ByteBufferPtr::new(buffer),
num_values: dict_header.num_values as u32,
encoding: Encoding::from(dict_header.encoding),
is_sorted,
}
}
PageType::DataPage => {
assert!(page_header.data_page_header.is_some());
let header = page_header.data_page_header.unwrap();
self.seen_num_values += header.num_values as i64;
Page::DataPage {
buf: ByteBufferPtr::new(buffer),
num_values: header.num_values as u32,
encoding: Encoding::from(header.encoding),
def_level_encoding: Encoding::from(
header.definition_level_encoding,
),
rep_level_encoding: Encoding::from(
header.repetition_level_encoding,
),
statistics: statistics::from_thrift(
self.physical_type,
header.statistics,
),
}
}
PageType::DataPageV2 => {
assert!(page_header.data_page_header_v2.is_some());
let header = page_header.data_page_header_v2.unwrap();
let is_compressed = header.is_compressed.unwrap_or(true);
self.seen_num_values += header.num_values as i64;
Page::DataPageV2 {
buf: ByteBufferPtr::new(buffer),
num_values: header.num_values as u32,
encoding: Encoding::from(header.encoding),
num_nulls: header.num_nulls as u32,
num_rows: header.num_rows as u32,
def_levels_byte_len: header.definition_levels_byte_length as u32,
rep_levels_byte_len: header.repetition_levels_byte_length as u32,
is_compressed,
statistics: statistics::from_thrift(
self.physical_type,
header.statistics,
),
}
PageType::DataPage | PageType::DataPageV2 => {
let decoded = decode_page(
page_header,
buffer,
self.physical_type,
self.decompressor.as_mut(),
)?;
self.seen_num_values += decoded.num_values() as i64;
decoded
}
PageType::DictionaryPage => decode_page(
page_header,
buffer,
self.physical_type,
self.decompressor.as_mut(),
)?,
_ => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
Expand Down

0 comments on commit 17412bf

Please sign in to comment.