make sure that only concat preallocates buffers (#382)
* MutableArrayData::with_capacities

* better pattern matching

* add binary capacities

* add list child data

* add struct capacities

* add panic for dictionary type

* change dictionary capacity enum variant
ritchie46 authored and alamb committed Jun 8, 2021
1 parent 36511f7 commit d5668ff
Showing 3 changed files with 158 additions and 35 deletions.
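
At the core of the change is a new `Capacities` enum, re-exported from `arrow::array` in the first file below, which lets a caller describe up front how much space to reserve for offsets, values and nested child data. A rough sketch of how the variants nest, with row and byte counts made up purely for illustration:

```rust
use arrow::array::Capacities;

fn main() {
    // 100 Utf8 rows whose concatenated values are expected to occupy about 1 KiB.
    let utf8 = Capacities::Binary(100, Some(1024));
    // A list array with 10 rows whose single child array should hold 100 slots.
    let list = Capacities::List(10, Some(Box::new(Capacities::Array(100))));
    // A two-field struct with per-field capacities; the second field reuses `utf8`.
    let strukt = Capacities::Struct(10, Some(vec![Capacities::Array(10), utf8.clone()]));
    // `Capacities::Array` keeps the previous behaviour: inner buffers grow on demand.
    let plain = Capacities::Array(100);
    let _ = (utf8, list, strukt, plain);
}
```

`MutableArrayData::new` keeps its old signature and simply delegates to `with_capacities(..., Capacities::Array(capacity))`, so only callers that opt in, such as `concat`, do the extra bookkeeping.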
2 changes: 1 addition & 1 deletion arrow/src/array/mod.rs
@@ -256,7 +256,7 @@ pub type DurationMillisecondBuilder = PrimitiveBuilder<DurationMillisecondType>;
pub type DurationMicrosecondBuilder = PrimitiveBuilder<DurationMicrosecondType>;
pub type DurationNanosecondBuilder = PrimitiveBuilder<DurationNanosecondType>;

pub use self::transform::MutableArrayData;
pub use self::transform::{Capacities, MutableArrayData};

// --------------------- Array Iterator ---------------------

153 changes: 120 additions & 33 deletions arrow/src/array/transform/mod.rs
@@ -326,9 +326,9 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls {
})
}

fn preallocate_str_buffer<Offset: StringOffsetSizeTrait>(
fn preallocate_offset_and_binary_buffer<Offset: StringOffsetSizeTrait>(
capacity: usize,
arrays: &[&ArrayData],
binary_size: usize,
) -> [MutableBuffer; 2] {
// offsets
let mut buffer = MutableBuffer::new((1 + capacity) * mem::size_of::<Offset>());
@@ -338,33 +338,59 @@ fn preallocate_str_buffer<Offset: StringOffsetSizeTrait>(
} else {
buffer.push(0i32)
}
let str_values_size = arrays
.iter()
.map(|data| {
// get the length of the value buffer
let buf_len = data.buffers()[1].len();
// find the offset of the buffer
// this returns a slice of offsets, starting from the offset of the array
// so we can take the first value
let offset = data.buffer::<Offset>(0)[0];
buf_len - offset.to_usize().unwrap()
})
.sum::<usize>();

[
buffer,
MutableBuffer::new(str_values_size * mem::size_of::<u8>()),
MutableBuffer::new(binary_size * mem::size_of::<u8>()),
]
}

/// Define capacities of child data or data buffers.
#[derive(Debug, Clone)]
pub enum Capacities {
/// Binary, Utf8 and LargeUtf8 data types
/// Define
/// * the capacity of the array offsets
/// * the capacity of the binary/str buffer
Binary(usize, Option<usize>),
/// List and LargeList data types
/// Define
/// * the capacity of the array offsets
/// * the capacity of the child data
List(usize, Option<Box<Capacities>>),
/// Struct type
/// * the capacity of the array
/// * the capacities of the fields
Struct(usize, Option<Vec<Capacities>>),
/// Dictionary type
/// * the capacity of the array/keys
/// * the capacity of the values
Dictionary(usize, Option<Box<Capacities>>),
/// Don't preallocate inner buffers and rely on array growth strategy
Array(usize),
}
impl<'a> MutableArrayData<'a> {
/// returns a new [MutableArrayData] with capacity to `capacity` slots and specialized to create an
/// [ArrayData] from multiple `arrays`.
///
/// `use_nulls` is a flag used to optimize insertions. It should be `false` if the only source of nulls
/// are the arrays themselves and `true` if the user plans to call [MutableArrayData::extend_nulls].
/// In other words, if `use_nulls` is `false`, calling [MutableArrayData::extend_nulls] should not be used.
pub fn new(arrays: Vec<&'a ArrayData>, mut use_nulls: bool, capacity: usize) -> Self {
pub fn new(arrays: Vec<&'a ArrayData>, use_nulls: bool, capacity: usize) -> Self {
Self::with_capacities(arrays, use_nulls, Capacities::Array(capacity))
}

/// Similar to [MutableArrayData::new], but lets users define the preallocated capacities of the array.
/// See also [MutableArrayData::new] for more information on the arguments.
///
/// # Panics
/// This function panics if the given `capacities` don't match the data type of `arrays`, or when
/// a [Capacities] variant is not yet supported.
pub fn with_capacities(
arrays: Vec<&'a ArrayData>,
mut use_nulls: bool,
capacities: Capacities,
) -> Self {
let data_type = arrays[0].data_type();
use crate::datatypes::*;

@@ -374,12 +400,24 @@ impl<'a> MutableArrayData<'a> {
use_nulls = true;
};

// We can prevent reallocation by precomputing the needed size.
// This is faster and more memory efficient.
let [buffer1, buffer2] = match data_type {
DataType::LargeUtf8 => preallocate_str_buffer::<i64>(capacity, &arrays),
DataType::Utf8 => preallocate_str_buffer::<i32>(capacity, &arrays),
_ => new_buffers(data_type, capacity),
let mut array_capacity;

let [buffer1, buffer2] = match (data_type, &capacities) {
(DataType::LargeUtf8, Capacities::Binary(capacity, Some(value_cap)))
| (DataType::LargeBinary, Capacities::Binary(capacity, Some(value_cap))) => {
array_capacity = *capacity;
preallocate_offset_and_binary_buffer::<i64>(*capacity, *value_cap)
}
(DataType::Utf8, Capacities::Binary(capacity, Some(value_cap)))
| (DataType::Binary, Capacities::Binary(capacity, Some(value_cap))) => {
array_capacity = *capacity;
preallocate_offset_and_binary_buffer::<i32>(*capacity, *value_cap)
}
(_, Capacities::Array(capacity)) => {
array_capacity = *capacity;
new_buffers(data_type, *capacity)
}
_ => panic!("Capacities: {:?} not yet supported", capacities),
};

let child_data = match &data_type {
@@ -412,20 +450,66 @@ impl<'a> MutableArrayData<'a> {
.iter()
.map(|array| &array.child_data()[0])
.collect::<Vec<_>>();
vec![MutableArrayData::new(childs, use_nulls, capacity)]

let capacities = if let Capacities::List(capacity, ref child_capacities) =
capacities
{
array_capacity = capacity;
child_capacities
.clone()
.map(|c| *c)
.unwrap_or(Capacities::Array(array_capacity))
} else {
Capacities::Array(array_capacity)
};

vec![MutableArrayData::with_capacities(
childs, use_nulls, capacities,
)]
}
// the dictionary type just appends keys and clones the values.
DataType::Dictionary(_, _) => vec![],
DataType::Float16 => unreachable!(),
DataType::Struct(fields) => (0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, capacity)
})
.collect::<Vec<_>>(),
DataType::Struct(fields) => match capacities {
Capacities::Struct(capacity, Some(ref child_capacities)) => {
array_capacity = capacity;
(0..fields.len())
.zip(child_capacities)
.map(|(i, child_cap)| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::with_capacities(
child_arrays,
use_nulls,
child_cap.clone(),
)
})
.collect::<Vec<_>>()
}
Capacities::Struct(capacity, None) => {
array_capacity = capacity;
(0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, capacity)
})
.collect::<Vec<_>>()
}
_ => (0..fields.len())
.map(|i| {
let child_arrays = arrays
.iter()
.map(|array| &array.child_data()[i])
.collect::<Vec<_>>();
MutableArrayData::new(child_arrays, use_nulls, array_capacity)
})
.collect::<Vec<_>>(),
},
_ => {
todo!("Take and filter operations still not supported for this datatype")
}
@@ -436,6 +520,9 @@ impl<'a> MutableArrayData<'a> {
0 => unreachable!(),
1 => Some(arrays[0].child_data()[0].clone()),
_ => {
if let Capacities::Dictionary(_, _) = capacities {
panic!("dictionary capacity not yet supported")
}
// Concat dictionaries together
let dictionaries: Vec<_> =
arrays.iter().map(|array| &array.child_data()[0]).collect();
@@ -465,7 +552,7 @@ impl<'a> MutableArrayData<'a> {
.map(|array| build_extend_null_bits(array, use_nulls))
.collect();

let null_bytes = bit_util::ceil(capacity, 8);
let null_bytes = bit_util::ceil(array_capacity, 8);
let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);

let extend_values = match &data_type {
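
For concreteness, here is a minimal sketch of driving the new constructor directly on string arrays, assuming the caller already knows the total value-buffer length; the inputs and the byte count are made up for illustration:

```rust
use arrow::array::{Array, Capacities, MutableArrayData, StringArray};

fn main() {
    let a = StringArray::from(vec!["hello", "arrow"]);
    let b = StringArray::from(vec!["rust"]);

    // Both the number of rows and the total size of the value data are known here,
    // so the offset buffer and the value buffer can each be sized once up front.
    let rows = a.len() + b.len();
    let value_bytes = "helloarrowrust".len();

    let mut mutable = MutableArrayData::with_capacities(
        vec![a.data(), b.data()],
        false,
        Capacities::Binary(rows, Some(value_bytes)),
    );
    mutable.extend(0, 0, a.len()); // copy every row of `a`
    mutable.extend(1, 0, b.len()); // copy every row of `b`

    let data = mutable.freeze();
    assert_eq!(data.len(), 3);
}
```

This is essentially what the `concat` kernel in the next file does for `Utf8` and `LargeUtf8` inputs.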
38 changes: 37 additions & 1 deletion arrow/src/compute/kernels/concat.rs
@@ -31,8 +31,26 @@
//! ```

use crate::array::*;
use crate::datatypes::DataType;
use crate::error::{ArrowError, Result};

fn compute_str_values_length<Offset: StringOffsetSizeTrait>(
arrays: &[&ArrayData],
) -> usize {
arrays
.iter()
.map(|&data| {
// get the length of the value buffer
let buf_len = data.buffers()[1].len();
// find the offset of the buffer
// this returns a slice of offsets, starting from the offset of the array
// so we can take the first value
let offset = data.buffer::<Offset>(0)[0];
buf_len - offset.to_usize().unwrap()
})
.sum()
}

/// Concatenate multiple [Array] of the same type into a single [ArrayRef].
pub fn concat(arrays: &[&Array]) -> Result<ArrayRef> {
if arrays.is_empty() {
@@ -56,7 +74,25 @@ pub fn concat(arrays: &[&Array]) -> Result<ArrayRef> {

let arrays = arrays.iter().map(|a| a.data()).collect::<Vec<_>>();

let mut mutable = MutableArrayData::new(arrays, false, capacity);
let mut mutable = match arrays[0].data_type() {
DataType::Utf8 => {
let str_values_size = compute_str_values_length::<i32>(&arrays);
MutableArrayData::with_capacities(
arrays,
false,
Capacities::Binary(capacity, Some(str_values_size)),
)
}
DataType::LargeUtf8 => {
let str_values_size = compute_str_values_length::<i64>(&arrays);
MutableArrayData::with_capacities(
arrays,
false,
Capacities::Binary(capacity, Some(str_values_size)),
)
}
_ => MutableArrayData::new(arrays, false, capacity),
};

for (i, len) in lengths.iter().enumerate() {
mutable.extend(i, 0, *len)
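
From a caller's point of view nothing changes: the preallocation happens inside the kernel, and non-string types still fall through to `MutableArrayData::new` with the old growth behaviour. A small usage sketch with illustrative inputs:

```rust
use arrow::array::{Array, StringArray};
use arrow::compute::concat;

fn main() {
    let a = StringArray::from(vec!["hello", "arrow"]);
    let b = StringArray::from(vec!["rust"]);

    // For Utf8/LargeUtf8 inputs, concat now sums the input value-buffer lengths and
    // passes Capacities::Binary, so the output buffers are allocated exactly once.
    let combined = concat(&[&a, &b]).unwrap();
    assert_eq!(combined.len(), 3);
}
```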
