From 7cd47ded5ebe01b8086ff68bcb442166a9f6f100 Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Wed, 10 Apr 2024 16:12:53 +0200 Subject: [PATCH 1/4] Change `UnionArray` constructors --- arrow-array/src/array/union_array.rs | 332 +++++++++++------------ arrow-array/src/builder/union_builder.rs | 78 +++--- arrow-cast/src/pretty.rs | 18 +- arrow-flight/src/encode.rs | 101 +++---- arrow-integration-test/src/lib.rs | 23 +- arrow-ipc/src/reader.rs | 17 +- arrow-ipc/src/writer.rs | 9 +- arrow-schema/src/fields.rs | 2 - arrow-select/src/take.rs | 30 +- arrow/tests/array_transform.rs | 42 +-- 10 files changed, 317 insertions(+), 335 deletions(-) diff --git a/arrow-array/src/array/union_array.rs b/arrow-array/src/array/union_array.rs index 22d4cf90a092..7cf557622f81 100644 --- a/arrow-array/src/array/union_array.rs +++ b/arrow-array/src/array/union_array.rs @@ -17,13 +17,13 @@ use crate::{make_array, Array, ArrayRef}; use arrow_buffer::buffer::NullBuffer; -use arrow_buffer::{Buffer, ScalarBuffer}; +use arrow_buffer::ScalarBuffer; use arrow_data::{ArrayData, ArrayDataBuilder}; -use arrow_schema::{ArrowError, DataType, Field, UnionFields, UnionMode}; +use arrow_schema::{ArrowError, DataType, UnionFields, UnionMode}; /// Contains the `UnionArray` type. /// use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; /// An array of [values of varying types](https://arrow.apache.org/docs/format/Columnar.html#union-layout) @@ -43,25 +43,30 @@ use std::sync::Arc; /// # Examples /// ## Create a dense UnionArray `[1, 3.2, 34]` /// ``` -/// use arrow_buffer::Buffer; +/// use arrow_buffer::ScalarBuffer; /// use arrow_schema::*; /// use std::sync::Arc; /// use arrow_array::{Array, Int32Array, Float64Array, UnionArray}; /// /// let int_array = Int32Array::from(vec![1, 34]); /// let float_array = Float64Array::from(vec![3.2]); -/// let type_id_buffer = Buffer::from_slice_ref(&[0_i8, 1, 0]); -/// let value_offsets_buffer = Buffer::from_slice_ref(&[0_i32, 0, 1]); +/// let type_ids = [0, 1, 0].into_iter().collect::>(); +/// let offsets = [0, 0, 1].into_iter().collect::>(); /// -/// let children: Vec<(Field, Arc)> = vec![ -/// (Field::new("A", DataType::Int32, false), Arc::new(int_array)), -/// (Field::new("B", DataType::Float64, false), Arc::new(float_array)), +/// let union_fields = [ +/// (0, Arc::new(Field::new("A", DataType::Int32, false))), +/// (1, Arc::new(Field::new("B", DataType::Float64, false))), +/// ].into_iter().collect::(); +/// +/// let children = vec![ +/// Arc::new(int_array) as Arc, +/// Arc::new(float_array), /// ]; /// /// let array = UnionArray::try_new( -/// &vec![0, 1], -/// type_id_buffer, -/// Some(value_offsets_buffer), +/// union_fields, +/// type_ids, +/// Some(offsets), /// children, /// ).unwrap(); /// @@ -77,23 +82,28 @@ use std::sync::Arc; /// /// ## Create a sparse UnionArray `[1, 3.2, 34]` /// ``` -/// use arrow_buffer::Buffer; +/// use arrow_buffer::ScalarBuffer; /// use arrow_schema::*; /// use std::sync::Arc; /// use arrow_array::{Array, Int32Array, Float64Array, UnionArray}; /// /// let int_array = Int32Array::from(vec![Some(1), None, Some(34)]); /// let float_array = Float64Array::from(vec![None, Some(3.2), None]); -/// let type_id_buffer = Buffer::from_slice_ref(&[0_i8, 1, 0]); +/// let type_ids = [0_i8, 1, 0].into_iter().collect::>(); +/// +/// let union_fields = [ +/// (0, Arc::new(Field::new("A", DataType::Int32, false))), +/// (1, Arc::new(Field::new("B", DataType::Float64, false))), +/// ].into_iter().collect::(); /// -/// let children: Vec<(Field, Arc)> = vec![ -/// (Field::new("A", DataType::Int32, false), Arc::new(int_array)), -/// (Field::new("B", DataType::Float64, false), Arc::new(float_array)), +/// let children = vec![ +/// Arc::new(int_array) as Arc, +/// Arc::new(float_array), /// ]; /// /// let array = UnionArray::try_new( -/// &vec![0, 1], -/// type_id_buffer, +/// union_fields, +/// type_ids, /// None, /// children, /// ).unwrap(); @@ -125,102 +135,119 @@ impl UnionArray { /// /// # Safety /// - /// The `type_ids` `Buffer` should contain `i8` values. These values should be greater than - /// zero and must be less than the number of children provided in `child_arrays`. These values - /// are used to index into the `child_arrays`. + /// The `type_ids` values should be positive and must match one of the type ids of the fields provided in `fields`. + /// These values are used to index into the `child_arrays`. /// - /// The `value_offsets` `Buffer` is only provided in the case of a dense union, sparse unions - /// should use `None`. If provided the `value_offsets` `Buffer` should contain `i32` values. - /// The values in this array should be greater than zero and must be less than the length of the - /// overall array. + /// The `offsets` is provided in the case of a dense union, sparse unions should use `None`. + /// If provided the `offsets` values should be positive and must be less than the length of the + /// corresponding array. /// /// In both cases above we use signed integer types to maintain compatibility with other /// Arrow implementations. - /// - /// In both of the cases above we are accepting `Buffer`'s which are assumed to be representing - /// `i8` and `i32` values respectively. `Buffer` objects are untyped and no attempt is made - /// to ensure that the data provided is valid. pub unsafe fn new_unchecked( - field_type_ids: &[i8], - type_ids: Buffer, - value_offsets: Option, - child_arrays: Vec<(Field, ArrayRef)>, + fields: UnionFields, + type_ids: ScalarBuffer, + offsets: Option>, + children: Vec, ) -> Self { - let (fields, field_values): (Vec<_>, Vec<_>) = child_arrays.into_iter().unzip(); - let len = type_ids.len(); - - let mode = if value_offsets.is_some() { + let mode = if offsets.is_some() { UnionMode::Dense } else { UnionMode::Sparse }; - let builder = ArrayData::builder(DataType::Union( - UnionFields::new(field_type_ids.iter().copied(), fields), - mode, - )) - .add_buffer(type_ids) - .child_data(field_values.into_iter().map(|a| a.into_data()).collect()) - .len(len); + let len = type_ids.len(); + let builder = ArrayData::builder(DataType::Union(fields, mode)) + .add_buffer(type_ids.into_inner()) + .child_data(children.into_iter().map(Array::into_data).collect()) + .len(len); - let data = match value_offsets { - Some(b) => builder.add_buffer(b).build_unchecked(), + let data = match offsets { + Some(offsets) => builder.add_buffer(offsets.into_inner()).build_unchecked(), None => builder.build_unchecked(), }; Self::from(data) } /// Attempts to create a new `UnionArray`, validating the inputs provided. + /// + /// The order of child arrays child array order must match the fields order pub fn try_new( - field_type_ids: &[i8], - type_ids: Buffer, - value_offsets: Option, - child_arrays: Vec<(Field, ArrayRef)>, + fields: UnionFields, + type_ids: ScalarBuffer, + offsets: Option>, + children: Vec, ) -> Result { - if let Some(b) = &value_offsets { - if ((type_ids.len()) * 4) != b.len() { - return Err(ArrowError::InvalidArgumentError( - "Type Ids and Offsets represent a different number of array slots.".to_string(), - )); - } + // There must be a child array for every field. + if fields.len() != children.len() { + return Err(ArrowError::InvalidArgumentError( + "Union fields length must match child arrays length".to_string(), + )); } - // Check the type_ids - let type_id_slice: &[i8] = type_ids.typed_data(); - let invalid_type_ids = type_id_slice + // Specified field type ids must be positive. + let field_type_ids = fields .iter() - .filter(|i| *i < &0) - .collect::>(); - if !invalid_type_ids.is_empty() { - return Err(ArrowError::InvalidArgumentError(format!( - "Type Ids must be positive and cannot be greater than the number of \ - child arrays, found:\n{invalid_type_ids:?}" - ))); + .map(|(type_id, _)| { + if type_id >= 0 { + Ok(type_id) + } else { + Err(ArrowError::InvalidArgumentError( + "Type Ids must be positive".to_owned(), + )) + } + }) + .collect::, _>>()?; + + // There must be an offset value for every type id value. + if offsets + .as_ref() + .is_some_and(|offsets| offsets.len() != type_ids.len()) + { + return Err(ArrowError::InvalidArgumentError( + "Type Ids and Offsets lengths must match".to_string(), + )); } - // Check the value offsets if provided - if let Some(offset_buffer) = &value_offsets { - let max_len = type_ids.len() as i32; - let offsets_slice: &[i32] = offset_buffer.typed_data(); - let invalid_offsets = offsets_slice - .iter() - .filter(|i| *i < &0 || *i > &max_len) - .collect::>(); - if !invalid_offsets.is_empty() { - return Err(ArrowError::InvalidArgumentError(format!( - "Offsets must be positive and within the length of the Array, \ - found:\n{invalid_offsets:?}" - ))); - } + // Type id values must match one of the fields. + if type_ids + .iter() + .any(|type_id| !field_type_ids.contains(type_id)) + { + return Err(ArrowError::InvalidArgumentError( + "Type Ids values must match one of the field type ids".to_owned(), + )); } - // Unsafe Justification: arguments were validated above (and - // re-revalidated as part of data().validate() below) - let new_self = - unsafe { Self::new_unchecked(field_type_ids, type_ids, value_offsets, child_arrays) }; - new_self.to_data().validate()?; + // Create mapping from type id to array lengths. + let array_lens = fields + .iter() + .map(|(type_id, _)| type_id) + .zip( + children + .iter() + .map(Array::len) + .map(i32::try_from) + .map(Result::unwrap), + ) + .collect::>(); + + // Check the value offsets are in bounds. + if offsets.as_ref().is_some_and(|offsets| { + type_ids + .iter() + .zip(offsets.iter()) + .any(|(type_id, &offset)| offset < 0 || offset > array_lens[type_id]) + }) { + return Err(ArrowError::InvalidArgumentError( + "Offsets must be positive and within the length of the Array".to_owned(), + )); + } - Ok(new_self) + // Safety: + // - Arguments validated above. + let union_array = unsafe { Self::new_unchecked(fields, type_ids, offsets, children) }; + Ok(union_array) } /// Accesses the child array for `type_id`. @@ -336,14 +363,14 @@ impl UnionArray { /// let union_array = builder.build()?; /// /// // Deconstruct into parts - /// let (type_ids, offsets, field_type_ids, fields) = union_array.into_parts(); + /// let (union_fields, type_ids, offsets, children) = union_array.into_parts(); /// /// // Reconstruct from parts /// let union_array = UnionArray::try_new( - /// &field_type_ids, - /// type_ids.into_inner(), - /// offsets.map(ScalarBuffer::into_inner), - /// fields, + /// union_fields, + /// type_ids, + /// offsets, + /// children, /// ); /// # Ok(()) /// # } @@ -352,34 +379,24 @@ impl UnionArray { pub fn into_parts( self, ) -> ( + UnionFields, ScalarBuffer, Option>, - Vec, - Vec<(Field, ArrayRef)>, + Vec, ) { let Self { data_type, type_ids, offsets, - fields, + mut fields, } = self; match data_type { DataType::Union(union_fields, _) => { - let union_fields = union_fields.iter().collect::>(); - let (field_type_ids, fields) = fields - .into_iter() - .enumerate() - .flat_map(|(type_id, array_ref)| { - array_ref.map(|array_ref| { - let type_id = type_id as i8; - ( - type_id, - ((*Arc::clone(union_fields[&type_id])).clone(), array_ref), - ) - }) - }) - .unzip(); - (type_ids, offsets, field_type_ids, fields) + let children = union_fields + .iter() + .map(|(type_id, _)| fields[type_id as usize].take().unwrap()) + .collect(); + (union_fields, type_ids, offsets, children) } _ => unreachable!(), } @@ -576,7 +593,8 @@ mod tests { use crate::types::{Float32Type, Float64Type, Int32Type, Int64Type}; use crate::RecordBatch; use crate::{Float64Array, Int32Array, Int64Array, StringArray}; - use arrow_schema::Schema; + use arrow_buffer::Buffer; + use arrow_schema::{Field, Schema}; #[test] fn test_dense_i32() { @@ -809,30 +827,27 @@ mod tests { let int_array = Int32Array::from(vec![5, 6]); let float_array = Float64Array::from(vec![10.0]); - let type_ids = [1_i8, 0, 0, 2, 0, 1]; - let offsets = [0_i32, 0, 1, 0, 2, 1]; - - let type_id_buffer = Buffer::from_slice_ref(type_ids); - let value_offsets_buffer = Buffer::from_slice_ref(offsets); - - let children: Vec<(Field, Arc)> = vec![ - ( - Field::new("A", DataType::Utf8, false), - Arc::new(string_array), - ), - (Field::new("B", DataType::Int32, false), Arc::new(int_array)), - ( - Field::new("C", DataType::Float64, false), - Arc::new(float_array), - ), - ]; - let array = UnionArray::try_new( - &[0, 1, 2], - type_id_buffer, - Some(value_offsets_buffer), - children, - ) - .unwrap(); + let type_ids = [1, 0, 0, 2, 0, 1].into_iter().collect::>(); + let offsets = [0, 0, 1, 0, 2, 1] + .into_iter() + .collect::>(); + + let fields = [ + (0, Arc::new(Field::new("A", DataType::Utf8, false))), + (1, Arc::new(Field::new("B", DataType::Int32, false))), + (2, Arc::new(Field::new("C", DataType::Float64, false))), + ] + .into_iter() + .collect::(); + let children = [ + Arc::new(string_array) as Arc, + Arc::new(int_array), + Arc::new(float_array), + ] + .into_iter() + .collect(); + let array = + UnionArray::try_new(fields, type_ids.clone(), Some(offsets.clone()), children).unwrap(); // Check type ids assert_eq!(*array.type_ids(), type_ids); @@ -1277,29 +1292,22 @@ mod tests { let dense_union = builder.build().unwrap(); let field = [ - Field::new("a", DataType::Int32, false), - Field::new("b", DataType::Int8, false), + &Arc::new(Field::new("a", DataType::Int32, false)), + &Arc::new(Field::new("b", DataType::Int8, false)), ]; - let (type_ids, offsets, field_type_ids, fields) = dense_union.into_parts(); - assert_eq!(field_type_ids, [0, 1]); + let (union_fields, type_ids, offsets, children) = dense_union.into_parts(); assert_eq!( - field.to_vec(), - fields + union_fields .iter() - .cloned() - .map(|(field, _)| field) - .collect::>() + .map(|(_, field)| field) + .collect::>(), + field ); assert_eq!(type_ids, [0, 1, 0]); assert!(offsets.is_some()); assert_eq!(offsets.as_ref().unwrap(), &[0, 0, 1]); - let result = UnionArray::try_new( - &field_type_ids, - type_ids.into_inner(), - offsets.map(ScalarBuffer::into_inner), - fields, - ); + let result = UnionArray::try_new(union_fields, type_ids, offsets, children); assert!(result.is_ok()); assert_eq!(result.unwrap().len(), 3); @@ -1309,23 +1317,18 @@ mod tests { builder.append::("a", 3).unwrap(); let sparse_union = builder.build().unwrap(); - let (type_ids, offsets, field_type_ids, fields) = sparse_union.into_parts(); + let (union_fields, type_ids, offsets, children) = sparse_union.into_parts(); assert_eq!(type_ids, [0, 1, 0]); assert!(offsets.is_none()); - let result = UnionArray::try_new( - &field_type_ids, - type_ids.into_inner(), - offsets.map(ScalarBuffer::into_inner), - fields, - ); + let result = UnionArray::try_new(union_fields, type_ids, offsets, children); assert!(result.is_ok()); assert_eq!(result.unwrap().len(), 3); } #[test] fn into_parts_custom_type_ids() { - let mut set_field_type_ids: [i8; 3] = [8, 4, 9]; + let set_field_type_ids: [i8; 3] = [8, 4, 9]; let data_type = DataType::Union( UnionFields::new( set_field_type_ids, @@ -1354,15 +1357,12 @@ mod tests { .unwrap(); let array = UnionArray::from(data); - let (type_ids, offsets, field_type_ids, fields) = array.into_parts(); - set_field_type_ids.sort(); - assert_eq!(field_type_ids, set_field_type_ids); - let result = UnionArray::try_new( - &field_type_ids, - type_ids.into_inner(), - offsets.map(ScalarBuffer::into_inner), - fields, + let (union_fields, type_ids, offsets, children) = array.into_parts(); + assert_eq!( + type_ids.iter().collect::>(), + set_field_type_ids.iter().collect::>() ); + let result = UnionArray::try_new(union_fields, type_ids, offsets, children); assert!(result.is_ok()); let array = result.unwrap(); assert_eq!(array.len(), 7); diff --git a/arrow-array/src/builder/union_builder.rs b/arrow-array/src/builder/union_builder.rs index 4f88c9d41b9a..e6184f4ac6d2 100644 --- a/arrow-array/src/builder/union_builder.rs +++ b/arrow-array/src/builder/union_builder.rs @@ -23,7 +23,8 @@ use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_data::ArrayDataBuilder; use arrow_schema::{ArrowError, DataType, Field}; use std::any::Any; -use std::collections::HashMap; +use std::collections::BTreeMap; +use std::sync::Arc; /// `FieldData` is a helper struct to track the state of the fields in the `UnionBuilder`. #[derive(Debug)] @@ -142,7 +143,7 @@ pub struct UnionBuilder { /// The current number of slots in the array len: usize, /// Maps field names to `FieldData` instances which track the builders for that field - fields: HashMap, + fields: BTreeMap, /// Builder to keep track of type ids type_id_builder: Int8BufferBuilder, /// Builder to keep track of offsets (`None` for sparse unions) @@ -165,7 +166,7 @@ impl UnionBuilder { pub fn with_capacity_dense(capacity: usize) -> Self { Self { len: 0, - fields: HashMap::default(), + fields: Default::default(), type_id_builder: Int8BufferBuilder::new(capacity), value_offset_builder: Some(Int32BufferBuilder::new(capacity)), initial_capacity: capacity, @@ -176,7 +177,7 @@ impl UnionBuilder { pub fn with_capacity_sparse(capacity: usize) -> Self { Self { len: 0, - fields: HashMap::default(), + fields: Default::default(), type_id_builder: Int8BufferBuilder::new(capacity), value_offset_builder: None, initial_capacity: capacity, @@ -274,40 +275,39 @@ impl UnionBuilder { } /// Builds this builder creating a new `UnionArray`. - pub fn build(mut self) -> Result { - let type_id_buffer = self.type_id_builder.finish(); - let value_offsets_buffer = self.value_offset_builder.map(|mut b| b.finish()); - let mut children = Vec::new(); - for ( - name, - FieldData { - type_id, - data_type, - mut values_buffer, - slots, - null_buffer_builder: mut bitmap_builder, - }, - ) in self.fields.into_iter() - { - let buffer = values_buffer.finish(); - let arr_data_builder = ArrayDataBuilder::new(data_type.clone()) - .add_buffer(buffer) - .len(slots) - .nulls(bitmap_builder.finish()); - - let arr_data_ref = unsafe { arr_data_builder.build_unchecked() }; - let array_ref = make_array(arr_data_ref); - children.push((type_id, (Field::new(name, data_type, false), array_ref))) - } - - children.sort_by(|a, b| { - a.0.partial_cmp(&b.0) - .expect("This will never be None as type ids are always i8 values.") - }); - let children: Vec<_> = children.into_iter().map(|(_, b)| b).collect(); - - let type_ids: Vec = (0_i8..children.len() as i8).collect(); - - UnionArray::try_new(&type_ids, type_id_buffer, value_offsets_buffer, children) + pub fn build(self) -> Result { + let mut children = Vec::with_capacity(self.fields.len()); + let union_fields = self + .fields + .into_iter() + .map( + |( + name, + FieldData { + type_id, + data_type, + mut values_buffer, + slots, + mut null_buffer_builder, + }, + )| { + let array_ref = make_array(unsafe { + ArrayDataBuilder::new(data_type.clone()) + .add_buffer(values_buffer.finish()) + .len(slots) + .nulls(null_buffer_builder.finish()) + .build_unchecked() + }); + children.push(array_ref); + (type_id, Arc::new(Field::new(name, data_type, false))) + }, + ) + .collect(); + UnionArray::try_new( + union_fields, + self.type_id_builder.into(), + self.value_offset_builder.map(Into::into), + children, + ) } } diff --git a/arrow-cast/src/pretty.rs b/arrow-cast/src/pretty.rs index 550afa9f739d..2f0ecbab4ec6 100644 --- a/arrow-cast/src/pretty.rs +++ b/arrow-cast/src/pretty.rs @@ -137,7 +137,7 @@ mod tests { use arrow_array::builder::*; use arrow_array::types::*; use arrow_array::*; - use arrow_buffer::Buffer; + use arrow_buffer::ScalarBuffer; use arrow_schema::*; use half::f16; use std::fmt::Write; @@ -769,14 +769,18 @@ mod tests { // Can't use UnionBuilder with non-primitive types, so manually build outer UnionArray let a_array = Int32Array::from(vec![None, None, None, Some(1234), Some(23)]); - let type_ids = Buffer::from_slice_ref([1_i8, 1, 0, 0, 1]); + let type_ids = [1, 1, 0, 0, 1].into_iter().collect::>(); - let children: Vec<(Field, Arc)> = vec![ - (Field::new("a", DataType::Int32, true), Arc::new(a_array)), - (inner_field.clone(), Arc::new(inner)), - ]; + let children = vec![Arc::new(a_array) as Arc, Arc::new(inner)]; + + let union_fields = [ + (0, Arc::new(Field::new("a", DataType::Int32, true))), + (1, Arc::new(inner_field.clone())), + ] + .into_iter() + .collect(); - let outer = UnionArray::try_new(&[0, 1], type_ids, None, children).unwrap(); + let outer = UnionArray::try_new(union_fields, type_ids, None, children).unwrap(); let schema = Schema::new(vec![Field::new_union( "Teamsters", diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs index 7604f3cd4d62..f59c29e68173 100644 --- a/arrow-flight/src/encode.rs +++ b/arrow-flight/src/encode.rs @@ -597,20 +597,17 @@ fn hydrate_dictionary(array: &ArrayRef, data_type: &DataType) -> Result { let union_arr = array.as_any().downcast_ref::().unwrap(); - let (type_ids, fields): (Vec, Vec<&FieldRef>) = fields.iter().unzip(); - Arc::new(UnionArray::try_new( - &type_ids, - union_arr.type_ids().inner().clone(), + fields.clone(), + union_arr.type_ids().clone(), None, fields .iter() - .enumerate() - .map(|(col, field)| { - Ok(( - field.as_ref().clone(), - arrow_cast::cast(union_arr.child(col as i8), field.data_type())?, - )) + .map(|(type_id, field)| { + Ok(arrow_cast::cast( + union_arr.child(type_id), + field.data_type(), + )?) }) .collect::>>()?, )?) @@ -625,10 +622,10 @@ mod tests { use arrow_array::builder::StringDictionaryBuilder; use arrow_array::*; use arrow_array::{cast::downcast_array, types::*}; - use arrow_buffer::Buffer; + use arrow_buffer::ScalarBuffer; use arrow_cast::pretty::pretty_format_batches; use arrow_ipc::MetadataVersion; - use arrow_schema::UnionMode; + use arrow_schema::{UnionFields, UnionMode}; use std::collections::HashMap; use crate::decode::{DecodedPayload, FlightDataDecoder}; @@ -849,16 +846,23 @@ mod tests { true, )]; - let type_ids = vec![0, 1, 2]; - let union_fields = vec![ - Field::new_list( - "dict_list", - Field::new_dictionary("item", DataType::UInt16, DataType::Utf8, true), - true, + let union_fields = [ + ( + 0, + Arc::new(Field::new_list( + "dict_list", + Field::new_dictionary("item", DataType::UInt16, DataType::Utf8, true), + true, + )), ), - Field::new_struct("struct", struct_fields.clone(), true), - Field::new("string", DataType::Utf8, true), - ]; + ( + 1, + Arc::new(Field::new_struct("struct", struct_fields.clone(), true)), + ), + (2, Arc::new(Field::new("string", DataType::Utf8, true))), + ] + .into_iter() + .collect::(); let struct_fields = vec![Field::new_list( "dict_list", @@ -872,21 +876,15 @@ mod tests { let arr1 = builder.finish(); - let type_id_buffer = Buffer::from_slice_ref([0_i8]); + let type_id_buffer = [0].into_iter().collect::>(); let arr1 = UnionArray::try_new( - &type_ids, + union_fields.clone(), type_id_buffer, None, vec![ - (union_fields[0].clone(), Arc::new(arr1)), - ( - union_fields[1].clone(), - new_null_array(union_fields[1].data_type(), 1), - ), - ( - union_fields[2].clone(), - new_null_array(union_fields[2].data_type(), 1), - ), + Arc::new(arr1) as Arc, + new_null_array(union_fields.iter().nth(1).unwrap().1.data_type(), 1), + new_null_array(union_fields.iter().nth(2).unwrap().1.data_type(), 1), ], ) .unwrap(); @@ -896,47 +894,36 @@ mod tests { let arr2 = Arc::new(builder.finish()); let arr2 = StructArray::new(struct_fields.clone().into(), vec![arr2], None); - let type_id_buffer = Buffer::from_slice_ref([1_i8]); + let type_id_buffer = [1].into_iter().collect::>(); let arr2 = UnionArray::try_new( - &type_ids, + union_fields.clone(), type_id_buffer, None, vec![ - ( - union_fields[0].clone(), - new_null_array(union_fields[0].data_type(), 1), - ), - (union_fields[1].clone(), Arc::new(arr2)), - ( - union_fields[2].clone(), - new_null_array(union_fields[2].data_type(), 1), - ), + new_null_array(union_fields.iter().next().unwrap().1.data_type(), 1), + Arc::new(arr2), + new_null_array(union_fields.iter().nth(2).unwrap().1.data_type(), 1), ], ) .unwrap(); - let type_id_buffer = Buffer::from_slice_ref([2_i8]); + let type_id_buffer = [2].into_iter().collect::>(); let arr3 = UnionArray::try_new( - &type_ids, + union_fields.clone(), type_id_buffer, None, vec![ - ( - union_fields[0].clone(), - new_null_array(union_fields[0].data_type(), 1), - ), - ( - union_fields[1].clone(), - new_null_array(union_fields[1].data_type(), 1), - ), - ( - union_fields[2].clone(), - Arc::new(StringArray::from(vec!["e"])), - ), + new_null_array(union_fields.iter().next().unwrap().1.data_type(), 1), + new_null_array(union_fields.iter().nth(1).unwrap().1.data_type(), 1), + Arc::new(StringArray::from(vec!["e"])), ], ) .unwrap(); + let (type_ids, union_fields): (Vec<_>, Vec<_>) = union_fields + .iter() + .map(|(type_id, field_ref)| (type_id, (*Arc::clone(field_ref)).clone())) + .unzip(); let schema = Arc::new(Schema::new(vec![Field::new_union( "union", type_ids.clone(), diff --git a/arrow-integration-test/src/lib.rs b/arrow-integration-test/src/lib.rs index d6e0dda51a81..30f0ccfbe12d 100644 --- a/arrow-integration-test/src/lib.rs +++ b/arrow-integration-test/src/lib.rs @@ -21,6 +21,7 @@ //! //! This is not a canonical format, but provides a human-readable way of verifying language implementations +use arrow_buffer::ScalarBuffer; use hex::decode; use num::BigInt; use num::Signed; @@ -835,26 +836,18 @@ pub fn array_from_json( )); }; - let offset: Option = json_col.offset.map(|offsets| { - let offsets: Vec = - offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect(); - Buffer::from(&offsets.to_byte_slice()) - }); + let offset: Option> = json_col + .offset + .map(|offsets| offsets.iter().map(|v| v.as_i64().unwrap() as i32).collect()); - let mut children: Vec<(Field, Arc)> = vec![]; + let mut children = Vec::with_capacity(fields.len()); for ((_, field), col) in fields.iter().zip(json_col.children.unwrap()) { let array = array_from_json(field, col, dictionaries)?; - children.push((field.as_ref().clone(), array)); + children.push(array); } - let field_type_ids = fields.iter().map(|(id, _)| id).collect::>(); - let array = UnionArray::try_new( - &field_type_ids, - Buffer::from(&type_ids.to_byte_slice()), - offset, - children, - ) - .unwrap(); + let array = + UnionArray::try_new(fields.clone(), type_ids.into(), offset, children).unwrap(); Ok(Arc::new(array)) } t => Err(ArrowError::JsonError(format!( diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 8eac17e20761..3c203a7f3654 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -31,7 +31,7 @@ use std::io::{BufReader, Read, Seek, SeekFrom}; use std::sync::Arc; use arrow_array::*; -use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer}; +use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer}; use arrow_data::ArrayData; use arrow_schema::*; @@ -214,26 +214,25 @@ fn create_array( reader.next_buffer()?; } - let type_ids: Buffer = reader.next_buffer()?[..len].into(); + let type_ids: ScalarBuffer = reader.next_buffer()?.slice_with_length(0, len).into(); let value_offsets = match mode { UnionMode::Dense => { - let buffer = reader.next_buffer()?; - Some(buffer[..len * 4].into()) + let offsets: ScalarBuffer = + reader.next_buffer()?.slice_with_length(0, len * 4).into(); + Some(offsets) } UnionMode::Sparse => None, }; let mut children = Vec::with_capacity(fields.len()); - let mut ids = Vec::with_capacity(fields.len()); - for (id, field) in fields.iter() { + for (_id, field) in fields.iter() { let child = create_array(reader, field, variadic_counts, require_alignment)?; - children.push((field.as_ref().clone(), child)); - ids.push(id); + children.push(child); } - let array = UnionArray::try_new(&ids, type_ids, value_offsets, children)?; + let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?; Ok(Arc::new(array)) } Null => { diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index 97136bd97c2f..ef08a6130e3a 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -1526,6 +1526,7 @@ mod tests { use arrow_array::builder::UnionBuilder; use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder}; use arrow_array::types::*; + use arrow_buffer::ScalarBuffer; use crate::convert::fb_to_schema; use crate::reader::*; @@ -1800,12 +1801,12 @@ mod tests { // Dict field with id 2 let dctfield = Field::new_dict("dict", array.data_type().clone(), false, 2, false); + let union_fields = [(0, Arc::new(dctfield))].into_iter().collect(); - let types = Buffer::from_slice_ref([0_i8, 0, 0]); - let offsets = Buffer::from_slice_ref([0_i32, 1, 2]); + let types = [0, 0, 0].into_iter().collect::>(); + let offsets = [0, 1, 2].into_iter().collect::>(); - let union = - UnionArray::try_new(&[0], types, Some(offsets), vec![(dctfield, array)]).unwrap(); + let union = UnionArray::try_new(union_fields, types, Some(offsets), vec![array]).unwrap(); let schema = Arc::new(Schema::new(vec![Field::new( "union", diff --git a/arrow-schema/src/fields.rs b/arrow-schema/src/fields.rs index 5a1a6c84c256..63aef18ddf9c 100644 --- a/arrow-schema/src/fields.rs +++ b/arrow-schema/src/fields.rs @@ -420,8 +420,6 @@ impl UnionFields { } /// Returns an iterator over the fields and type ids in this [`UnionFields`] - /// - /// Note: the iteration order is not guaranteed pub fn iter(&self) -> impl Iterator + '_ { self.0.iter().map(|(id, f)| (*id, f)) } diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs index dc9e13040c8e..495295559679 100644 --- a/arrow-select/src/take.rs +++ b/arrow-select/src/take.rs @@ -229,18 +229,15 @@ fn take_impl( } } DataType::Union(fields, UnionMode::Sparse) => { - let mut field_type_ids = Vec::with_capacity(fields.len()); let mut children = Vec::with_capacity(fields.len()); let values = values.as_any().downcast_ref::().unwrap(); - let type_ids = take_native(values.type_ids(), indices).into_inner(); - for (type_id, field) in fields.iter() { + let type_ids = take_native(values.type_ids(), indices); + for (type_id, _field) in fields.iter() { let values = values.child(type_id); let values = take_impl(values, indices)?; - let field = (**field).clone(); - children.push((field, values)); - field_type_ids.push(type_id); + children.push(values); } - let array = UnionArray::try_new(field_type_ids.as_slice(), type_ids, None, children)?; + let array = UnionArray::try_new(fields.clone(), type_ids, None, children)?; Ok(Arc::new(array)) } t => unimplemented!("Take not supported for data type {:?}", t) @@ -2151,19 +2148,22 @@ mod tests { None, ]); let strings = StringArray::from(vec![Some("a"), None, Some("c"), None, Some("d")]); - let type_ids = Buffer::from_slice_ref(vec![1i8; 5]); + let type_ids = [1; 5].into_iter().collect::>(); - let children: Vec<(Field, Arc)> = vec![ + let union_fields = [ ( - Field::new("f1", structs.data_type().clone(), true), - Arc::new(structs), + 0, + Arc::new(Field::new("f1", structs.data_type().clone(), true)), ), ( - Field::new("f2", strings.data_type().clone(), true), - Arc::new(strings), + 1, + Arc::new(Field::new("f2", strings.data_type().clone(), true)), ), - ]; - let array = UnionArray::try_new(&[0, 1], type_ids, None, children).unwrap(); + ] + .into_iter() + .collect(); + let children = vec![Arc::new(structs) as Arc, Arc::new(strings)]; + let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap(); let indices = vec![0, 3, 1, 0, 2, 4]; let index = UInt32Array::from(indices.clone()); diff --git a/arrow/tests/array_transform.rs b/arrow/tests/array_transform.rs index 83d3003a0586..42e4da7c4b4e 100644 --- a/arrow/tests/array_transform.rs +++ b/arrow/tests/array_transform.rs @@ -23,10 +23,10 @@ use arrow::array::{ }; use arrow::datatypes::Int16Type; use arrow_array::StringViewArray; -use arrow_buffer::Buffer; +use arrow_buffer::{Buffer, ScalarBuffer}; use arrow_data::transform::MutableArrayData; use arrow_data::ArrayData; -use arrow_schema::{DataType, Field, Fields}; +use arrow_schema::{DataType, Field, Fields, UnionFields}; use std::sync::Arc; #[allow(unused)] @@ -482,17 +482,25 @@ fn test_union_dense() { Some(4), Some(5), ])); - let offsets = Buffer::from_slice_ref([0, 0, 1, 1, 2, 2, 3, 4i32]); - let type_ids = Buffer::from_slice_ref([42, 84, 42, 84, 84, 42, 84, 84i8]); + let offsets = [0, 0, 1, 1, 2, 2, 3, 4] + .into_iter() + .collect::>(); + let type_ids = [42, 84, 42, 84, 84, 42, 84, 84] + .into_iter() + .collect::>(); + + let union_fields = [ + (84, Arc::new(Field::new("int", DataType::Int32, false))), + (42, Arc::new(Field::new("string", DataType::Utf8, false))), + ] + .into_iter() + .collect::(); let array = UnionArray::try_new( - &[84, 42], + union_fields.clone(), type_ids, Some(offsets), - vec![ - (Field::new("int", DataType::Int32, false), ints), - (Field::new("string", DataType::Utf8, false), strings), - ], + vec![ints, strings], ) .unwrap() .into_data(); @@ -507,19 +515,11 @@ fn test_union_dense() { // Expected data let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("doe")])); let ints: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4)])); - let offsets = Buffer::from_slice_ref([0, 0, 1i32]); - let type_ids = Buffer::from_slice_ref([84, 42, 84i8]); + let offsets = [0, 0, 1].into_iter().collect::>(); + let type_ids = [84, 42, 84].into_iter().collect::>(); - let expected = UnionArray::try_new( - &[84, 42], - type_ids, - Some(offsets), - vec![ - (Field::new("int", DataType::Int32, false), ints), - (Field::new("string", DataType::Utf8, false), strings), - ], - ) - .unwrap(); + let expected = + UnionArray::try_new(union_fields, type_ids, Some(offsets), vec![ints, strings]).unwrap(); assert_eq!(array.to_data(), expected.to_data()); } From 5b7708563729db07bb1446b6ed0c3840a79a8774 Mon Sep 17 00:00:00 2001 From: Matthijs Brobbel Date: Tue, 16 Apr 2024 23:25:54 +0200 Subject: [PATCH 2/4] Fix a comment --- arrow-array/src/array/union_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow-array/src/array/union_array.rs b/arrow-array/src/array/union_array.rs index 7cf557622f81..aaf5ce05034e 100644 --- a/arrow-array/src/array/union_array.rs +++ b/arrow-array/src/array/union_array.rs @@ -136,7 +136,7 @@ impl UnionArray { /// # Safety /// /// The `type_ids` values should be positive and must match one of the type ids of the fields provided in `fields`. - /// These values are used to index into the `child_arrays`. + /// These values are used to index into the `children` arrays. /// /// The `offsets` is provided in the case of a dense union, sparse unions should use `None`. /// If provided the `offsets` values should be positive and must be less than the length of the From a69cbac4466eb2967ff20d6b117d30e891596f6f Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 8 May 2024 08:14:34 +0100 Subject: [PATCH 3/4] Clippy and avoid using hashmaps --- arrow-array/src/array/union_array.rs | 140 +++++++++++++++++---------- 1 file changed, 89 insertions(+), 51 deletions(-) diff --git a/arrow-array/src/array/union_array.rs b/arrow-array/src/array/union_array.rs index aaf5ce05034e..31fe85850f17 100644 --- a/arrow-array/src/array/union_array.rs +++ b/arrow-array/src/array/union_array.rs @@ -23,7 +23,6 @@ use arrow_schema::{ArrowError, DataType, UnionFields, UnionMode}; /// Contains the `UnionArray` type. /// use std::any::Any; -use std::collections::{HashMap, HashSet}; use std::sync::Arc; /// An array of [values of varying types](https://arrow.apache.org/docs/format/Columnar.html#union-layout) @@ -185,63 +184,43 @@ impl UnionArray { )); } - // Specified field type ids must be positive. - let field_type_ids = fields - .iter() - .map(|(type_id, _)| { - if type_id >= 0 { - Ok(type_id) - } else { - Err(ArrowError::InvalidArgumentError( - "Type Ids must be positive".to_owned(), - )) - } - }) - .collect::, _>>()?; - // There must be an offset value for every type id value. - if offsets - .as_ref() - .is_some_and(|offsets| offsets.len() != type_ids.len()) - { - return Err(ArrowError::InvalidArgumentError( - "Type Ids and Offsets lengths must match".to_string(), - )); + if let Some(offsets) = &offsets { + if offsets.len() != type_ids.len() { + return Err(ArrowError::InvalidArgumentError( + "Type Ids and Offsets lengths must match".to_string(), + )); + } } - // Type id values must match one of the fields. - if type_ids - .iter() - .any(|type_id| !field_type_ids.contains(type_id)) - { - return Err(ArrowError::InvalidArgumentError( - "Type Ids values must match one of the field type ids".to_owned(), - )); + // Create mapping from type id to array lengths. + let max_id = fields.iter().map(|(i, _)| i).max().unwrap_or_default() as usize; + let mut array_lens = vec![i32::MIN; max_id + 1]; + for (cd, (field_id, _)) in children.iter().zip(fields.iter()) { + array_lens[field_id as usize] = cd.len() as i32; } - // Create mapping from type id to array lengths. - let array_lens = fields - .iter() - .map(|(type_id, _)| type_id) - .zip( - children - .iter() - .map(Array::len) - .map(i32::try_from) - .map(Result::unwrap), - ) - .collect::>(); + // Type id values must match one of the fields. + for id in &type_ids { + match array_lens.get(*id as usize) { + Some(x) if *x != i32::MIN => {} + _ => { + return Err(ArrowError::InvalidArgumentError( + "Type Ids values must match one of the field type ids".to_owned(), + )) + } + } + } // Check the value offsets are in bounds. - if offsets.as_ref().is_some_and(|offsets| { - type_ids - .iter() - .zip(offsets.iter()) - .any(|(type_id, &offset)| offset < 0 || offset > array_lens[type_id]) - }) { - return Err(ArrowError::InvalidArgumentError( - "Offsets must be positive and within the length of the Array".to_owned(), - )); + if let Some(offsets) = &offsets { + let mut iter = type_ids.iter().zip(offsets.iter()); + if iter.any(|(type_id, &offset)| offset < 0 || offset >= array_lens[*type_id as usize]) + { + return Err(ArrowError::InvalidArgumentError( + "Offsets must be positive and within the length of the Array".to_owned(), + )); + } } // Safety: @@ -586,6 +565,7 @@ impl std::fmt::Debug for UnionArray { #[cfg(test)] mod tests { use super::*; + use std::collections::HashSet; use crate::array::Int8Type; use crate::builder::UnionBuilder; @@ -1367,4 +1347,62 @@ mod tests { let array = result.unwrap(); assert_eq!(array.len(), 7); } + + #[test] + fn test_invalid() { + let fields = UnionFields::new( + [3, 2], + [ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + ], + ); + let children = vec![ + Arc::new(StringArray::from_iter_values(["a", "b"])) as _, + Arc::new(StringArray::from_iter_values(["c", "d"])) as _, + ]; + + let type_ids = vec![3, 3, 2].into(); + UnionArray::try_new(fields.clone(), type_ids, None, children.clone()).unwrap(); + + let type_ids = vec![1, 2].into(); + let err = UnionArray::try_new(fields.clone(), type_ids, None, children).unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid argument error: Type Ids values must match one of the field type ids" + ); + + let children = vec![ + Arc::new(StringArray::from_iter_values(["a", "b"])) as _, + Arc::new(StringArray::from_iter_values(["c"])) as _, + ]; + let type_ids = ScalarBuffer::from(vec![3_i8, 3, 2]); + let offsets = Some(vec![0, 1, 0].into()); + UnionArray::try_new(fields.clone(), type_ids.clone(), offsets, children.clone()).unwrap(); + + let offsets = Some(vec![0, 1, 1].into()); + let err = UnionArray::try_new(fields.clone(), type_ids.clone(), offsets, children.clone()) + .unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Offsets must be positive and within the length of the Array" + ); + + let offsets = Some(vec![0, 1].into()); + let err = + UnionArray::try_new(fields.clone(), type_ids.clone(), offsets, children).unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Type Ids and Offsets lengths must match" + ); + + let err = UnionArray::try_new(fields.clone(), type_ids, None, vec![]).unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Union fields length must match child arrays length" + ); + } } From 2064a3884cab7756d96f7adb5cb3521cef18f259 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 8 May 2024 08:15:58 +0100 Subject: [PATCH 4/4] Additional test --- arrow-array/src/array/union_array.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/arrow-array/src/array/union_array.rs b/arrow-array/src/array/union_array.rs index 31fe85850f17..ea4853cd1528 100644 --- a/arrow-array/src/array/union_array.rs +++ b/arrow-array/src/array/union_array.rs @@ -1366,6 +1366,14 @@ mod tests { UnionArray::try_new(fields.clone(), type_ids, None, children.clone()).unwrap(); let type_ids = vec![1, 2].into(); + let err = + UnionArray::try_new(fields.clone(), type_ids, None, children.clone()).unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid argument error: Type Ids values must match one of the field type ids" + ); + + let type_ids = vec![7, 2].into(); let err = UnionArray::try_new(fields.clone(), type_ids, None, children).unwrap_err(); assert_eq!( err.to_string(),