Added support for remaining non-nested datatypes #336

Merged
merged 1 commit on Aug 24, 2021
6 changes: 6 additions & 0 deletions arrow-parquet-integration-testing/main_spark.py
@@ -43,3 +43,9 @@ def test(file: str, version: str, column, encoding: str):

test("generated_dictionary", "1", ("dict0", 0), "")
test("generated_dictionary", "2", ("dict0", 0), "")

test("generated_dictionary", "1", ("dict1", 1), "")
test("generated_dictionary", "2", ("dict1", 1), "")

test("generated_dictionary", "1", ("dict2", 2), "")
test("generated_dictionary", "2", ("dict2", 2), "")
158 changes: 158 additions & 0 deletions src/io/parquet/read/binary/dictionary.rs
@@ -0,0 +1,158 @@
use std::sync::Arc;

use parquet2::{
encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
page::{BinaryPageDict, DataPage},
read::StreamingIterator,
};

use super::super::utils as other_utils;
use crate::{
array::{Array, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array},
bitmap::{utils::BitmapIter, MutableBitmap},
buffer::MutableBuffer,
error::{ArrowError, Result},
};

fn read_dict_optional<K, O>(
validity_buffer: &[u8],
indices_buffer: &[u8],
additional: usize,
dict: &BinaryPageDict,
indices: &mut MutableBuffer<K>,
offsets: &mut MutableBuffer<O>,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
) where
K: DictionaryKey,
O: Offset,
{
values.extend_from_slice(dict.values());
offsets.extend(
dict.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap()),
);

// SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
// SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
let bit_width = indices_buffer[0];
let indices_buffer = &indices_buffer[1..];

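    // skip the uleb128-encoded header of the first RLE/bit-packed hybrid run;
    // the indices are assumed to be a single bit-packed run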
let (_, consumed) = uleb128::decode(indices_buffer);
let indices_buffer = &indices_buffer[consumed..];

let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;

let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);

let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);

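    // the definition levels (validity) are an RLE/bit-packed hybrid; walk its
    // runs, pushing one key per slot (null slots receive a placeholder key)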
for run in validity_iterator {
match run {
hybrid_rle::HybridEncoded::Bitpacked(packed) => {
let remaining = additional - indices.len();
let len = std::cmp::min(packed.len() * 8, remaining);
for is_valid in BitmapIter::new(packed, 0, len) {
let value = if is_valid {
K::from_u32(new_indices.next().unwrap()).unwrap()
} else {
K::default()
};
indices.push(value);
}
validity.extend_from_slice(packed, 0, len);
}
hybrid_rle::HybridEncoded::Rle(value, additional) => {
let is_set = value[0] == 1;
validity.extend_constant(additional, is_set);
if is_set {
(0..additional).for_each(|_| {
let index = K::from_u32(new_indices.next().unwrap()).unwrap();
indices.push(index)
})
        } else {
            // null run: pad with placeholder keys, consistent with the
            // bit-packed branch (and safe when the run is at the start)
            indices.extend_constant(additional, K::default())
        }
}
}
}
}

fn extend_from_page<K, O>(
page: &DataPage,
descriptor: &ColumnDescriptor,
indices: &mut MutableBuffer<K>,
offsets: &mut MutableBuffer<O>,
values: &mut MutableBuffer<u8>,
validity: &mut MutableBitmap,
) -> Result<()>
where
K: DictionaryKey,
O: Offset,
{
let additional = page.num_values();

assert_eq!(descriptor.max_rep_level(), 0);
let is_optional = descriptor.max_def_level() == 1;

let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional);

match (&page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
read_dict_optional(
validity_buffer,
values_buffer,
additional,
dict.as_any().downcast_ref().unwrap(),
indices,
offsets,
values,
validity,
)
}
_ => {
return Err(other_utils::not_implemented(
&page.encoding(),
is_optional,
page.dictionary_page().is_some(),
version,
"primitive",
))
}
}
Ok(())
}

pub fn iter_to_array<K, O, I, E>(
mut iter: I,
metadata: &ColumnChunkMetaData,
) -> Result<Box<dyn Array>>
where
ArrowError: From<E>,
O: Offset,
K: DictionaryKey,
E: Clone,
I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut indices = MutableBuffer::<K>::with_capacity(capacity);
let mut values = MutableBuffer::<u8>::with_capacity(0);
let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
let mut validity = MutableBitmap::with_capacity(capacity);
while let Some(page) = iter.next() {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
metadata.descriptor(),
&mut indices,
&mut offsets,
&mut values,
&mut validity,
)?
}

let keys = PrimitiveArray::from_data(K::DATA_TYPE, indices.into(), validity.into());
let values = Arc::new(Utf8Array::from_data(offsets.into(), values.into(), None));
Ok(Box::new(DictionaryArray::<K>::from_data(keys, values)))
}
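For intuition, here is a minimal, self-contained sketch of the bit-unpacking step that parquet2's bitpacking::Decoder performs for the dictionary keys above. This is illustrative only, not the parquet2 API: each key occupies bit_width bits, packed LSB-first.

// Illustrative sketch, not the parquet2 API: unpack `len` values of
// `bit_width` bits each, packed LSB-first, as in Parquet's bit-packed
// runs of dictionary keys.
fn unpack_bitpacked(buffer: &[u8], bit_width: u8, len: usize) -> Vec<u32> {
    let mask = (1u64 << bit_width) - 1;
    let (mut acc, mut bits) = (0u64, 0u8); // bit accumulator and its fill level
    let mut bytes = buffer.iter();
    let mut out = Vec::with_capacity(len);
    while out.len() < len {
        while bits < bit_width {
            acc |= (*bytes.next().expect("buffer too short") as u64) << bits;
            bits += 8;
        }
        out.push((acc & mask) as u32);
        acc >>= bit_width;
        bits -= bit_width;
    }
    out
}

fn main() {
    // the 3-bit values [1, 2, 3, 4] occupy two bytes when packed LSB-first
    let packed = [0b1101_0001u8, 0b0000_1000];
    assert_eq!(unpack_bitpacked(&packed, 3, 4), vec![1, 2, 3, 4]);
}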
2 changes: 2 additions & 0 deletions src/io/parquet/read/binary/mod.rs
@@ -1,6 +1,8 @@
mod basic;
mod dictionary;
mod nested;

pub use basic::iter_to_array;
pub use dictionary::iter_to_array as iter_to_dict_array;
pub use basic::stream_to_array;
pub use nested::iter_to_array as iter_to_array_nested;
106 changes: 94 additions & 12 deletions src/io/parquet/read/mod.rs
@@ -21,7 +21,7 @@ pub use parquet2::{
};

use crate::{
-    array::Array,
+    array::{Array, DictionaryKey},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{ArrowError, Result},
};
@@ -83,6 +83,89 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
Ok(_read_metadata_async(reader).await?)
}

fn dict_read<
K: DictionaryKey,
I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
>(
iter: &mut I,
metadata: &ColumnChunkMetaData,
data_type: DataType,
) -> Result<Box<dyn Array>> {
use DataType::*;
let values_data_type = if let Dictionary(_, v) = &data_type {
v.as_ref()
} else {
panic!()
};

match values_data_type {
UInt8 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i32| x as u8,
),
UInt16 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i32| x as u16,
),
UInt32 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i32| x as u32,
),
Int8 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i32| x as i8,
),
Int16 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i32| x as i16,
),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i32| x as i32,
)
}
Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
PhysicalType::Int96 => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
DataType::Timestamp(TimeUnit::Nanosecond, None),
int96_to_i64_ns,
),
_ => primitive::iter_to_dict_array::<K, _, _, _, _, _>(
iter,
metadata,
data_type,
|x: i64| x,
),
},
_ => unreachable!(),
},
Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
primitive::iter_to_dict_array::<K, _, _, _, _, _>(iter, metadata, data_type, |x: i64| x)
}
Utf8 => binary::iter_to_dict_array::<K, i32, _, _>(iter, metadata),
LargeUtf8 => binary::iter_to_dict_array::<K, i64, _, _>(iter, metadata),
other => Err(ArrowError::NotYetImplemented(format!(
"Reading dictionaries of type {:?}",
other
))),
}
}

pub fn page_iter_to_array<
I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
>(
@@ -174,17 +257,16 @@ pub fn page_iter_to_array<
))),
},

-        Dictionary(ref key, ref values) => match key.as_ref() {
-            Int32 => match values.as_ref() {
-                Int32 => primitive::iter_to_dict_array::<i32, _, _, _, _, _>(
-                    iter,
-                    metadata,
-                    data_type,
-                    |x: i32| x as i32,
-                ),
-                _ => todo!(),
-            },
-            _ => todo!(),
+        Dictionary(ref key, _) => match key.as_ref() {
+            Int8 => dict_read::<i8, _>(iter, metadata, data_type),
+            Int16 => dict_read::<i16, _>(iter, metadata, data_type),
+            Int32 => dict_read::<i32, _>(iter, metadata, data_type),
+            Int64 => dict_read::<i64, _>(iter, metadata, data_type),
+            UInt8 => dict_read::<u8, _>(iter, metadata, data_type),
+            UInt16 => dict_read::<u16, _>(iter, metadata, data_type),
+            UInt32 => dict_read::<u32, _>(iter, metadata, data_type),
+            UInt64 => dict_read::<u64, _>(iter, metadata, data_type),
+            _ => unreachable!(),
},

other => Err(ArrowError::NotYetImplemented(format!(
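A note on the shape of dict_read above: the runtime key type from the schema is converted into a compile-time type parameter, and the monomorphized reader then branches on the values' type. Below is a toy, self-contained sketch of that double dispatch; the names are illustrative, not arrow2's.

// Illustrative only: a runtime tag selects the static key type K,
// monomorphizing the generic reader, which then handles the value type.
use std::any::type_name;

trait DictionaryKey: Copy + Default + 'static {}
impl DictionaryKey for i8 {}
impl DictionaryKey for i32 {}

#[derive(Clone, Copy)]
enum KeyType {
    Int8,
    Int32,
}

fn dict_read<K: DictionaryKey>(values_type: &str) -> String {
    // arrow2's dict_read matches on the values' data type here
    format!("{} keys, {} values", type_name::<K>(), values_type)
}

fn page_to_array(key: KeyType, values_type: &str) -> String {
    match key {
        KeyType::Int8 => dict_read::<i8>(values_type),
        KeyType::Int32 => dict_read::<i32>(values_type),
    }
}

fn main() {
    assert_eq!(page_to_array(KeyType::Int32, "Utf8"), "i32 keys, Utf8 values");
}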
16 changes: 10 additions & 6 deletions src/io/parquet/write/dictionary.rs
@@ -26,12 +26,14 @@ fn encode_keys<K: DictionaryKey>(

let mut buffer = vec![];

-    if let Some(validity) = validity {
-        let projected_val = array.iter().map(|x| {
+    let null_count = if let Some(validity) = validity {
+        let projected_validity = array.iter().map(|x| {
            x.map(|x| validity.get_bit(x.to_usize().unwrap()))
                .unwrap_or(false)
        });
-        let projected_val = Bitmap::from_trusted_len_iter(projected_val);
+        let projected_val = Bitmap::from_trusted_len_iter(projected_validity);
+
+        let null_count = projected_val.null_count();

utils::write_def_levels(
&mut buffer,
Expand All @@ -40,6 +42,7 @@ fn encode_keys<K: DictionaryKey>(
array.len(),
options.version,
)?;
+        null_count
} else {
utils::write_def_levels(
&mut buffer,
@@ -48,7 +51,8 @@
array.len(),
options.version,
)?;
-    }
+        array.null_count()
+    };

let definition_levels_byte_length = buffer.len();

@@ -70,7 +74,7 @@
.flatten();
let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8;

-    let keys = utils::ExactSizedIter::new(keys, array.len() - array.null_count());
+    let keys = utils::ExactSizedIter::new(keys, array.len() - null_count);

// num_bits as a single byte
buffer.push(num_bits);
@@ -104,7 +108,7 @@
None,
descriptor,
options,
-        Encoding::PlainDictionary,
+        Encoding::RleDictionary,
)
.map(CompressedPage::Data)
}
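The null_count change above is subtle: in a dictionary array, a key can be valid yet point at a null dictionary value, so the number of keys actually written is array.len() minus the projected null count, not minus array.null_count(). Here is a minimal sketch of the projection, with plain slices standing in for arrow2's arrays; the names are illustrative.

// Illustrative only: `keys` uses None for a null key; `values_validity[i]`
// is true when dictionary value `i` is non-null.
fn projected_null_count(keys: &[Option<usize>], values_validity: &[bool]) -> usize {
    keys.iter()
        .filter(|key| match **key {
            Some(i) => !values_validity[i], // valid key pointing at a null value
            None => true,                   // the key itself is null
        })
        .count()
}

fn main() {
    let keys = [Some(0), None, Some(1)];
    let values_validity = [true, false]; // dictionary value 1 is null
    // one null key plus one key that points at a null value
    assert_eq!(projected_null_count(&keys, &values_validity), 2);
}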
6 changes: 2 additions & 4 deletions src/io/parquet/write/mod.rs
@@ -11,8 +11,6 @@ mod utils;

pub mod stream;

-use std::sync::Arc;
-
use crate::array::*;
use crate::bitmap::Bitmap;
use crate::buffer::{Buffer, MutableBuffer};
@@ -105,7 +103,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {

/// Returns an iterator of compressed pages,
pub fn array_to_pages(
-    array: Arc<dyn Array>,
+    array: &dyn Array,
descriptor: ColumnDescriptor,
options: WriteOptions,
encoding: Encoding,
@@ -121,7 +119,7 @@
)
})
}
-        _ => array_to_page(array.as_ref(), descriptor, options, encoding)
+        _ => array_to_page(array, descriptor, options, encoding)
.map(|page| DynIter::new(std::iter::once(Ok(page)))),
}
}
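On the signature change from Arc<dyn Array> to &dyn Array: a borrowed trait object serves both owned and Arc-wrapped arrays without forcing shared ownership on callers. A toy sketch of the difference follows; the Array trait here is a stand-in, not arrow2's.

use std::sync::Arc;

// stand-ins for arrow2's Array trait and a concrete array type
trait Array {
    fn len(&self) -> usize;
}
struct Int32Array(Vec<i32>);
impl Array for Int32Array {
    fn len(&self) -> usize {
        self.0.len()
    }
}

// `&dyn Array` accepts owned values and Arc-wrapped ones alike
fn array_to_pages_len(array: &dyn Array) -> usize {
    array.len()
}

fn main() {
    let owned = Int32Array(vec![1, 2, 3]);
    let shared: Arc<dyn Array> = Arc::new(Int32Array(vec![4, 5]));
    assert_eq!(array_to_pages_len(&owned), 3);
    // mirrors the record_batch.rs call-site change below: pass array.as_ref()
    assert_eq!(array_to_pages_len(shared.as_ref()), 2);
}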
2 changes: 1 addition & 1 deletion src/io/parquet/write/record_batch.rs
@@ -59,7 +59,7 @@ impl<I: Iterator<Item = Result<RecordBatch>>> Iterator for RowGroupIterator<I> {
.zip(self.parquet_schema.columns().to_vec().into_iter())
.zip(encodings.into_iter())
.map(move |((array, type_), encoding)| {
-                    array_to_pages(array, type_, options, encoding)
+                    array_to_pages(array.as_ref(), type_, options, encoding)
}),
))
})