Skip to content

Commit

Permalink
Implement dictionary support for reading ByteView from parquet (#5973)
Browse files Browse the repository at this point in the history
* implement dictionary encoding support

* update comments
  • Loading branch information
XiangpengHao authored Jul 3, 2024
1 parent 035b589 commit e7a0008
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 9 deletions.
121 changes: 112 additions & 9 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::DictIndexDecoder;
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
Expand All @@ -25,6 +26,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow_array::ArrayRef;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;
Expand Down Expand Up @@ -210,6 +212,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`]
///
/// Each variant wraps the concrete decoder for one family of page encodings.
pub enum ByteViewArrayDecoder {
/// Values are decoded by [`ByteViewArrayDecoderPlain`]
Plain(ByteViewArrayDecoderPlain),
/// Values are dictionary keys resolved by [`ByteViewArrayDecoderDictionary`];
/// selected for `Encoding::RLE_DICTIONARY` and `Encoding::PLAIN_DICTIONARY` pages
Dictionary(ByteViewArrayDecoderDictionary),
}

impl ByteViewArrayDecoder {
Expand All @@ -227,10 +230,14 @@ impl ByteViewArrayDecoder {
num_values,
validate_utf8,
)),
Encoding::RLE_DICTIONARY
| Encoding::PLAIN_DICTIONARY
| Encoding::DELTA_LENGTH_BYTE_ARRAY
| Encoding::DELTA_BYTE_ARRAY => unimplemented!("stay tuned!"),
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
data, num_levels, num_values,
))
}
Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => {
unimplemented!("stay tuned!")
}
_ => {
return Err(general_err!(
"unsupported encoding for byte array: {}",
Expand All @@ -247,17 +254,27 @@ impl ByteViewArrayDecoder {
&mut self,
out: &mut ViewBuffer,
len: usize,
_dict: Option<&ViewBuffer>,
dict: Option<&ViewBuffer>,
) -> Result<usize> {
match self {
ByteViewArrayDecoder::Plain(d) => d.read(out, len),
ByteViewArrayDecoder::Dictionary(d) => {
let dict = dict
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.read(out, dict, len)
}
}
}

/// Skip `len` values
pub fn skip(&mut self, len: usize, _dict: Option<&ViewBuffer>) -> Result<usize> {
pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
match self {
ByteViewArrayDecoder::Plain(d) => d.skip(len),
ByteViewArrayDecoder::Dictionary(d) => {
let dict = dict
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
d.skip(dict, len)
}
}
}
}
Expand Down Expand Up @@ -348,6 +365,90 @@ impl ByteViewArrayDecoderPlain {
}
}

/// Decoder for dictionary encoded pages that produces views into a [`ViewBuffer`]
///
/// The page only carries dictionary indexes; the dictionary values themselves are
/// supplied by the caller on every `read` / `skip` call.
pub struct ByteViewArrayDecoderDictionary {
/// Decodes the dictionary indexes (keys) stored in the page
decoder: DictIndexDecoder,
}

impl ByteViewArrayDecoderDictionary {
    /// Creates a decoder over the dictionary indexes contained in `data`.
    ///
    /// All three arguments are forwarded unchanged to [`DictIndexDecoder::new`].
    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
        let decoder = DictIndexDecoder::new(data, num_levels, num_values);
        Self { decoder }
    }

    /// Decodes up to `len` dictionary indexes from `self.decoder` and appends the
    /// dictionary view each index refers to onto `output`.
    ///
    /// Returns `Ok(0)` without touching `output` when `dict` is empty or `len` is zero;
    /// otherwise returns whatever the index decoder's `read` reports. An index that does
    /// not exist in `dict` yields an error.
    ///
    /// # Buffer sharing
    /// The dictionary data is never copied. If the trailing buffers of `output` are
    /// already the very same buffers as `dict.buffers` (pointer equality), they are
    /// reused; otherwise the dictionary buffers are appended (a cheap `clone` of each
    /// buffer handle) before any view is written.
    fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
        if len == 0 || dict.is_empty() {
            return Ok(0);
        }

        let dict_buffer_count = dict.buffers.len();

        // The dictionary counts as "attached" when the last `dict_buffer_count` buffers of
        // `output` are pointer-identical to the dictionary buffers. This avoids appending
        // the same buffers again when one dictionary serves several `read` calls.
        let dict_already_attached = output
            .buffers
            .len()
            .checked_sub(dict_buffer_count)
            .map_or(false, |start| {
                output.buffers[start..]
                    .iter()
                    .zip(dict.buffers.iter())
                    .all(|(existing, wanted)| existing.ptr_eq(wanted))
            });

        if !dict_already_attached {
            output.buffers.extend(dict.buffers.iter().cloned());
        }

        // Position of the first dictionary buffer inside `output.buffers`.
        // For example, if the 2nd dictionary buffer is the 5th output buffer,
        // the base index is 5 - 2 = 3.
        let output_buffer_count = output.buffers.len() as u32;
        let base_buffer_idx = output_buffer_count - dict_buffer_count as u32;

        self.decoder.read(len, |keys| {
            for key in keys {
                let raw_view = dict
                    .views
                    .get(*key as usize)
                    .ok_or_else(|| general_err!("invalid key={} for dictionary", *key))?;

                // The low 32 bits of a view hold the value length.
                let value_len = *raw_view as u32;
                let resolved: u128 = if value_len <= 12 {
                    // Inlined value: the view carries no buffer reference, copy it verbatim.
                    *raw_view
                } else {
                    // The view references a dictionary buffer: shift its buffer index so
                    // that it addresses the same buffer inside `output.buffers`.
                    let mut shifted = ByteView::from(*raw_view);
                    shifted.buffer_index += base_buffer_idx;
                    shifted.into()
                };

                // SAFETY: the view originates from the dictionary, so it is valid; for
                // non-inlined values the buffer index was corrected above to point at
                // the matching buffer in `output`.
                unsafe {
                    output.append_raw_view_unchecked(&resolved);
                }
            }
            Ok(())
        })
    }

    /// Skips `to_skip` dictionary indexes.
    ///
    /// Returns `Ok(0)` when `dict` is empty, otherwise the result of the index
    /// decoder's `skip`.
    fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
        if dict.is_empty() {
            Ok(0)
        } else {
            self.decoder.skip(to_skip)
        }
    }
}

/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
match std::str::from_utf8(val) {
Expand Down Expand Up @@ -386,8 +487,11 @@ mod tests {
.unwrap();

for (encoding, page) in pages {
if encoding != Encoding::PLAIN {
// skip non-plain encodings for now as they are not yet implemented
if encoding != Encoding::PLAIN
&& encoding != Encoding::RLE_DICTIONARY
&& encoding != Encoding::PLAIN_DICTIONARY
{
// skip unsupported encodings for now as they are not yet implemented
continue;
}
let mut output = ViewBuffer::default();
Expand All @@ -399,7 +503,6 @@ mod tests {
assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

assert_eq!(output.views.len(), 4);
assert_eq!(output.buffers.len(), 4);

let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());
Expand Down
13 changes: 13 additions & 0 deletions parquet/src/arrow/buffer/view_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ pub struct ViewBuffer {
}

impl ViewBuffer {
/// Returns `true` if this buffer contains no views
pub fn is_empty(&self) -> bool {
self.views.is_empty()
}

#[allow(unused)]
pub fn append_block(&mut self, block: Buffer) -> u32 {
let block_id = self.buffers.len() as u32;
Expand All @@ -56,6 +60,15 @@ impl ViewBuffer {
self.views.push(view);
}

/// Directly append a view to the view array.
/// This is used when we create a StringViewArray from a dictionary whose values are StringViewArray.
///
/// The raw `u128` is pushed onto `self.views` as-is: nothing is validated and
/// `self.buffers` is left untouched.
///
/// # Safety
/// The `view` must be a valid view as per the ByteView spec.
/// NOTE(review): for a non-inlined view this presumably also requires that the buffer
/// index and offset it encodes refer to data held in `self.buffers` — confirm.
pub unsafe fn append_raw_view_unchecked(&mut self, view: &u128) {
self.views.push(*view);
}

/// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
#[allow(unused)]
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: &ArrowType) -> ArrayRef {
Expand Down

0 comments on commit e7a0008

Please sign in to comment.