
Commit

Added support to roundtrip dictionaries for fixed-size binary
jorgecarleitao committed Jan 30, 2022
1 parent 66906ed commit 6c045e2
Showing 5 changed files with 181 additions and 1 deletion.
155 changes: 155 additions & 0 deletions src/io/parquet/read/fixed_size_binary/dictionary.rs
@@ -0,0 +1,155 @@
use std::{collections::VecDeque, sync::Arc};

use parquet2::page::FixedLenByteArrayPageDict;

use crate::{
    array::{Array, DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray},
    bitmap::MutableBitmap,
    datatypes::DataType,
    error::{ArrowError, Result},
};

use super::super::dictionary::*;
use super::super::utils;
use super::super::utils::Decoder;
use super::super::DataPages;

/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded fixed-size binary representation
#[derive(Debug)]
pub struct ArrayIterator<K, I>
where
    I: DataPages,
    K: DictionaryKey,
{
    iter: I,
    data_type: DataType,
    values: Dict,
    items: VecDeque<(Vec<K>, MutableBitmap)>,
    chunk_size: usize,
}

impl<K, I> ArrayIterator<K, I>
where
    K: DictionaryKey,
    I: DataPages,
{
    fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
        // the yielded arrays carry the dictionary's value type
        let data_type = match data_type {
            DataType::Dictionary(_, values, _) => values.as_ref().clone(),
            _ => unreachable!(),
        };
        Self {
            iter,
            data_type,
            values: Dict::Empty,
            items: VecDeque::new(),
            chunk_size,
        }
    }
}

impl<K, I> Iterator for ArrayIterator<K, I>
where
    I: DataPages,
    K: DictionaryKey,
{
    type Item = Result<DictionaryArray<K>>;

    fn next(&mut self) -> Option<Self::Item> {
        // `items` holds ready chunks ordered back (oldest) to front (newest);
        // while more than one chunk is ready, emit the oldest — the newest may still grow
        if self.items.len() > 1 {
            return self.items.pop_back().map(|(values, validity)| {
                let keys = finish_key(values, validity);
                let values = self.values.unwrap();
                Ok(DictionaryArray::from_data(keys, values))
            });
        }
        // otherwise, pull the next page and fold it into the (at most one) pending chunk
        match (self.items.pop_back(), self.iter.next()) {
            (_, Err(e)) => Some(Err(e.into())),
            (None, Ok(None)) => None,
            (state, Ok(Some(page))) => {
                // consume the dictionary page
                if let Some(dict) = page.dictionary_page() {
                    let dict = dict
                        .as_any()
                        .downcast_ref::<FixedLenByteArrayPageDict>()
                        .unwrap();
                    self.values = match &mut self.values {
                        Dict::Empty => {
                            // first page: materialize the dictionary values as a FixedSizeBinaryArray
                            let values = dict.values().to_vec();

                            let array = Arc::new(FixedSizeBinaryArray::from_data(
                                self.data_type.clone(),
                                values.into(),
                                None,
                            )) as _;
                            Dict::Complete(array)
                        }
                        _ => unreachable!(),
                    };
                } else {
                    return Some(Err(ArrowError::nyi(
                        "dictionary arrays from non-dict-encoded pages",
                    )));
                }

                let maybe_array = {
                    // there is a new page => consume the page from the start
                    let maybe_page = PrimitiveDecoder::default().build_state(page);
                    let page = match maybe_page {
                        Ok(page) => page,
                        Err(e) => return Some(Err(e)),
                    };

                    utils::extend_from_new_page::<PrimitiveDecoder<K>, _, _>(
                        page,
                        state,
                        self.chunk_size,
                        &mut self.items,
                        &PrimitiveDecoder::default(),
                    )
                };
                match maybe_array {
                    Ok(Some((values, validity))) => {
                        let keys = PrimitiveArray::from_data(
                            K::PRIMITIVE.into(),
                            values.into(),
                            validity.into(),
                        );

                        let values = self.values.unwrap();
                        Some(Ok(DictionaryArray::from_data(keys, values)))
                    }
                    // the chunk is not yet full => pull the next page
                    Ok(None) => self.next(),
                    Err(e) => Some(Err(e)),
                }
            }
            (Some((values, validity)), Ok(None)) => {
                // we have a populated item and no more pages
                // the only case where an item's length may be smaller than chunk_size
                debug_assert!(values.len() <= self.chunk_size);

                let keys = finish_key(values, validity);

                let values = self.values.unwrap();
                Some(Ok(DictionaryArray::from_data(keys, values)))
            }
        }
    }
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, K, I>(
    iter: I,
    data_type: DataType,
    chunk_size: usize,
) -> Box<dyn Iterator<Item = Result<Arc<dyn Array>>> + 'a>
where
    I: 'a + DataPages,
    K: DictionaryKey,
{
    Box::new(
        ArrayIterator::<K, I>::new(iter, data_type, chunk_size)
            .map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
    )
}
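
For reference, a minimal sketch of the kind of array this iterator yields, mirroring the `from_data` constructors used above and in the test change below; the key values here are illustrative, not taken from the commit:

use std::sync::Arc;

use arrow2::array::{Array, DictionaryArray, FixedSizeBinaryArray, Int32Array};
use arrow2::datatypes::DataType;

fn main() {
    // keys index into the dictionary values; a None key encodes a null row
    let keys = Int32Array::from(&[Some(0), Some(1), None, Some(1)]);

    // two 2-byte values, "ab" and "ac", stored back-to-back
    let values = FixedSizeBinaryArray::from_data(
        DataType::FixedSizeBinary(2),
        vec![b'a', b'b', b'a', b'c'].into(),
        None,
    );

    let array = DictionaryArray::<i32>::from_data(keys, Arc::new(values));
    assert_eq!(array.len(), 4);
}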
2 changes: 2 additions & 0 deletions src/io/parquet/read/fixed_size_binary/mod.rs
@@ -1,4 +1,6 @@
mod basic;
mod dictionary;
mod utils;

pub use basic::BinaryArrayIterator;
pub use dictionary::iter_to_arrays as iter_to_dict_arrays;
3 changes: 3 additions & 0 deletions src/io/parquet/read/mod.rs
@@ -190,6 +190,9 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>(
        LargeUtf8 | LargeBinary => {
            binary::iter_to_dict_arrays::<K, i64, _>(iter, data_type, chunk_size)
        }
        FixedSizeBinary(_) => {
            fixed_size_binary::iter_to_dict_arrays::<K, _>(iter, data_type, chunk_size)
        }
        other => {
            return Err(ArrowError::nyi(format!(
                "Reading dictionaries of type {:?}",
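`dict_read` receives the column's `DataType::Dictionary(key, values, _)` and dispatches on the value type, so the new arm covers e.g. a `Dictionary(Int32, FixedSizeBinary(2))` column. A minimal sketch of that routing, extracted from the match in `ArrayIterator::new` above; the helper is illustrative (not part of the crate), and `IntegerType` in the key slot is an assumption of this sketch:

use arrow2::datatypes::{DataType, IntegerType};

// illustrative helper: extract a dictionary's value type,
// as `dict_read` and `ArrayIterator::new` do before dispatching
fn dict_value_type(data_type: &DataType) -> Option<&DataType> {
    match data_type {
        DataType::Dictionary(_, values, _) => Some(values.as_ref()),
        _ => None,
    }
}

fn main() {
    let dt = DataType::Dictionary(
        IntegerType::Int32,
        Box::new(DataType::FixedSizeBinary(2)),
        false,
    );
    assert_eq!(dict_value_type(&dt), Some(&DataType::FixedSizeBinary(2)));
}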
11 changes: 11 additions & 0 deletions src/scalar/equal.rs
@@ -135,6 +135,17 @@ fn equal(lhs: &dyn Scalar, rhs: &dyn Scalar) -> bool {
            let rhs = rhs.as_any().downcast_ref::<StructScalar>().unwrap();
            lhs == rhs
        }
        DataType::FixedSizeBinary(_) => {
            let lhs = lhs
                .as_any()
                .downcast_ref::<FixedSizeBinaryScalar>()
                .unwrap();
            let rhs = rhs
                .as_any()
                .downcast_ref::<FixedSizeBinaryScalar>()
                .unwrap();
            lhs == rhs
        }
        other => unimplemented!("{:?}", other),
    }
}
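
This arm backs scalar comparison for fixed-size binary values. A minimal sketch of the effect, assuming `arrow2::scalar::new_scalar` already covers `FixedSizeBinary` (the scalar type itself predates this commit) and the `PartialEq` impl for `dyn Scalar` that dispatches into `equal`:

use arrow2::array::FixedSizeBinaryArray;
use arrow2::datatypes::DataType;
use arrow2::scalar::new_scalar;

fn main() {
    // three 2-byte rows: "ab", "ab", "ac"
    let array = FixedSizeBinaryArray::from_data(
        DataType::FixedSizeBinary(2),
        vec![b'a', b'b', b'a', b'b', b'a', b'c'].into(),
        None,
    );
    let (a, b, c) = (
        new_scalar(&array, 0),
        new_scalar(&array, 1),
        new_scalar(&array, 2),
    );
    // equality dispatches through the `FixedSizeBinary(_)` arm added above
    assert!(a.as_ref() == b.as_ref());
    assert!(a.as_ref() != c.as_ref());
}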
11 changes: 10 additions & 1 deletion tests/it/io/parquet/mod.rs
@@ -717,19 +717,28 @@ fn arrow_type() -> Result<()> {
    let array3 = DictionaryArray::from_data(indices.clone(), std::sync::Arc::new(values));

    let values = BinaryArray::<i32>::from_slice([b"ab", b"ac"]);
-   let array4 = DictionaryArray::from_data(indices, std::sync::Arc::new(values));
+   let array4 = DictionaryArray::from_data(indices.clone(), std::sync::Arc::new(values));

    let values = FixedSizeBinaryArray::from_data(
        DataType::FixedSizeBinary(2),
        vec![b'a', b'b', b'a', b'c'].into(),
        None,
    );
    let array5 = DictionaryArray::from_data(indices, std::sync::Arc::new(values));

    let schema = Schema::from(vec![
        Field::new("a1", dt1, true),
        Field::new("a2", array2.data_type().clone(), true),
        Field::new("a3", array3.data_type().clone(), true),
        Field::new("a4", array4.data_type().clone(), true),
        Field::new("a5", array5.data_type().clone(), true),
    ]);
    let batch = Chunk::try_new(vec![
        Arc::new(array) as Arc<dyn Array>,
        Arc::new(array2),
        Arc::new(array3),
        Arc::new(array4),
        Arc::new(array5),
    ])?;

    let r = integration_write(&schema, &[batch.clone()])?;
