diff --git a/arrow/src/array/mod.rs b/arrow/src/array/mod.rs index 65cf30832e27..6a0b94a55515 100644 --- a/arrow/src/array/mod.rs +++ b/arrow/src/array/mod.rs @@ -256,7 +256,7 @@ pub type DurationMillisecondBuilder = PrimitiveBuilder; pub type DurationMicrosecondBuilder = PrimitiveBuilder; pub type DurationNanosecondBuilder = PrimitiveBuilder; -pub use self::transform::MutableArrayData; +pub use self::transform::{Capacities, MutableArrayData}; // --------------------- Array Iterator --------------------- diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs index 5611671568f5..3fae2205ad15 100644 --- a/arrow/src/array/transform/mod.rs +++ b/arrow/src/array/transform/mod.rs @@ -326,9 +326,9 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls { }) } -fn preallocate_str_buffer( +fn preallocate_offset_and_binary_buffer( capacity: usize, - arrays: &[&ArrayData], + binary_size: usize, ) -> [MutableBuffer; 2] { // offsets let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::()); @@ -338,25 +338,37 @@ fn preallocate_str_buffer( } else { buffer.push(0i32) } - let str_values_size = arrays - .iter() - .map(|data| { - // get the length of the value buffer - let buf_len = data.buffers()[1].len(); - // find the offset of the buffer - // this returns a slice of offsets, starting from the offset of the array - // so we can take the first value - let offset = data.buffer::(0)[0]; - buf_len - offset.to_usize().unwrap() - }) - .sum::(); [ buffer, - MutableBuffer::new(str_values_size * mem::size_of::()), + MutableBuffer::new(binary_size * mem::size_of::()), ] } +/// Define capacities of child data or data buffers. 
+#[derive(Debug, Clone)] +pub enum Capacities { + /// Binary, LargeBinary, Utf8 and LargeUtf8 data types + /// Define + /// * the capacity of the array offsets + /// * the capacity of the binary/str buffer + Binary(usize, Option), + /// List and LargeList data types + /// Define + /// * the capacity of the array offsets + /// * the capacity of the child data + List(usize, Option>), + /// Struct type + /// * the capacity of the array + /// * the capacities of the fields + Struct(usize, Option>), + /// Dictionary type + /// * the capacity of the array/keys + /// * the capacity of the values + Dictionary(usize, Option>), + /// Don't preallocate inner buffers and rely on array growth strategy + Array(usize), +} impl<'a> MutableArrayData<'a> { /// returns a new [MutableArrayData] with capacity to `capacity` slots and specialized to create an /// [ArrayData] from multiple `arrays`. @@ -364,7 +376,21 @@ impl<'a> MutableArrayData<'a> { /// `use_nulls` is a flag used to optimize insertions. It should be `false` if the only source of nulls /// are the arrays themselves and `true` if the user plans to call [MutableArrayData::extend_nulls]. /// In other words, if `use_nulls` is `false`, calling [MutableArrayData::extend_nulls] should not be used. - pub fn new(arrays: Vec<&'a ArrayData>, mut use_nulls: bool, capacity: usize) -> Self { + pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self { + Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity)) + } + + /// Similar to [MutableArrayData::new], but lets users define the preallocated capacities of the array. + /// See also [MutableArrayData::new] for more information on the arguments. + /// + /// # Panics + /// This function panics if the given `capacities` don't match the data type of `arrays`, or when + /// a [Capacities] variant is not yet supported. 
+ pub fn with_capacities( + arrays: Vec<&'a ArrayData>, + mut use_nulls: bool, + capacities: Capacities, + ) -> Self { let data_type = arrays[0].data_type(); use crate::datatypes::*; @@ -374,12 +400,24 @@ impl<'a> MutableArrayData<'a> { use_nulls = true; }; - // We can prevent reallocation by precomputing the needed size. - // This is faster and more memory efficient. - let [buffer1, buffer2] = match data_type { - DataType::LargeUtf8 => preallocate_str_buffer::(capacity, &arrays), - DataType::Utf8 => preallocate_str_buffer::(capacity, &arrays), - _ => new_buffers(data_type, capacity), + let mut array_capacity; + + let [buffer1, buffer2] = match (data_type, &capacities) { + (DataType::LargeUtf8, Capacities::Binary(capacity, Some(value_cap))) + | (DataType::LargeBinary, Capacities::Binary(capacity, Some(value_cap))) => { + array_capacity = *capacity; + preallocate_offset_and_binary_buffer::(*capacity, *value_cap) + } + (DataType::Utf8, Capacities::Binary(capacity, Some(value_cap))) + | (DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => { + array_capacity = *capacity; + preallocate_offset_and_binary_buffer::(*capacity, *value_cap) + } + (_, Capacities::Array(capacity)) => { + array_capacity = *capacity; + new_buffers(data_type, *capacity) + } + _ => panic!("Capacities: {:?} not yet supported", capacities), }; let child_data = match &data_type { @@ -412,20 +450,66 @@ impl<'a> MutableArrayData<'a> { .iter() .map(|array| &array.child_data()[0]) .collect::>(); - vec![MutableArrayData::new(childs, use_nulls, capacity)] + + let capacities = if let Capacities::List(capacity, ref child_capacities) = + capacities + { + array_capacity = capacity; + child_capacities + .clone() + .map(|c| *c) + .unwrap_or(Capacities::Array(array_capacity)) + } else { + Capacities::Array(array_capacity) + }; + + vec![MutableArrayData::with_capacities( + childs, use_nulls, capacities, + )] } // the dictionary type just appends keys and clones the values. 
DataType::Dictionary(_, _) => vec![], DataType::Float16 => unreachable!(), - DataType::Struct(fields) => (0..fields.len()) - .map(|i| { - let child_arrays = arrays - .iter() - .map(|array| &array.child_data()[i]) - .collect::>(); - MutableArrayData::new(child_arrays, use_nulls, capacity) - }) - .collect::>(), + DataType::Struct(fields) => match capacities { + Capacities::Struct(capacity, Some(ref child_capacities)) => { + array_capacity = capacity; + (0..fields.len()) + .zip(child_capacities) + .map(|(i, child_cap)| { + let child_arrays = arrays + .iter() + .map(|array| &array.child_data()[i]) + .collect::>(); + MutableArrayData::with_capacities( + child_arrays, + use_nulls, + child_cap.clone(), + ) + }) + .collect::>() + } + Capacities::Struct(capacity, None) => { + array_capacity = capacity; + (0..fields.len()) + .map(|i| { + let child_arrays = arrays + .iter() + .map(|array| &array.child_data()[i]) + .collect::>(); + MutableArrayData::new(child_arrays, use_nulls, capacity) + }) + .collect::>() + } + _ => (0..fields.len()) + .map(|i| { + let child_arrays = arrays + .iter() + .map(|array| &array.child_data()[i]) + .collect::>(); + MutableArrayData::new(child_arrays, use_nulls, array_capacity) + }) + .collect::>(), + }, _ => { todo!("Take and filter operations still not supported for this datatype") } @@ -436,6 +520,9 @@ impl<'a> MutableArrayData<'a> { 0 => unreachable!(), 1 => Some(arrays[0].child_data()[0].clone()), _ => { + if let Capacities::Dictionary(_, _) = capacities { + panic!("dictionary capacity not yet supported") + } // Concat dictionaries together let dictionaries: Vec<_> = arrays.iter().map(|array| &array.child_data()[0]).collect(); @@ -465,7 +552,7 @@ impl<'a> MutableArrayData<'a> { .map(|array| build_extend_null_bits(array, use_nulls)) .collect(); - let null_bytes = bit_util::ceil(capacity, 8); + let null_bytes = bit_util::ceil(array_capacity, 8); let null_buffer = MutableBuffer::from_len_zeroed(null_bytes); let extend_values = match &data_type { 
diff --git a/arrow/src/compute/kernels/concat.rs b/arrow/src/compute/kernels/concat.rs index 83140c8fd646..cc976a463ff4 100644 --- a/arrow/src/compute/kernels/concat.rs +++ b/arrow/src/compute/kernels/concat.rs @@ -31,8 +31,26 @@ //! ``` use crate::array::*; +use crate::datatypes::DataType; use crate::error::{ArrowError, Result}; +fn compute_str_values_length( + arrays: &[&ArrayData], +) -> usize { + arrays + .iter() + .map(|&data| { + // get the length of the value buffer + let buf_len = data.buffers()[1].len(); + // find the offset of the buffer + // this returns a slice of offsets, starting from the offset of the array + // so we can take the first value + let offset = data.buffer::(0)[0]; + buf_len - offset.to_usize().unwrap() + }) + .sum() +} + /// Concatenate multiple [Array] of the same type into a single [ArrayRef]. pub fn concat(arrays: &[&Array]) -> Result { if arrays.is_empty() { @@ -56,7 +74,25 @@ pub fn concat(arrays: &[&Array]) -> Result { let arrays = arrays.iter().map(|a| a.data()).collect::>(); - let mut mutable = MutableArrayData::new(arrays, false, capacity); + let mut mutable = match arrays[0].data_type() { + DataType::Utf8 => { + let str_values_size = compute_str_values_length::(&arrays); + MutableArrayData::with_capacities( + arrays, + false, + Capacities::Binary(capacity, Some(str_values_size)), + ) + } + DataType::LargeUtf8 => { + let str_values_size = compute_str_values_length::(&arrays); + MutableArrayData::with_capacities( + arrays, + false, + Capacities::Binary(capacity, Some(str_values_size)), + ) + } + _ => MutableArrayData::new(arrays, false, capacity), + }; for (i, len) in lengths.iter().enumerate() { mutable.extend(i, 0, *len)