-
Notifications
You must be signed in to change notification settings - Fork 807
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement dictionary support for reading ByteView from parquet #5973
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
|
||
use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; | ||
use crate::arrow::buffer::view_buffer::ViewBuffer; | ||
use crate::arrow::decoder::DictIndexDecoder; | ||
use crate::arrow::record_reader::GenericRecordReader; | ||
use crate::arrow::schema::parquet_to_arrow_field; | ||
use crate::basic::{ConvertedType, Encoding}; | ||
|
@@ -25,6 +26,7 @@ use crate::column::reader::decoder::ColumnValueDecoder; | |
use crate::errors::{ParquetError, Result}; | ||
use crate::schema::types::ColumnDescPtr; | ||
use arrow_array::ArrayRef; | ||
use arrow_data::ByteView; | ||
use arrow_schema::DataType as ArrowType; | ||
use bytes::Bytes; | ||
use std::any::Any; | ||
|
@@ -210,6 +212,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder { | |
/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`] | ||
pub enum ByteViewArrayDecoder { | ||
Plain(ByteViewArrayDecoderPlain), | ||
Dictionary(ByteViewArrayDecoderDictionary), | ||
} | ||
|
||
impl ByteViewArrayDecoder { | ||
|
@@ -227,10 +230,14 @@ impl ByteViewArrayDecoder { | |
num_values, | ||
validate_utf8, | ||
)), | ||
Encoding::RLE_DICTIONARY | ||
| Encoding::PLAIN_DICTIONARY | ||
| Encoding::DELTA_LENGTH_BYTE_ARRAY | ||
| Encoding::DELTA_BYTE_ARRAY => unimplemented!("stay tuned!"), | ||
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => { | ||
ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new( | ||
data, num_levels, num_values, | ||
)) | ||
} | ||
Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => { | ||
unimplemented!("stay tuned!") | ||
} | ||
_ => { | ||
return Err(general_err!( | ||
"unsupported encoding for byte array: {}", | ||
|
@@ -247,17 +254,27 @@ impl ByteViewArrayDecoder { | |
&mut self, | ||
out: &mut ViewBuffer, | ||
len: usize, | ||
_dict: Option<&ViewBuffer>, | ||
dict: Option<&ViewBuffer>, | ||
) -> Result<usize> { | ||
match self { | ||
ByteViewArrayDecoder::Plain(d) => d.read(out, len), | ||
ByteViewArrayDecoder::Dictionary(d) => { | ||
let dict = dict | ||
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?; | ||
d.read(out, dict, len) | ||
} | ||
} | ||
} | ||
|
||
/// Skip `len` values | ||
pub fn skip(&mut self, len: usize, _dict: Option<&ViewBuffer>) -> Result<usize> { | ||
pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> { | ||
match self { | ||
ByteViewArrayDecoder::Plain(d) => d.skip(len), | ||
ByteViewArrayDecoder::Dictionary(d) => { | ||
let dict = dict | ||
.ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?; | ||
d.skip(dict, len) | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -348,6 +365,90 @@ impl ByteViewArrayDecoderPlain { | |
} | ||
} | ||
|
||
pub struct ByteViewArrayDecoderDictionary { | ||
decoder: DictIndexDecoder, | ||
} | ||
|
||
impl ByteViewArrayDecoderDictionary { | ||
fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self { | ||
Self { | ||
decoder: DictIndexDecoder::new(data, num_levels, num_values), | ||
} | ||
} | ||
|
||
/// Reads the next indexes from self.decoder | ||
/// the indexes are assumed to be indexes into `dict` | ||
/// the output values are written to output | ||
/// | ||
/// Assumptions / Optimization | ||
/// This function checks if dict.buffers() are the last buffers in `output`, and if so | ||
/// reuses the dictionary page buffers directly without copying data | ||
fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> { | ||
if dict.is_empty() || len == 0 { | ||
return Ok(0); | ||
} | ||
|
||
// Check if the last few buffer of `output`` are the same as the `dict` buffer | ||
// This is to avoid creating a new buffers if the same dictionary is used for multiple `read` | ||
let need_to_create_new_buffer = { | ||
if output.buffers.len() >= dict.buffers.len() { | ||
let offset = output.buffers.len() - dict.buffers.len(); | ||
output.buffers[offset..] | ||
.iter() | ||
.zip(dict.buffers.iter()) | ||
.any(|(a, b)| !a.ptr_eq(b)) | ||
} else { | ||
true | ||
} | ||
}; | ||
|
||
if need_to_create_new_buffer { | ||
for b in dict.buffers.iter() { | ||
output.buffers.push(b.clone()); | ||
} | ||
} | ||
|
||
// Calculate the offset of the dictionary buffers in the output buffers | ||
// For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers, | ||
// then the base_buffer_idx is 5 - 2 = 3 | ||
let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32; | ||
|
||
self.decoder.read(len, |keys| { | ||
for k in keys { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we know for sure There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The input is untrusted, but I think we can do very little to make it safer, e.g., if the view is maliciously crafted we can easily run into memory issues due to too large offset or invalid buffer idx. |
||
let view = dict | ||
.views | ||
.get(*k as usize) | ||
.ok_or_else(|| general_err!("invalid key={} for dictionary", *k))?; | ||
let len = *view as u32; | ||
if len <= 12 { | ||
// directly append the view if it is inlined | ||
// Safety: the view is from the dictionary, so it is valid | ||
unsafe { | ||
output.append_raw_view_unchecked(view); | ||
} | ||
} else { | ||
// correct the buffer index and append the view | ||
let mut view = ByteView::from(*view); | ||
view.buffer_index += base_buffer_idx; | ||
// Safety: the view is from the dictionary, | ||
// we corrected the index value to point it to output buffer, so it is valid | ||
unsafe { | ||
output.append_raw_view_unchecked(&view.into()); | ||
} | ||
} | ||
} | ||
Ok(()) | ||
}) | ||
} | ||
|
||
fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> { | ||
if dict.is_empty() { | ||
return Ok(0); | ||
} | ||
self.decoder.skip(to_skip) | ||
} | ||
} | ||
|
||
/// Check that `val` is a valid UTF-8 sequence | ||
pub fn check_valid_utf8(val: &[u8]) -> Result<()> { | ||
match std::str::from_utf8(val) { | ||
|
@@ -386,8 +487,11 @@ mod tests { | |
.unwrap(); | ||
|
||
for (encoding, page) in pages { | ||
if encoding != Encoding::PLAIN { | ||
// skip non-plain encodings for now as they are not yet implemented | ||
if encoding != Encoding::PLAIN | ||
&& encoding != Encoding::RLE_DICTIONARY | ||
&& encoding != Encoding::PLAIN_DICTIONARY | ||
{ | ||
// skip unsupported encodings for now as they are not yet implemented | ||
continue; | ||
} | ||
let mut output = ViewBuffer::default(); | ||
|
@@ -399,7 +503,6 @@ mod tests { | |
assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); | ||
|
||
assert_eq!(output.views.len(), 4); | ||
assert_eq!(output.buffers.len(), 4); | ||
|
||
let valid = [false, false, true, true, false, true, true, false, false]; | ||
let valid_buffer = Buffer::from_iter(valid.iter().cloned()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a clever optimization