Added support for more dictionary-encoded types. (#336)
jorgecarleitao authored Aug 24, 2021
1 parent 8e96ec4 commit b2a1233
Showing 8 changed files with 285 additions and 29 deletions.
6 changes: 6 additions & 0 deletions arrow-parquet-integration-testing/main_spark.py
@@ -43,3 +43,9 @@ def test(file: str, version: str, column, encoding: str):

test("generated_dictionary", "1", ("dict0", 0), "")
test("generated_dictionary", "2", ("dict0", 0), "")

test("generated_dictionary", "1", ("dict1", 1), "")
test("generated_dictionary", "2", ("dict1", 1), "")

test("generated_dictionary", "1", ("dict2", 2), "")
test("generated_dictionary", "2", ("dict2", 2), "")
158 changes: 158 additions & 0 deletions src/io/parquet/read/binary/dictionary.rs
@@ -0,0 +1,158 @@
use std::sync::Arc;

use parquet2::{
    encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
    metadata::{ColumnChunkMetaData, ColumnDescriptor},
    page::{BinaryPageDict, DataPage},
    read::StreamingIterator,
};

use super::super::utils as other_utils;
use crate::{
    array::{Array, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array},
    bitmap::{utils::BitmapIter, MutableBitmap},
    buffer::MutableBuffer,
    error::{ArrowError, Result},
};

fn read_dict_optional<K, O>(
    validity_buffer: &[u8],
    indices_buffer: &[u8],
    additional: usize,
    dict: &BinaryPageDict,
    indices: &mut MutableBuffer<K>,
    offsets: &mut MutableBuffer<O>,
    values: &mut MutableBuffer<u8>,
    validity: &mut MutableBitmap,
) where
    K: DictionaryKey,
    O: Offset,
{
    // copy the dictionary page verbatim: its bytes become the values of the
    // dictionary array, and its offsets are widened to the target offset type
    values.extend_from_slice(dict.values());
    offsets.extend(
        dict.offsets()
            .iter()
            .map(|x| O::from_usize(*x as usize).unwrap()),
    );

    // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
    // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
    let bit_width = indices_buffer[0];
    let indices_buffer = &indices_buffer[1..];

    // skip the uleb128-encoded header that precedes the bit-packed run
    let (_, consumed) = uleb128::decode(indices_buffer);
    let indices_buffer = &indices_buffer[consumed..];

    let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;

    let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);

    // the definition levels encode validity as a hybrid RLE/bit-packed bitmap
    let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

    for run in validity_iterator {
        match run {
            hybrid_rle::HybridEncoded::Bitpacked(packed) => {
                let remaining = additional - indices.len();
                let len = std::cmp::min(packed.len() * 8, remaining);
                for is_valid in BitmapIter::new(packed, 0, len) {
                    let value = if is_valid {
                        K::from_u32(new_indices.next().unwrap()).unwrap()
                    } else {
                        // null slot: the key is never read, so any placeholder works
                        K::default()
                    };
                    indices.push(value);
                }
                validity.extend_from_slice(packed, 0, len);
            }
            hybrid_rle::HybridEncoded::Rle(value, additional) => {
                let is_set = value[0] == 1;
                validity.extend_constant(additional, is_set);
                if is_set {
                    (0..additional).for_each(|_| {
                        let index = K::from_u32(new_indices.next().unwrap()).unwrap();
                        indices.push(index)
                    })
                } else {
                    indices.extend_constant(additional, *indices.last().unwrap())
                }
            }
        }
    }
}

fn extend_from_page<K, O>(
    page: &DataPage,
    descriptor: &ColumnDescriptor,
    indices: &mut MutableBuffer<K>,
    offsets: &mut MutableBuffer<O>,
    values: &mut MutableBuffer<u8>,
    validity: &mut MutableBitmap,
) -> Result<()>
where
    K: DictionaryKey,
    O: Offset,
{
    let additional = page.num_values();

    assert_eq!(descriptor.max_rep_level(), 0);
    let is_optional = descriptor.max_def_level() == 1;

    let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional);

    match (&page.encoding(), page.dictionary_page(), is_optional) {
        (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
            read_dict_optional(
                validity_buffer,
                values_buffer,
                additional,
                dict.as_any().downcast_ref().unwrap(),
                indices,
                offsets,
                values,
                validity,
            )
        }
        _ => {
            return Err(other_utils::not_implemented(
                &page.encoding(),
                is_optional,
                page.dictionary_page().is_some(),
                version,
                "primitive",
            ))
        }
    }
    Ok(())
}

pub fn iter_to_array<K, O, I, E>(
    mut iter: I,
    metadata: &ColumnChunkMetaData,
) -> Result<Box<dyn Array>>
where
    ArrowError: From<E>,
    O: Offset,
    K: DictionaryKey,
    E: Clone,
    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
    let capacity = metadata.num_values() as usize;
    let mut indices = MutableBuffer::<K>::with_capacity(capacity);
    let mut values = MutableBuffer::<u8>::with_capacity(0);
    let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
    let mut validity = MutableBitmap::with_capacity(capacity);
    while let Some(page) = iter.next() {
        extend_from_page(
            page.as_ref().map_err(|x| x.clone())?,
            metadata.descriptor(),
            &mut indices,
            &mut offsets,
            &mut values,
            &mut validity,
        )?
    }

    let keys = PrimitiveArray::from_data(K::DATA_TYPE, indices.into(), validity.into());
    let values = Arc::new(Utf8Array::from_data(offsets.into(), values.into(), None));
    Ok(Box::new(DictionaryArray::<K>::from_data(keys, values)))
}
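As an aside on the index layout that read_dict_optional handles: below is a minimal, self-contained sketch of LSB-first bit-unpacking. It assumes a single bit-packed run and skips the hybrid-RLE run header that the code above consumes via uleb128::decode; unpack_indices is a hypothetical helper for illustration, while the real decoding is done by parquet2's bitpacking::Decoder.

// one byte of bit width, then dictionary keys bit-packed LSB-first
fn unpack_indices(packed: &[u8], bit_width: usize, num_values: usize) -> Vec<u32> {
    let mut out = Vec::with_capacity(num_values);
    for i in 0..num_values {
        let mut value = 0u32;
        for bit in 0..bit_width {
            let pos = i * bit_width + bit; // absolute bit position in the buffer
            if (packed[pos / 8] >> (pos % 8)) & 1 == 1 {
                value |= 1 << bit;
            }
        }
        out.push(value);
    }
    out
}

fn main() {
    // 2-bit keys [0, 1, 2, 3] pack into the single byte 0b1110_0100
    let buffer = [2u8, 0b1110_0100];
    let bit_width = buffer[0] as usize;
    assert_eq!(unpack_indices(&buffer[1..], bit_width, 4), vec![0, 1, 2, 3]);
}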
2 changes: 2 additions & 0 deletions src/io/parquet/read/binary/mod.rs
@@ -1,6 +1,8 @@
 mod basic;
+mod dictionary;
 mod nested;
 
 pub use basic::iter_to_array;
+pub use dictionary::iter_to_array as iter_to_dict_array;
 pub use basic::stream_to_array;
 pub use nested::iter_to_array as iter_to_array_nested;
106 changes: 94 additions & 12 deletions src/io/parquet/read/mod.rs
@@ -21,7 +21,7 @@ pub use parquet2::{
 };
 
 use crate::{
-    array::Array,
+    array::{Array, DictionaryKey},
     datatypes::{DataType, IntervalUnit, TimeUnit},
     error::{ArrowError, Result},
 };
@@ -83,6 +83,89 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
    Ok(_read_metadata_async(reader).await?)
}
}

fn dict_read<
    K: DictionaryKey,
    I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
>(
    iter: &mut I,
    metadata: &ColumnChunkMetaData,
    data_type: DataType,
) -> Result<Box<dyn Array>> {
    use DataType::*;
    let values_data_type = if let Dictionary(_, v) = &data_type {
        v.as_ref()
    } else {
        panic!()
    };

    // the key type K is already fixed by the caller; dispatch on the values type
    match values_data_type {
        UInt8 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
            iter,
            metadata,
            data_type,
            |x: i32| x as u8,
        ),
        UInt16 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
            iter,
            metadata,
            data_type,
            |x: i32| x as u16,
        ),
        UInt32 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
            iter,
            metadata,
            data_type,
            |x: i32| x as u32,
        ),
        Int8 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
            iter,
            metadata,
            data_type,
            |x: i32| x as i8,
        ),
        Int16 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
            iter,
            metadata,
            data_type,
            |x: i32| x as i16,
        ),
        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
            primitive::iter_to_dict_array::<K, _, _, _, _, _>(
                iter,
                metadata,
                data_type,
                |x: i32| x as i32,
            )
        }
        Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
            ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
                PhysicalType::Int96 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
                    iter,
                    metadata,
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    int96_to_i64_ns,
                ),
                _ => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
                    iter,
                    metadata,
                    data_type,
                    |x: i64| x,
                ),
            },
            _ => unreachable!(),
        },
        Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
            primitive::iter_to_dict_array::<K, _, _, _, _, _>(iter, metadata, data_type, |x: i64| x)
        }
        Utf8 => binary::iter_to_dict_array::<K, i32, _, _>(iter, metadata),
        LargeUtf8 => binary::iter_to_dict_array::<K, i64, _, _>(iter, metadata),
        other => Err(ArrowError::NotYetImplemented(format!(
            "Reading dictionaries of type {:?}",
            other
        ))),
    }
}

pub fn page_iter_to_array<
    I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
>(
@@ -174,17 +257,16 @@ pub fn page_iter_to_array<
             ))),
         },

-        Dictionary(ref key, ref values) => match key.as_ref() {
-            Int32 => match values.as_ref() {
-                Int32 => primitive::iter_to_dict_array::<i32, _, _, _, _, _>(
-                    iter,
-                    metadata,
-                    data_type,
-                    |x: i32| x as i32,
-                ),
-                _ => todo!(),
-            },
-            _ => todo!(),
+        Dictionary(ref key, _) => match key.as_ref() {
+            Int8 => dict_read::<i8, _>(iter, metadata, data_type),
+            Int16 => dict_read::<i16, _>(iter, metadata, data_type),
+            Int32 => dict_read::<i32, _>(iter, metadata, data_type),
+            Int64 => dict_read::<i64, _>(iter, metadata, data_type),
+            UInt8 => dict_read::<u8, _>(iter, metadata, data_type),
+            UInt16 => dict_read::<u16, _>(iter, metadata, data_type),
+            UInt32 => dict_read::<u32, _>(iter, metadata, data_type),
+            UInt64 => dict_read::<u64, _>(iter, metadata, data_type),
+            _ => unreachable!(),
         },

         other => Err(ArrowError::NotYetImplemented(format!(
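For orientation, this is the shape dict_read ultimately returns: keys indexing into a single values array. A sketch assuming arrow2's constructors of this era — from_data appears in the reader above, while the From/from_slice constructors are assumed and varied across versions.

use std::sync::Arc;
use arrow2::array::{Array, DictionaryArray, Int32Array, Utf8Array};

fn main() {
    // keys are indices into `values`; a None key is a null slot
    let keys = Int32Array::from(&[Some(0), Some(1), None, Some(0)]);
    let values = Arc::new(Utf8Array::<i32>::from_slice(&["hello", "world"]));
    // logically: ["hello", "world", null, "hello"]
    let array = DictionaryArray::<i32>::from_data(keys, values);
    assert_eq!(array.len(), 4);
}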
16 changes: 10 additions & 6 deletions src/io/parquet/write/dictionary.rs
@@ -26,12 +26,14 @@ fn encode_keys<K: DictionaryKey>(

     let mut buffer = vec![];

-    if let Some(validity) = validity {
-        let projected_val = array.iter().map(|x| {
+    let null_count = if let Some(validity) = validity {
+        let projected_validity = array.iter().map(|x| {
             x.map(|x| validity.get_bit(x.to_usize().unwrap()))
                 .unwrap_or(false)
         });
-        let projected_val = Bitmap::from_trusted_len_iter(projected_val);
+        let projected_val = Bitmap::from_trusted_len_iter(projected_validity);
+
+        let null_count = projected_val.null_count();

         utils::write_def_levels(
             &mut buffer,
@@ -40,6 +42,7 @@
             array.len(),
             options.version,
         )?;
+        null_count
     } else {
         utils::write_def_levels(
             &mut buffer,
@@ -48,7 +51,8 @@
             array.len(),
             options.version,
         )?;
-    }
+        array.null_count()
+    };

     let definition_levels_byte_length = buffer.len();

@@ -70,7 +74,7 @@
         .flatten();
     let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8;

-    let keys = utils::ExactSizedIter::new(keys, array.len() - array.null_count());
+    let keys = utils::ExactSizedIter::new(keys, array.len() - null_count);

     // num_bits as a single byte
     buffer.push(num_bits);
@@ -104,7 +108,7 @@
         None,
         descriptor,
         options,
-        Encoding::PlainDictionary,
+        Encoding::RleDictionary,
     )
     .map(CompressedPage::Data)
 }
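The gist of the fix above, as a plain-Vec sketch (the names here are illustrative, not the crate API): a key counts as null if it is null itself or if it points at a null dictionary value, and both the definition levels and the key-iterator length must use this projected count rather than array.null_count().

fn projected_null_count(keys: &[Option<usize>], values_validity: &[bool]) -> usize {
    keys.iter()
        .filter(|key| match key {
            Some(i) => !values_validity[*i], // valid key, but it points at a null value
            None => true,                    // the key itself is null
        })
        .count()
}

fn main() {
    // dictionary values ["a", null, "c"] -> validity [true, false, true]
    let values_validity = [true, false, true];
    // keys [0, 1, null, 2]: index 1 points at a null value, index 2 is a null key
    let keys = [Some(0), Some(1), None, Some(2)];
    assert_eq!(projected_null_count(&keys, &values_validity), 2);
}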
6 changes: 2 additions & 4 deletions src/io/parquet/write/mod.rs
@@ -11,8 +11,6 @@ mod utils;

 pub mod stream;
 
-use std::sync::Arc;
-
 use crate::array::*;
 use crate::bitmap::Bitmap;
 use crate::buffer::{Buffer, MutableBuffer};
@@ -105,7 +103,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {

 /// Returns an iterator of compressed pages,
 pub fn array_to_pages(
-    array: Arc<dyn Array>,
+    array: &dyn Array,
     descriptor: ColumnDescriptor,
     options: WriteOptions,
     encoding: Encoding,
@@ -121,7 +119,7 @@
                 )
             })
         }
-        _ => array_to_page(array.as_ref(), descriptor, options, encoding)
+        _ => array_to_page(array, descriptor, options, encoding)
             .map(|page| DynIter::new(std::iter::once(Ok(page)))),
     }
 }
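Why the Arc<dyn Array> to &dyn Array change helps, as a minimal sketch with stand-in types (not arrow2's): callers keep ownership and pass a borrow, and existing Arc holders just call .as_ref(), as record_batch.rs does below.

use std::sync::Arc;

trait Array {
    fn len(&self) -> usize;
}

struct DummyArray(usize);

impl Array for DummyArray {
    fn len(&self) -> usize {
        self.0
    }
}

// after this commit: take a borrow instead of an owned Arc
fn array_to_pages(array: &dyn Array) -> usize {
    array.len()
}

fn main() {
    let array: Arc<dyn Array> = Arc::new(DummyArray(3));
    assert_eq!(array_to_pages(array.as_ref()), 3);
    assert_eq!(array.len(), 3); // the Arc is still usable: nothing was moved or cloned
}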
2 changes: 1 addition & 1 deletion src/io/parquet/write/record_batch.rs
@@ -59,7 +59,7 @@ impl<I: Iterator<Item = Result<RecordBatch>>> Iterator for RowGroupIterator<I> {
                 .zip(self.parquet_schema.columns().to_vec().into_iter())
                 .zip(encodings.into_iter())
                 .map(move |((array, type_), encoding)| {
-                    array_to_pages(array, type_, options, encoding)
+                    array_to_pages(array.as_ref(), type_, options, encoding)
                 }),
         ))
     })