Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Reduced code duplication (#805)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Feb 4, 2022
1 parent 8fcab5c commit 694b895
Show file tree
Hide file tree
Showing 13 changed files with 621 additions and 740 deletions.
6 changes: 3 additions & 3 deletions src/io/parquet/read/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,15 +277,15 @@ pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
)
}

pub struct BinaryArrayIterator<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's
/// plain/delta binary representation, decoding pages into arrays of type `A`
/// in chunks of at most `chunk_size` elements.
pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
    // Underlying iterator of parquet data pages.
    iter: I,
    // Logical Arrow type of the arrays produced by this iterator.
    data_type: DataType,
    // Chunks-in-progress buffered across page boundaries: each entry pairs
    // the partially-built binary values with their validity bitmap.
    items: VecDeque<(Binary<O>, MutableBitmap)>,
    // Maximum number of elements per emitted array.
    chunk_size: usize,
    // Ties the output array type `A` to this struct without storing a value of it.
    phantom_a: std::marker::PhantomData<A>,
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> BinaryArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
Self {
iter,
Expand All @@ -297,7 +297,7 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> BinaryArrayIterator<O, A,
}
}

impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for BinaryArrayIterator<O, A, I> {
impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I> {
type Item = Result<A>;

fn next(&mut self) -> Option<Self::Item> {
Expand Down
166 changes: 46 additions & 120 deletions src/io/parquet/read/binary/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
use std::{collections::VecDeque, sync::Arc};

use parquet2::page::BinaryPageDict;
use parquet2::page::{BinaryPageDict, DictPage};

use crate::{
array::{
Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, PrimitiveArray, Utf8Array,
},
array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Offset, Utf8Array},
bitmap::MutableBitmap,
datatypes::{DataType, PhysicalType},
error::{ArrowError, Result},
error::Result,
io::parquet::read::utils::MaybeNext,
};

use super::super::dictionary::*;
use super::super::utils;
use super::super::utils::Decoder;
use super::super::ArrayIter;
use super::super::DataPages;

/// An iterator adapter over [`DataPages`] assumed to be encoded as parquet's dictionary-encoded binary representation
#[derive(Debug)]
pub struct ArrayIterator<K, O, I>
pub struct DictIter<K, O, I>
where
I: DataPages,
O: Offset,
Expand All @@ -33,13 +29,13 @@ where
phantom: std::marker::PhantomData<O>,
}

impl<K, O, I> ArrayIterator<K, O, I>
impl<K, O, I> DictIter<K, O, I>
where
K: DictionaryKey,
O: Offset,
I: DataPages,
{
fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
let data_type = match data_type {
DataType::Dictionary(_, values, _) => values.as_ref().clone(),
_ => unreachable!(),
Expand All @@ -55,7 +51,33 @@ where
}
}

impl<K, O, I> Iterator for ArrayIterator<K, O, I>
/// Deserializes a parquet [`DictPage`] holding binary data into an Arrow array.
///
/// The page is downcast to a [`BinaryPageDict`]; its offsets and values are
/// copied into owned buffers, and the resulting array is a [`Utf8Array`] or
/// [`BinaryArray`] depending on the physical type of `data_type`.
///
/// # Panics
/// Panics if `dict` is not a [`BinaryPageDict`], if an offset does not fit in
/// `O`, or if `data_type` has a non-(utf8|binary) physical type.
fn read_dict<O: Offset>(data_type: DataType, dict: &dyn DictPage) -> Arc<dyn Array> {
    let binary_dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();

    // Re-encode the page's offsets into the target offset type `O`.
    let offsets: Vec<O> = binary_dict
        .offsets()
        .iter()
        .map(|&offset| O::from_usize(offset as usize).unwrap())
        .collect();
    let values = binary_dict.values().to_vec();

    match data_type.to_physical_type() {
        PhysicalType::Utf8 | PhysicalType::LargeUtf8 => {
            let array = Utf8Array::<O>::from_data(data_type, offsets.into(), values.into(), None);
            Arc::new(array) as _
        }
        PhysicalType::Binary | PhysicalType::LargeBinary => {
            let array = BinaryArray::<O>::from_data(data_type, offsets.into(), values.into(), None);
            Arc::new(array) as _
        }
        _ => unreachable!(),
    }
}

impl<K, O, I> Iterator for DictIter<K, O, I>
where
I: DataPages,
O: Offset,
Expand All @@ -64,114 +86,18 @@ where
type Item = Result<DictionaryArray<K>>;

fn next(&mut self) -> Option<Self::Item> {
// back[a1, a2, a3, ...]front
if self.items.len() > 1 {
return self.items.pop_back().map(|(values, validity)| {
let keys = finish_key(values, validity);
let values = self.values.unwrap();
Ok(DictionaryArray::from_data(keys, values))
});
}
match (self.items.pop_back(), self.iter.next()) {
(_, Err(e)) => Some(Err(e.into())),
(None, Ok(None)) => None,
(state, Ok(Some(page))) => {
// consume the dictionary page
if let Some(dict) = page.dictionary_page() {
let dict = dict.as_any().downcast_ref::<BinaryPageDict>().unwrap();
self.values = match &mut self.values {
Dict::Empty => {
let offsets = dict
.offsets()
.iter()
.map(|x| O::from_usize(*x as usize).unwrap())
.collect::<Vec<_>>();
let values = dict.values().to_vec();

let array = match self.data_type.to_physical_type() {
PhysicalType::Utf8 | PhysicalType::LargeUtf8 => {
Arc::new(Utf8Array::<O>::from_data(
self.data_type.clone(),
offsets.into(),
values.into(),
None,
)) as _
}
PhysicalType::Binary | PhysicalType::LargeBinary => {
Arc::new(BinaryArray::<O>::from_data(
self.data_type.clone(),
offsets.into(),
values.into(),
None,
)) as _
}
_ => unreachable!(),
};

Dict::Complete(array)
}
_ => unreachable!(),
};
} else {
return Some(Err(ArrowError::nyi(
"dictionary arrays from non-dict-encoded pages",
)));
}

let maybe_array = {
// there is a new page => consume the page from the start
let maybe_page = PrimitiveDecoder::default().build_state(page);
let page = match maybe_page {
Ok(page) => page,
Err(e) => return Some(Err(e)),
};

utils::extend_from_new_page::<PrimitiveDecoder<K>, _, _>(
page,
state,
self.chunk_size,
&mut self.items,
&PrimitiveDecoder::default(),
)
};
match maybe_array {
Ok(Some((values, validity))) => {
let keys = PrimitiveArray::from_data(
K::PRIMITIVE.into(),
values.into(),
validity.into(),
);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
Ok(None) => self.next(),
Err(e) => Some(Err(e)),
}
}
(Some((values, validity)), Ok(None)) => {
// we have a populated item and no more pages
// the only case where an item's length may be smaller than chunk_size
debug_assert!(values.len() <= self.chunk_size);

let keys = finish_key(values, validity);

let values = self.values.unwrap();
Some(Ok(DictionaryArray::from_data(keys, values)))
}
let maybe_state = next_dict(
&mut self.iter,
&mut self.items,
&mut self.values,
self.chunk_size,
|dict| read_dict::<O>(self.data_type.clone(), dict),
);
match maybe_state {
MaybeNext::Some(Ok(dict)) => Some(Ok(dict)),
MaybeNext::Some(Err(e)) => Some(Err(e)),
MaybeNext::None => None,
MaybeNext::More => self.next(),
}
}
}

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, K, O, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
where
    I: 'a + DataPages,
    O: Offset,
    K: DictionaryKey,
{
    // Erase the concrete dictionary-array type: each item is lifted into an
    // `Arc<dyn Array>` so callers only see the boxed, type-erased iterator.
    let arrays = ArrayIterator::<K, O, I>::new(iter, data_type, chunk_size)
        .map(|maybe_array| maybe_array.map(|array| Arc::new(array) as Arc<dyn Array>));
    Box::new(arrays)
}
18 changes: 2 additions & 16 deletions src/io/parquet/read/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ mod dictionary;
mod nested;
mod utils;

pub use dictionary::iter_to_arrays as iter_to_dict_arrays;

use std::sync::Arc;

use crate::{
Expand All @@ -14,25 +12,13 @@ use crate::{

use self::basic::TraitBinaryArray;
use self::nested::ArrayIterator;
use super::ArrayIter;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};
use basic::BinaryArrayIterator;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, O, A, I>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
where
I: 'a + DataPages,
A: TraitBinaryArray<O>,
O: Offset,
{
Box::new(
BinaryArrayIterator::<O, A, I>::new(iter, data_type, chunk_size)
.map(|x| x.map(|x| Arc::new(x) as Arc<dyn Array>)),
)
}
pub use basic::Iter;
pub use dictionary::DictIter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, O, A, I>(
Expand Down
6 changes: 3 additions & 3 deletions src/io/parquet/read/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ fn finish(data_type: &DataType, values: MutableBitmap, validity: MutableBitmap)

/// An iterator adapter over [`DataPages`] assumed to be encoded as boolean arrays
#[derive(Debug)]
pub struct BooleanArrayIterator<I: DataPages> {
pub struct Iter<I: DataPages> {
    // Underlying iterator of parquet data pages.
    iter: I,
    // Logical Arrow type of the produced boolean arrays.
    data_type: DataType,
    // Chunks-in-progress buffered across page boundaries: each entry pairs
    // the decoded boolean values with their validity bitmap.
    items: VecDeque<(MutableBitmap, MutableBitmap)>,
    // Maximum number of elements per emitted array.
    chunk_size: usize,
}

impl<I: DataPages> BooleanArrayIterator<I> {
impl<I: DataPages> Iter<I> {
pub fn new(iter: I, data_type: DataType, chunk_size: usize) -> Self {
Self {
iter,
Expand All @@ -158,7 +158,7 @@ impl<I: DataPages> BooleanArrayIterator<I> {
}
}

impl<I: DataPages> Iterator for BooleanArrayIterator<I> {
impl<I: DataPages> Iterator for Iter<I> {
type Item = Result<BooleanArray>;

fn next(&mut self) -> Option<Self::Item> {
Expand Down
15 changes: 2 additions & 13 deletions src/io/parquet/read/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,15 @@ mod nested;

use std::sync::Arc;

use crate::{array::Array, datatypes::DataType};
use crate::array::Array;

use self::basic::BooleanArrayIterator;
use self::nested::ArrayIterator;
use super::ArrayIter;
use super::{
nested_utils::{InitNested, NestedArrayIter},
DataPages,
};

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays<'a, I: 'a>(iter: I, data_type: DataType, chunk_size: usize) -> ArrayIter<'a>
where
    I: DataPages,
{
    // Erase the concrete `BooleanArray` type: each item is lifted into an
    // `Arc<dyn Array>` so callers only see the boxed, type-erased iterator.
    let arrays = BooleanArrayIterator::new(iter, data_type, chunk_size)
        .map(|maybe_array| maybe_array.map(|array| Arc::new(array) as Arc<dyn Array>));
    Box::new(arrays)
}
pub use self::basic::Iter;

/// Converts [`DataPages`] to an [`Iterator`] of [`Array`]
pub fn iter_to_arrays_nested<'a, I: 'a>(
Expand Down
Loading

0 comments on commit 694b895

Please sign in to comment.