Skip to content

Commit

Permalink
read to view type using offset_buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Mar 31, 2024
1 parent 12f746b commit d2f1a30
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 774 deletions.
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Fields, SchemaBuilder};

use crate::arrow::array_reader::byte_view_array::make_byte_view_array_reader;
use crate::arrow::array_reader::byte_array::make_byte_view_array_reader;
use crate::arrow::array_reader::empty_array::make_empty_array_reader;
use crate::arrow::array_reader::fixed_len_byte_array::make_fixed_len_byte_array_reader;
use crate::arrow::array_reader::{
Expand Down
144 changes: 143 additions & 1 deletion parquet/src/arrow/array_reader/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,36 @@ pub fn make_byte_array_reader(
}
}

/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
pub fn make_byte_view_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Some(t) => t,
None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
_ => ArrowType::BinaryView,
},
};

match data_type {
ArrowType::BinaryView | ArrowType::Utf8View => {
let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
)))
}

_ => Err(general_err!(
"invalid data type for byte array reader read to view type - {}",
data_type
)),
}
}

/// An [`ArrayReader`] for variable length byte arrays
struct ByteArrayReader<I: OffsetSizeTrait> {
data_type: ArrowType,
Expand Down Expand Up @@ -588,7 +618,7 @@ mod tests {
use super::*;
use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
use crate::arrow::record_reader::buffer::ValuesBuffer;
use arrow_array::{Array, StringArray};
use arrow_array::{Array, StringArray, StringViewArray};
use arrow_buffer::Buffer;

#[test]
Expand Down Expand Up @@ -646,6 +676,64 @@ mod tests {
}
}

#[test]
fn test_byte_array_string_view_decoder() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);

assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
assert_eq!(
output.values.as_slice(),
"helloworldlarge payload over 12 bytesb".as_bytes()
);
assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 37, 38]);

assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("world"),
None,
Some("large payload over 12 bytes"),
Some("b"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_skip() {
let (pages, encoded_dictionary) =
Expand Down Expand Up @@ -690,6 +778,60 @@ mod tests {
}
}

#[test]
fn test_byte_array_string_view_decoder_skip() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "large payload over 12 bytes"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
assert_eq!(
output.values.as_slice(),
"hellolarge payload over 12 bytes".as_bytes()
);
assert_eq!(output.offsets.as_slice(), &[0, 5, 32]);

assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

let valid = [false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8View);
let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("large payload over 12 bytes"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
Expand Down
Loading

0 comments on commit d2f1a30

Please sign in to comment.