From b7e23528c8574cf63b83795682dd9a2f926c2cde Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Tue, 24 Aug 2021 18:46:19 +0000
Subject: [PATCH] Added support for more dictionary-encoded types.

---
 .../main_spark.py                        |   6 +
 src/io/parquet/read/binary/dictionary.rs | 158 ++++++++++++++++++
 src/io/parquet/read/binary/mod.rs        |   2 +
 src/io/parquet/read/mod.rs               | 106 ++++++++++--
 src/io/parquet/write/dictionary.rs       |  16 +-
 src/io/parquet/write/mod.rs              |   6 +-
 src/io/parquet/write/record_batch.rs     |   2 +-
 tests/it/io/parquet/mod.rs               |  18 +-
 8 files changed, 285 insertions(+), 29 deletions(-)
 create mode 100644 src/io/parquet/read/binary/dictionary.rs

diff --git a/arrow-parquet-integration-testing/main_spark.py b/arrow-parquet-integration-testing/main_spark.py
index a51dcbaf65c..876a1a808c4 100644
--- a/arrow-parquet-integration-testing/main_spark.py
+++ b/arrow-parquet-integration-testing/main_spark.py
@@ -43,3 +43,9 @@ def test(file: str, version: str, column, encoding: str):
 
 test("generated_dictionary", "1", ("dict0", 0), "")
 test("generated_dictionary", "2", ("dict0", 0), "")
+
+test("generated_dictionary", "1", ("dict1", 1), "")
+test("generated_dictionary", "2", ("dict1", 1), "")
+
+test("generated_dictionary", "1", ("dict2", 2), "")
+test("generated_dictionary", "2", ("dict2", 2), "")
diff --git a/src/io/parquet/read/binary/dictionary.rs b/src/io/parquet/read/binary/dictionary.rs
new file mode 100644
index 00000000000..d55fc81364f
--- /dev/null
+++ b/src/io/parquet/read/binary/dictionary.rs
@@ -0,0 +1,158 @@
+use std::sync::Arc;
+
+use parquet2::{
+    encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
+    metadata::{ColumnChunkMetaData, ColumnDescriptor},
+    page::{BinaryPageDict, DataPage},
+    read::StreamingIterator,
+};
+
+use super::super::utils as other_utils;
+use crate::{
+    array::{Array, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array},
+    bitmap::{utils::BitmapIter, MutableBitmap},
+    buffer::MutableBuffer,
+    error::{ArrowError, Result},
+};
+
+fn read_dict_optional<K, O>(
+    validity_buffer: &[u8],
+    indices_buffer: &[u8],
+    additional: usize,
+    dict: &BinaryPageDict,
+    indices: &mut MutableBuffer<K>,
+    offsets: &mut MutableBuffer<O>,
+    values: &mut MutableBuffer<u8>,
+    validity: &mut MutableBitmap,
+) where
+    K: DictionaryKey,
+    O: Offset,
+{
+    values.extend_from_slice(dict.values());
+    offsets.extend(
+        dict.offsets()
+            .iter()
+            .map(|x| O::from_usize(*x as usize).unwrap()),
+    );
+
+    // SPEC: Data page format: the bit width used to encode the entry ids stored as 1 byte (max bit width = 32),
+    // SPEC: followed by the values encoded using RLE/Bit packed described above (with the given bit width).
+    let bit_width = indices_buffer[0];
+    let indices_buffer = &indices_buffer[1..];
+
+    let (_, consumed) = uleb128::decode(indices_buffer);
+    let indices_buffer = &indices_buffer[consumed..];
+
+    let non_null_indices_len = indices_buffer.len() * 8 / bit_width as usize;
+
+    let mut new_indices = bitpacking::Decoder::new(indices_buffer, bit_width, non_null_indices_len);
+
+    let validity_iterator = hybrid_rle::Decoder::new(validity_buffer, 1);
+
+    for run in validity_iterator {
+        match run {
+            hybrid_rle::HybridEncoded::Bitpacked(packed) => {
+                let remaining = additional - indices.len();
+                let len = std::cmp::min(packed.len() * 8, remaining);
+                for is_valid in BitmapIter::new(packed, 0, len) {
+                    let value = if is_valid {
+                        K::from_u32(new_indices.next().unwrap()).unwrap()
+                    } else {
+                        K::default()
+                    };
+                    indices.push(value);
+                }
+                validity.extend_from_slice(packed, 0, len);
+            }
+            hybrid_rle::HybridEncoded::Rle(value, additional) => {
+                let is_set = value[0] == 1;
+                validity.extend_constant(additional, is_set);
+                if is_set {
+                    (0..additional).for_each(|_| {
+                        let index = K::from_u32(new_indices.next().unwrap()).unwrap();
+                        indices.push(index)
+                    })
+                } else {
+                    indices.extend_constant(additional, *indices.last().unwrap())
+                }
+            }
+        }
+    }
+}
+
+fn extend_from_page<K, O>(
+    page: &DataPage,
+    descriptor: &ColumnDescriptor,
+    indices: &mut MutableBuffer<K>,
+    offsets: &mut MutableBuffer<O>,
+    values: &mut MutableBuffer<u8>,
+    validity: &mut MutableBitmap,
+) -> Result<()>
+where
+    K: DictionaryKey,
+    O: Offset,
+{
+    let additional = page.num_values();
+
+    assert_eq!(descriptor.max_rep_level(), 0);
+    let is_optional = descriptor.max_def_level() == 1;
+
+    let (validity_buffer, values_buffer, version) = other_utils::split_buffer(page, is_optional);
+
+    match (&page.encoding(), page.dictionary_page(), is_optional) {
+        (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
+            read_dict_optional(
+                validity_buffer,
+                values_buffer,
+                additional,
+                dict.as_any().downcast_ref().unwrap(),
+                indices,
+                offsets,
+                values,
+                validity,
+            )
+        }
+        _ => {
+            return Err(other_utils::not_implemented(
+                &page.encoding(),
+                is_optional,
+                page.dictionary_page().is_some(),
+                version,
+                "primitive",
+            ))
+        }
+    }
+    Ok(())
+}
+
+pub fn iter_to_array<K, O, E, I>(
+    mut iter: I,
+    metadata: &ColumnChunkMetaData,
+) -> Result<Box<dyn Array>>
+where
+    ArrowError: From<E>,
+    O: Offset,
+    K: DictionaryKey,
+    E: Clone,
+    I: StreamingIterator<Item = std::result::Result<DataPage, E>>,
+{
+    let capacity = metadata.num_values() as usize;
+    let mut indices = MutableBuffer::<K>::with_capacity(capacity);
+    let mut values = MutableBuffer::<u8>::with_capacity(0);
+    let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
+    let mut validity = MutableBitmap::with_capacity(capacity);
+    while let Some(page) = iter.next() {
+        extend_from_page(
+            page.as_ref().map_err(|x| x.clone())?,
+            metadata.descriptor(),
+            &mut indices,
+            &mut offsets,
+            &mut values,
+            &mut validity,
+        )?
+    }
+
+    let keys = PrimitiveArray::from_data(K::DATA_TYPE, indices.into(), validity.into());
+    let values = Arc::new(Utf8Array::from_data(offsets.into(), values.into(), None));
+    Ok(Box::new(DictionaryArray::<K>::from_data(keys, values)))
+}
diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs
index 912f1bb026b..54da7b3f1db 100644
--- a/src/io/parquet/read/binary/mod.rs
+++ b/src/io/parquet/read/binary/mod.rs
@@ -1,6 +1,8 @@
 mod basic;
+mod dictionary;
 mod nested;
 
 pub use basic::iter_to_array;
+pub use dictionary::iter_to_array as iter_to_dict_array;
 pub use basic::stream_to_array;
 pub use nested::iter_to_array as iter_to_array_nested;
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 81797a698f5..6146787a440 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -21,7 +21,7 @@ pub use parquet2::{
 };
 
 use crate::{
-    array::Array,
+    array::{Array, DictionaryKey},
     datatypes::{DataType, IntervalUnit, TimeUnit},
     error::{ArrowError, Result},
 };
@@ -83,6 +83,89 @@ pub async fn read_metadata_async(
     Ok(_read_metadata_async(reader).await?)
 }
 
+fn dict_read<
+    K: DictionaryKey,
+    I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
+>(
+    iter: &mut I,
+    metadata: &ColumnChunkMetaData,
+    data_type: DataType,
+) -> Result<Box<dyn Array>> {
+    use DataType::*;
+    let values_data_type = if let Dictionary(_, v) = &data_type {
+        v.as_ref()
+    } else {
+        panic!()
+    };
+
+    match values_data_type {
+        UInt8 => primitive::iter_to_dict_array::<K, _, _, _, _>(
+            iter,
+            metadata,
+            data_type,
+            |x: i32| x as u8,
+        ),
+        UInt16 => primitive::iter_to_dict_array::<K, _, _, _, _>(
+            iter,
+            metadata,
+            data_type,
+            |x: i32| x as u16,
+        ),
+        UInt32 => primitive::iter_to_dict_array::<K, _, _, _, _>(
+            iter,
+            metadata,
+            data_type,
+            |x: i32| x as u32,
+        ),
+        Int8 => primitive::iter_to_dict_array::<K, _, _, _, _>(
+            iter,
+            metadata,
+            data_type,
+            |x: i32| x as i8,
+        ),
+        Int16 => primitive::iter_to_dict_array::<K, _, _, _, _>(
+            iter,
+            metadata,
+            data_type,
+            |x: i32| x as i16,
+        ),
+        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
+            primitive::iter_to_dict_array::<K, _, _, _, _>(
+                iter,
+                metadata,
+                data_type,
+                |x: i32| x as i32,
+            )
+        }
+        Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
+            ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
+                PhysicalType::Int96 => primitive::iter_to_dict_array::<K, _, _, _, _>(
+                    iter,
+                    metadata,
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                    int96_to_i64_ns,
+                ),
+                _ => primitive::iter_to_dict_array::<K, _, _, _, _>(
+                    iter,
+                    metadata,
+                    data_type,
+                    |x: i64| x,
+                ),
+            },
+            _ => unreachable!(),
+        },
+        Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
+            primitive::iter_to_dict_array::<K, _, _, _, _>(iter, metadata, data_type, |x: i64| x)
+        }
+        Utf8 => binary::iter_to_dict_array::<K, i32, _, _>(iter, metadata),
+        LargeUtf8 => binary::iter_to_dict_array::<K, i64, _, _>(iter, metadata),
+        other => Err(ArrowError::NotYetImplemented(format!(
+            "Reading dictionaries of type {:?}",
+            other
+        ))),
+    }
+}
+
 pub fn page_iter_to_array<
     I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
 >(
@@ -174,17 +257,16 @@ pub fn page_iter_to_array<
             ))),
         },
 
-        Dictionary(ref key, ref values) => match key.as_ref() {
-            Int32 => match values.as_ref() {
-                Int32 => primitive::iter_to_dict_array::<i32, _, _, _, _>(
-                    iter,
-                    metadata,
-                    data_type,
-                    |x: i32| x as i32,
-                ),
-                _ => todo!(),
-            },
-            _ => todo!(),
+        Dictionary(ref key, _) => match key.as_ref() {
+            Int8 => dict_read::<i8, _>(iter, metadata, data_type),
+            Int16 => dict_read::<i16, _>(iter, metadata, data_type),
+            Int32 => dict_read::<i32, _>(iter, metadata, data_type),
+            Int64 => dict_read::<i64, _>(iter, metadata, data_type),
+            UInt8 => dict_read::<u8, _>(iter, metadata, data_type),
+            UInt16 => dict_read::<u16, _>(iter, metadata, data_type),
+            UInt32 => dict_read::<u32, _>(iter, metadata, data_type),
+            UInt64 => dict_read::<u64, _>(iter, metadata, data_type),
+            _ => unreachable!(),
         },
 
         other => Err(ArrowError::NotYetImplemented(format!(
diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs
index 5a7e5ad16b2..c379040d8e4 100644
--- a/src/io/parquet/write/dictionary.rs
+++ b/src/io/parquet/write/dictionary.rs
@@ -26,12 +26,14 @@ fn encode_keys<K: DictionaryKey>(
 
     let mut buffer = vec![];
 
-    if let Some(validity) = validity {
-        let projected_val = array.iter().map(|x| {
+    let null_count = if let Some(validity) = validity {
+        let projected_validity = array.iter().map(|x| {
             x.map(|x| validity.get_bit(x.to_usize().unwrap()))
                 .unwrap_or(false)
         });
-        let projected_val = Bitmap::from_trusted_len_iter(projected_val);
+        let projected_val = Bitmap::from_trusted_len_iter(projected_validity);
+
+        let null_count = projected_val.null_count();
 
         utils::write_def_levels(
             &mut buffer,
@@ -40,6 +42,7 @@ fn encode_keys<K: DictionaryKey>(
             array.len(),
             options.version,
         )?;
+        null_count
     } else {
         utils::write_def_levels(
             &mut buffer,
@@ -48,7 +51,8 @@ fn encode_keys<K: DictionaryKey>(
             array.len(),
             options.version,
         )?;
-    }
+        array.null_count()
+    };
 
     let definition_levels_byte_length = buffer.len();
 
@@ -70,7 +74,7 @@ fn encode_keys<K: DictionaryKey>(
         .flatten();
     let num_bits = utils::get_bit_width(keys.clone().max().unwrap_or(0) as u64) as u8;
 
-    let keys = utils::ExactSizedIter::new(keys, array.len() - array.null_count());
+    let keys = utils::ExactSizedIter::new(keys, array.len() - null_count);
 
     // num_bits as a single byte
     buffer.push(num_bits);
@@ -104,7 +108,7 @@ fn encode_keys<K: DictionaryKey>(
         None,
         descriptor,
         options,
-        Encoding::PlainDictionary,
+        Encoding::RleDictionary,
     )
     .map(CompressedPage::Data)
 }
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 19c64cbe258..febc803efc6 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -11,8 +11,6 @@ mod utils;
 
 pub mod stream;
 
-use std::sync::Arc;
-
 use crate::array::*;
 use crate::bitmap::Bitmap;
 use crate::buffer::{Buffer, MutableBuffer};
@@ -105,7 +103,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
 
 /// Returns an iterator of compressed pages,
 pub fn array_to_pages(
-    array: Arc<dyn Array>,
+    array: &dyn Array,
     descriptor: ColumnDescriptor,
     options: WriteOptions,
     encoding: Encoding,
@@ -121,7 +119,7 @@ pub fn array_to_pages(
                 )
             })
         }
-        _ => array_to_page(array.as_ref(), descriptor, options, encoding)
+        _ => array_to_page(array, descriptor, options, encoding)
             .map(|page| DynIter::new(std::iter::once(Ok(page)))),
     }
 }
diff --git a/src/io/parquet/write/record_batch.rs b/src/io/parquet/write/record_batch.rs
index 3db125f815e..1dffa13a03c 100644
--- a/src/io/parquet/write/record_batch.rs
+++ b/src/io/parquet/write/record_batch.rs
@@ -59,7 +59,7 @@ impl<I: Iterator<Item = Result<RecordBatch>>> Iterator for RowGroupIterator<I> {
                 .zip(self.parquet_schema.columns().to_vec().into_iter())
                 .zip(encodings.into_iter())
                 .map(move |((array, type_), encoding)| {
-                    array_to_pages(array, type_, options, encoding)
+                    array_to_pages(array.as_ref(), type_, options, encoding)
                 }),
             ))
         })
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 138538e32f3..3a520e006e1 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -402,12 +402,12 @@ fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result<Vec<u8>> {
     let row_groups = batches.iter().map(|batch| {
         let iterator = DynIter::new(batch.columns().iter().zip(descritors.clone()).map(
            |(array, type_)| {
-                Ok(DynIter::new(std::iter::once(array_to_page(
-                    array.as_ref(),
-                    type_,
-                    options,
-                    Encoding::Plain,
-                ))))
+                let encoding = if let DataType::Dictionary(_, _) = array.data_type() {
+                    Encoding::RleDictionary
+                } else {
+                    Encoding::Plain
+                };
+                array_to_pages(array.as_ref(), type_, options, encoding)
             },
         ));
         Ok(iterator)
@@ -455,6 +455,12 @@ fn roundtrip_100_primitive() -> Result<()> {
     test_file("1.0.0-bigendian", "generated_primitive")
 }
 
+#[test]
+fn roundtrip_100_dict() -> Result<()> {
+    test_file("1.0.0-littleendian", "generated_dictionary")?;
+    test_file("1.0.0-bigendian", "generated_dictionary")
+}
+
 /// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can rountrip its
 /// logical types.
 #[test]
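
Illustration (not part of the patch): the write-side change above routes any dictionary-typed column through Encoding::RleDictionary instead of Encoding::Plain. Below is a minimal sketch of that dispatch against a hand-built dictionary array. DictionaryArray::from_data and the two-field DataType::Dictionary shape are taken from this patch; the arrow2 import paths and the from_slice constructors are assumptions about the surrounding crate:

    use std::sync::Arc;

    use arrow2::array::{Array, DictionaryArray, PrimitiveArray, Utf8Array};
    use arrow2::datatypes::DataType;
    use arrow2::io::parquet::write::Encoding;

    fn main() {
        // a small string dictionary ("a", "b", "c") and i32 keys into it,
        // logically the column ["a", "c", "b", "a"]
        let values = Arc::new(Utf8Array::<i32>::from_slice(&["a", "b", "c"]));
        let keys = PrimitiveArray::<i32>::from_slice(&[0, 2, 1, 0]);
        let array = DictionaryArray::<i32>::from_data(keys, values);

        // the per-column encoding choice added to `integration_write`:
        // dictionary-typed arrays now take the RLE-dictionary write path
        let encoding = if let DataType::Dictionary(_, _) = array.data_type() {
            Encoding::RleDictionary
        } else {
            Encoding::Plain
        };
        assert!(matches!(encoding, Encoding::RleDictionary));
    }

Passing that encoding to array_to_pages (which now takes &dyn Array) emits a dictionary page followed by an RLE/bit-packed keys page, which is what the new roundtrip_100_dict test exercises against the generated_dictionary integration files.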