From 5dca6f07884745b5839cb91739faefd52f12b02b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 15 Apr 2022 15:09:25 +0100 Subject: [PATCH] Split out ListArrayReader into separate module (#1483) (#1563) * Split out ListArrayReader into separate module (#1483) * Fix merge conflict --- parquet/src/arrow/array_reader.rs | 430 +------------------ parquet/src/arrow/array_reader/list_array.rs | 379 ++++++++++++++++ parquet/src/arrow/array_reader/test_util.rs | 85 +++- 3 files changed, 473 insertions(+), 421 deletions(-) create mode 100644 parquet/src/arrow/array_reader/list_array.rs diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index f60e47c8925..f4139b2f654 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -24,19 +24,17 @@ use std::sync::Arc; use std::vec::Vec; use arrow::array::{ - new_empty_array, Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, - BooleanBufferBuilder, DecimalArray, GenericListArray, Int16BufferBuilder, Int32Array, - Int64Array, OffsetSizeTrait, PrimitiveArray, StructArray, UInt32Array, + Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, + DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray, + StructArray, }; use arrow::buffer::{Buffer, MutableBuffer}; -use arrow::compute::take; use arrow::datatypes::{ ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType, Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, - Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, ToByteSlice, + Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type, UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type, }; -use arrow::util::bit_util; use crate::arrow::converter::Converter; use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer}; @@ -56,6 +54,7 @@ mod byte_array; mod byte_array_dictionary; mod dictionary_buffer; mod empty_array; +mod list_array; mod map_array; mod offset_buffer; @@ -65,6 +64,7 @@ mod test_util; pub use builder::build_array_reader; pub use byte_array::make_byte_array_reader; pub use byte_array_dictionary::make_byte_array_dictionary_reader; +pub use list_array::ListArrayReader; pub use map_array::MapArrayReader; /// Array reader reads parquet data into arrow array. @@ -649,157 +649,6 @@ where } } -/// Implementation of list array reader. -pub struct ListArrayReader { - item_reader: Box, - data_type: ArrowType, - item_type: ArrowType, - list_def_level: i16, - list_rep_level: i16, - list_empty_def_level: i16, - list_null_def_level: i16, - def_level_buffer: Option, - rep_level_buffer: Option, - _marker: PhantomData, -} - -impl ListArrayReader { - /// Construct list array reader. - pub fn new( - item_reader: Box, - data_type: ArrowType, - item_type: ArrowType, - def_level: i16, - rep_level: i16, - list_null_def_level: i16, - list_empty_def_level: i16, - ) -> Self { - Self { - item_reader, - data_type, - item_type, - list_def_level: def_level, - list_rep_level: rep_level, - list_null_def_level, - list_empty_def_level, - def_level_buffer: None, - rep_level_buffer: None, - _marker: PhantomData, - } - } -} - -/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported. -impl ArrayReader for ListArrayReader { - fn as_any(&self) -> &dyn Any { - self - } - - /// Returns data type. - /// This must be a List. - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn next_batch(&mut self, batch_size: usize) -> Result { - let next_batch_array = self.item_reader.next_batch(batch_size)?; - - if next_batch_array.len() == 0 { - return Ok(new_empty_array(&self.data_type)); - } - let def_levels = self - .item_reader - .get_def_levels() - .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?; - let rep_levels = self - .item_reader - .get_rep_levels() - .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?; - - if !((def_levels.len() == rep_levels.len()) - && (rep_levels.len() == next_batch_array.len())) - { - return Err(ArrowError( - format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()), - )); - } - - // List definitions can be encoded as 4 values: - // - n + 0: the list slot is null - // - n + 1: the list slot is not null, but is empty (i.e. []) - // - n + 2: the list slot is not null, but its child is empty (i.e. [ null ]) - // - n + 3: the list slot is not null, and its child is not empty - // Where n is the max definition level of the list's parent. - // If a Parquet schema's only leaf is the list, then n = 0. - - // If the list index is at empty definition, the child slot is null - let non_null_list_indices = - def_levels.iter().enumerate().filter_map(|(index, def)| { - (*def > self.list_empty_def_level).then(|| index as u32) - }); - let indices = UInt32Array::from_iter_values(non_null_list_indices); - let batch_values = take(&*next_batch_array.clone(), &indices, None)?; - - // first item in each list has rep_level = 0, subsequent items have rep_level = 1 - let mut offsets: Vec = Vec::new(); - let mut cur_offset = OffsetSize::zero(); - def_levels.iter().zip(rep_levels).for_each(|(d, r)| { - if *r == 0 || d == &self.list_empty_def_level { - offsets.push(cur_offset); - } - if d > &self.list_empty_def_level { - cur_offset += OffsetSize::one(); - } - }); - - offsets.push(cur_offset); - - let num_bytes = bit_util::ceil(offsets.len(), 8); - // TODO: A useful optimization is to use the null count to fill with - // 0 or null, to reduce individual bits set in a loop. - // To favour dense data, set every slot to true, then unset - let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); - let null_slice = null_buf.as_slice_mut(); - let mut list_index = 0; - for i in 0..rep_levels.len() { - // If the level is lower than empty, then the slot is null. - // When a list is non-nullable, its empty level = null level, - // so this automatically factors that in. - if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level { - bit_util::unset_bit(null_slice, list_index); - } - if rep_levels[i] == 0 { - list_index += 1; - } - } - let value_offsets = Buffer::from(&offsets.to_byte_slice()); - - let list_data = ArrayData::builder(self.get_data_type().clone()) - .len(offsets.len() - 1) - .add_buffer(value_offsets) - .add_child_data(batch_values.data().clone()) - .null_bit_buffer(null_buf.into()) - .offset(next_batch_array.offset()); - - let list_data = unsafe { list_data.build_unchecked() }; - - let result_array = GenericListArray::::from(list_data); - Ok(Arc::new(result_array)) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) - } -} - /// Implementation of struct array reader. pub struct StructArrayReader { children: Vec>, @@ -978,19 +827,14 @@ impl ArrayReader for StructArrayReader { #[cfg(test)] mod tests { - use std::any::Any; use std::collections::VecDeque; use std::sync::Arc; use rand::distributions::uniform::SampleUniform; use rand::{thread_rng, Rng}; - use crate::arrow::{parquet_to_arrow_schema, ArrowWriter}; - use arrow::array::{ - Array, ArrayRef, LargeListArray, ListArray, PrimitiveArray, StringArray, - StructArray, - }; - use arrow::datatypes::DataType::Struct; + use crate::arrow::array_reader::test_util::InMemoryArrayReader; + use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray}; use arrow::datatypes::{ ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, Int32Type as ArrowInt32, Int64Type as ArrowInt64, @@ -1002,11 +846,8 @@ mod tests { use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter}; use crate::basic::{Encoding, Type as PhysicalType}; - use crate::column::page::{Page, PageReader}; + use crate::column::page::Page; use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type}; - use crate::errors::Result; - use crate::file::properties::WriterProperties; - use crate::file::serialized_reader::SerializedFileReader; use crate::schema::parser::parse_message_type; use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; use crate::util::test_common::make_pages; @@ -1072,7 +913,7 @@ mod tests { .unwrap(); let column_desc = schema.column(0); - let page_iterator = EmptyPageIterator::new(schema); + let page_iterator = test_util::EmptyPageIterator::new(schema); let mut array_reader = PrimitiveArrayReader::::new( Box::new(page_iterator), @@ -1690,81 +1531,6 @@ mod tests { ); } - /// Array reader for test. - struct InMemoryArrayReader { - data_type: ArrowType, - array: ArrayRef, - def_levels: Option>, - rep_levels: Option>, - } - - impl InMemoryArrayReader { - pub fn new( - data_type: ArrowType, - array: ArrayRef, - def_levels: Option>, - rep_levels: Option>, - ) -> Self { - Self { - data_type, - array, - def_levels, - rep_levels, - } - } - } - - impl ArrayReader for InMemoryArrayReader { - fn as_any(&self) -> &dyn Any { - self - } - - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn next_batch(&mut self, _batch_size: usize) -> Result { - Ok(self.array.clone()) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels.as_deref() - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels.as_deref() - } - } - - /// Iterator for testing reading empty columns - struct EmptyPageIterator { - schema: SchemaDescPtr, - } - - impl EmptyPageIterator { - fn new(schema: SchemaDescPtr) -> Self { - EmptyPageIterator { schema } - } - } - - impl Iterator for EmptyPageIterator { - type Item = Result>; - - fn next(&mut self) -> Option { - None - } - } - - impl PageIterator for EmptyPageIterator { - fn schema(&mut self) -> Result { - Ok(self.schema.clone()) - } - - fn column_schema(&mut self) -> Result { - Ok(self.schema.column(0)) - } - } - #[test] fn test_struct_array_reader() { let array_1 = Arc::new(PrimitiveArray::::from(vec![1, 2, 3, 4, 5])); @@ -1814,180 +1580,4 @@ mod tests { struct_array_reader.get_rep_levels() ); } - - #[test] - fn test_list_array_reader() { - // [[1, null, 2], null, [3, 4]] - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - None, - Some(3), - Some(4), - ])); - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![3, 2, 3, 0, 3, 3]), - Some(vec![0, 1, 1, 0, 0, 1]), - ); - - let mut list_array_reader = ListArrayReader::::new( - Box::new(item_array_reader), - ArrowType::List(Box::new(Field::new("item", ArrowType::Int32, true))), - ArrowType::Int32, - 1, - 1, - 0, - 1, - ); - - let next_batch = list_array_reader.next_batch(1024).unwrap(); - let list_array = next_batch.as_any().downcast_ref::().unwrap(); - - assert_eq!(3, list_array.len()); - // This passes as I expect - assert_eq!(1, list_array.null_count()); - - assert_eq!( - list_array - .value(0) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) - ); - - assert!(list_array.is_null(1)); - - assert_eq!( - list_array - .value(2) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(3), Some(4)]) - ); - } - - #[test] - fn test_large_list_array_reader() { - // [[1, null, 2], null, [3, 4]] - let array = Arc::new(PrimitiveArray::::from(vec![ - Some(1), - None, - Some(2), - None, - Some(3), - Some(4), - ])); - let item_array_reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![3, 2, 3, 0, 3, 3]), - Some(vec![0, 1, 1, 0, 0, 1]), - ); - - let mut list_array_reader = ListArrayReader::::new( - Box::new(item_array_reader), - ArrowType::LargeList(Box::new(Field::new("item", ArrowType::Int32, true))), - ArrowType::Int32, - 1, - 1, - 0, - 1, - ); - - let next_batch = list_array_reader.next_batch(1024).unwrap(); - let list_array = next_batch - .as_any() - .downcast_ref::() - .unwrap(); - - assert_eq!(3, list_array.len()); - - assert_eq!( - list_array - .value(0) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) - ); - - assert!(list_array.is_null(1)); - - assert_eq!( - list_array - .value(2) - .as_any() - .downcast_ref::>() - .unwrap(), - &PrimitiveArray::::from(vec![Some(3), Some(4)]) - ); - } - - #[test] - fn test_nested_lists() { - // Construct column schema - let message_type = " - message table { - REPEATED group table_info { - REQUIRED BYTE_ARRAY name; - REPEATED group cols { - REQUIRED BYTE_ARRAY name; - REQUIRED INT32 type; - OPTIONAL INT32 length; - } - REPEATED group tags { - REQUIRED BYTE_ARRAY name; - REQUIRED INT32 type; - OPTIONAL INT32 length; - } - } - } - "; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap(); - - let file = tempfile::tempfile().unwrap(); - let props = WriterProperties::builder() - .set_max_row_group_size(200) - .build(); - - let mut writer = ArrowWriter::try_new( - file.try_clone().unwrap(), - Arc::new(arrow_schema), - Some(props), - ) - .unwrap(); - writer.close().unwrap(); - - let file_reader: Arc = - Arc::new(SerializedFileReader::new(file).unwrap()); - - let file_metadata = file_reader.metadata().file_metadata(); - let arrow_schema = parquet_to_arrow_schema( - file_metadata.schema_descr(), - file_metadata.key_value_metadata(), - ) - .unwrap(); - - let mut array_reader = build_array_reader( - file_reader.metadata().file_metadata().schema_descr_ptr(), - Arc::new(arrow_schema.clone()), - vec![0usize].into_iter(), - Box::new(file_reader), - ) - .unwrap(); - - let batch = array_reader.next_batch(100).unwrap(); - assert_eq!(batch.data_type(), &Struct(arrow_schema.fields().clone())); - assert_eq!(batch.len(), 0); - } } diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs new file mode 100644 index 00000000000..31c52c9c5bf --- /dev/null +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -0,0 +1,379 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::arrow::array_reader::ArrayReader; +use crate::errors::ParquetError::ArrowError; +use crate::errors::Result; +use arrow::array::{ + new_empty_array, ArrayData, ArrayRef, GenericListArray, + OffsetSizeTrait, UInt32Array, +}; +use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::datatypes::DataType as ArrowType; +use arrow::datatypes::ToByteSlice; +use arrow::util::bit_util; +use std::any::Any; +use std::marker::PhantomData; +use std::sync::Arc; + +/// Implementation of list array reader. +pub struct ListArrayReader { + item_reader: Box, + data_type: ArrowType, + item_type: ArrowType, + list_def_level: i16, + list_rep_level: i16, + list_empty_def_level: i16, + list_null_def_level: i16, + def_level_buffer: Option, + rep_level_buffer: Option, + _marker: PhantomData, +} + +impl ListArrayReader { + /// Construct list array reader. + pub fn new( + item_reader: Box, + data_type: ArrowType, + item_type: ArrowType, + def_level: i16, + rep_level: i16, + list_null_def_level: i16, + list_empty_def_level: i16, + ) -> Self { + Self { + item_reader, + data_type, + item_type, + list_def_level: def_level, + list_rep_level: rep_level, + list_null_def_level, + list_empty_def_level, + def_level_buffer: None, + rep_level_buffer: None, + _marker: PhantomData, + } + } +} + +/// Implementation of ListArrayReader. Nested lists and lists of structs are not yet supported. +impl ArrayReader for ListArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + /// Returns data type. + /// This must be a List. + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, batch_size: usize) -> Result { + let next_batch_array = self.item_reader.next_batch(batch_size)?; + + if next_batch_array.len() == 0 { + return Ok(new_empty_array(&self.data_type)); + } + let def_levels = self + .item_reader + .get_def_levels() + .ok_or_else(|| ArrowError("item_reader def levels are None.".to_string()))?; + let rep_levels = self + .item_reader + .get_rep_levels() + .ok_or_else(|| ArrowError("item_reader rep levels are None.".to_string()))?; + + if !((def_levels.len() == rep_levels.len()) + && (rep_levels.len() == next_batch_array.len())) + { + return Err(ArrowError( + format!("Expected item_reader def_levels {} and rep_levels {} to be same length as batch {}", def_levels.len(), rep_levels.len(), next_batch_array.len()), + )); + } + + // List definitions can be encoded as 4 values: + // - n + 0: the list slot is null + // - n + 1: the list slot is not null, but is empty (i.e. []) + // - n + 2: the list slot is not null, but its child is empty (i.e. [ null ]) + // - n + 3: the list slot is not null, and its child is not empty + // Where n is the max definition level of the list's parent. + // If a Parquet schema's only leaf is the list, then n = 0. + + // If the list index is at empty definition, the child slot is null + let non_null_list_indices = + def_levels.iter().enumerate().filter_map(|(index, def)| { + (*def > self.list_empty_def_level).then(|| index as u32) + }); + let indices = UInt32Array::from_iter_values(non_null_list_indices); + let batch_values = + arrow::compute::take(&*next_batch_array.clone(), &indices, None)?; + + // first item in each list has rep_level = 0, subsequent items have rep_level = 1 + let mut offsets: Vec = Vec::new(); + let mut cur_offset = OffsetSize::zero(); + def_levels.iter().zip(rep_levels).for_each(|(d, r)| { + if *r == 0 || d == &self.list_empty_def_level { + offsets.push(cur_offset); + } + if d > &self.list_empty_def_level { + cur_offset += OffsetSize::one(); + } + }); + + offsets.push(cur_offset); + + let num_bytes = bit_util::ceil(offsets.len(), 8); + // TODO: A useful optimization is to use the null count to fill with + // 0 or null, to reduce individual bits set in a loop. + // To favour dense data, set every slot to true, then unset + let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); + let null_slice = null_buf.as_slice_mut(); + let mut list_index = 0; + for i in 0..rep_levels.len() { + // If the level is lower than empty, then the slot is null. + // When a list is non-nullable, its empty level = null level, + // so this automatically factors that in. + if rep_levels[i] == 0 && def_levels[i] < self.list_empty_def_level { + bit_util::unset_bit(null_slice, list_index); + } + if rep_levels[i] == 0 { + list_index += 1; + } + } + let value_offsets = Buffer::from(&offsets.to_byte_slice()); + + let list_data = ArrayData::builder(self.get_data_type().clone()) + .len(offsets.len() - 1) + .add_buffer(value_offsets) + .add_child_data(batch_values.data().clone()) + .null_bit_buffer(null_buf.into()) + .offset(next_batch_array.offset()); + + let list_data = unsafe { list_data.build_unchecked() }; + + let result_array = GenericListArray::::from(list_data); + Ok(Arc::new(result_array)) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_level_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_level_buffer + .as_ref() + .map(|buf| unsafe { buf.typed_data() }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::arrow::array_reader::build_array_reader; + use crate::arrow::array_reader::list_array::ListArrayReader; + use crate::arrow::array_reader::test_util::InMemoryArrayReader; + use crate::arrow::{parquet_to_arrow_schema, ArrowWriter}; + use crate::file::properties::WriterProperties; + use crate::file::reader::{FileReader, SerializedFileReader}; + use crate::schema::parser::parse_message_type; + use crate::schema::types::SchemaDescriptor; + use arrow::array::{Array, LargeListArray, ListArray, PrimitiveArray}; + use arrow::datatypes::{Field, Int32Type as ArrowInt32}; + use std::sync::Arc; + + #[test] + fn test_list_array_reader() { + // [[1, null, 2], null, [3, 4]] + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + Some(4), + ])); + + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![3, 2, 3, 0, 3, 3]), + Some(vec![0, 1, 1, 0, 0, 1]), + ); + + let mut list_array_reader = ListArrayReader::::new( + Box::new(item_array_reader), + ArrowType::List(Box::new(Field::new("item", ArrowType::Int32, true))), + ArrowType::Int32, + 1, + 1, + 0, + 1, + ); + + let next_batch = list_array_reader.next_batch(1024).unwrap(); + let list_array = next_batch.as_any().downcast_ref::().unwrap(); + + assert_eq!(3, list_array.len()); + // This passes as I expect + assert_eq!(1, list_array.null_count()); + + assert_eq!( + list_array + .value(0) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) + ); + + assert!(list_array.is_null(1)); + + assert_eq!( + list_array + .value(2) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(3), Some(4)]) + ); + } + + #[test] + fn test_large_list_array_reader() { + // [[1, null, 2], null, [3, 4]] + let array = Arc::new(PrimitiveArray::::from(vec![ + Some(1), + None, + Some(2), + None, + Some(3), + Some(4), + ])); + let item_array_reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![3, 2, 3, 0, 3, 3]), + Some(vec![0, 1, 1, 0, 0, 1]), + ); + + let mut list_array_reader = ListArrayReader::::new( + Box::new(item_array_reader), + ArrowType::LargeList(Box::new(Field::new("item", ArrowType::Int32, true))), + ArrowType::Int32, + 1, + 1, + 0, + 1, + ); + + let next_batch = list_array_reader.next_batch(1024).unwrap(); + let list_array = next_batch + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(3, list_array.len()); + + assert_eq!( + list_array + .value(0) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(1), None, Some(2)]) + ); + + assert!(list_array.is_null(1)); + + assert_eq!( + list_array + .value(2) + .as_any() + .downcast_ref::>() + .unwrap(), + &PrimitiveArray::::from(vec![Some(3), Some(4)]) + ); + } + + #[test] + fn test_nested_lists() { + // Construct column schema + let message_type = " + message table { + REPEATED group table_info { + REQUIRED BYTE_ARRAY name; + REPEATED group cols { + REQUIRED BYTE_ARRAY name; + REQUIRED INT32 type; + OPTIONAL INT32 length; + } + REPEATED group tags { + REQUIRED BYTE_ARRAY name; + REQUIRED INT32 type; + OPTIONAL INT32 length; + } + } + } + "; + + let schema = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + + let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap(); + + let file = tempfile::tempfile().unwrap(); + let props = WriterProperties::builder() + .set_max_row_group_size(200) + .build(); + + let mut writer = ArrowWriter::try_new( + file.try_clone().unwrap(), + Arc::new(arrow_schema), + Some(props), + ) + .unwrap(); + writer.close().unwrap(); + + let file_reader: Arc = + Arc::new(SerializedFileReader::new(file).unwrap()); + + let file_metadata = file_reader.metadata().file_metadata(); + let arrow_schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + ) + .unwrap(); + + let mut array_reader = build_array_reader( + file_reader.metadata().file_metadata().schema_descr_ptr(), + Arc::new(arrow_schema.clone()), + vec![0usize].into_iter(), + Box::new(file_reader), + ) + .unwrap(); + + let batch = array_reader.next_batch(100).unwrap(); + assert_eq!( + batch.data_type(), + &ArrowType::Struct(arrow_schema.fields().clone()) + ); + assert_eq!(batch.len(), 0); + } +} diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index b04a597c976..f212d05b071 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -15,12 +15,20 @@ // specific language governing permissions and limitations // under the License. +use arrow::array::ArrayRef; +use arrow::datatypes::DataType as ArrowType; +use std::any::Any; use std::sync::Arc; +use crate::arrow::array_reader::ArrayReader; use crate::basic::{ConvertedType, Encoding, Type as PhysicalType}; +use crate::column::page::{PageIterator, PageReader}; use crate::data_type::{ByteArray, ByteArrayType}; use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; -use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type}; +use crate::errors::Result; +use crate::schema::types::{ + ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, +}; use crate::util::memory::{ByteBufferPtr, MemTracker}; /// Returns a descriptor for a UTF-8 column @@ -87,3 +95,78 @@ pub fn byte_array_all_encodings( (pages, encoded_dictionary) } + +/// Array reader for test. +pub struct InMemoryArrayReader { + data_type: ArrowType, + array: ArrayRef, + def_levels: Option>, + rep_levels: Option>, +} + +impl InMemoryArrayReader { + pub fn new( + data_type: ArrowType, + array: ArrayRef, + def_levels: Option>, + rep_levels: Option>, + ) -> Self { + Self { + data_type, + array, + def_levels, + rep_levels, + } + } +} + +impl ArrayReader for InMemoryArrayReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn get_data_type(&self) -> &ArrowType { + &self.data_type + } + + fn next_batch(&mut self, _batch_size: usize) -> Result { + Ok(self.array.clone()) + } + + fn get_def_levels(&self) -> Option<&[i16]> { + self.def_levels.as_deref() + } + + fn get_rep_levels(&self) -> Option<&[i16]> { + self.rep_levels.as_deref() + } +} + +/// Iterator for testing reading empty columns +pub struct EmptyPageIterator { + schema: SchemaDescPtr, +} + +impl EmptyPageIterator { + pub fn new(schema: SchemaDescPtr) -> Self { + EmptyPageIterator { schema } + } +} + +impl Iterator for EmptyPageIterator { + type Item = Result>; + + fn next(&mut self) -> Option { + None + } +} + +impl PageIterator for EmptyPageIterator { + fn schema(&mut self) -> Result { + Ok(self.schema.clone()) + } + + fn column_schema(&mut self) -> Result { + Ok(self.schema.column(0)) + } +}