From a303fefe134213cb1afefcc8f62e4ff9e2e78ea8 Mon Sep 17 00:00:00 2001 From: gstvg <28798827+gstvg@users.noreply.github.com> Date: Wed, 2 Oct 2024 17:15:16 -0300 Subject: [PATCH] Implement UnionArray logical_nulls (#6303) * fix: implement UnionArray logical_nulls * fix: allow false positive clippy warning * fix: replace unstable functions * improve docs, perf, validate sparse, revert msrv * simplify benches * fine tune fast path threshold * update docs * fix fast path check, improve perf and docs * fix: remove rust 1.65 code * simplify mask_sparse_ * apply suggestions from review * Update arrow-array/src/array/union_array.rs Co-authored-by: wiedld --------- Co-authored-by: wiedld --- arrow-arith/src/boolean.rs | 60 +++ arrow-array/Cargo.toml | 4 + arrow-array/benches/union_array.rs | 84 +++ arrow-array/src/array/mod.rs | 5 +- arrow-array/src/array/union_array.rs | 759 ++++++++++++++++++++++++++- 5 files changed, 893 insertions(+), 19 deletions(-) create mode 100644 arrow-array/benches/union_array.rs diff --git a/arrow-arith/src/boolean.rs b/arrow-arith/src/boolean.rs index ea8e12abbe2c..d8c7cc19323e 100644 --- a/arrow-arith/src/boolean.rs +++ b/arrow-arith/src/boolean.rs @@ -352,6 +352,9 @@ pub fn is_not_null(input: &dyn Array) -> Result { #[cfg(test)] mod tests { + use arrow_buffer::ScalarBuffer; + use arrow_schema::{DataType, Field, UnionFields}; + use super::*; use std::sync::Arc; @@ -911,4 +914,61 @@ mod tests { assert_eq!(expected, res); assert!(res.nulls().is_none()); } + + #[test] + fn test_dense_union_is_null() { + // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] + let int_array = Int32Array::from(vec![Some(1), None]); + let float_array = Float64Array::from(vec![Some(3.2), None]); + let str_array = StringArray::from(vec![Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + let offsets = [0, 1, 0, 1, 0, 1] + .into_iter() + .collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap(); + + let result = is_null(&array).unwrap(); + + let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); + assert_eq!(expected, &result); + } + + #[test] + fn test_sparse_union_is_null() { + // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] + let int_array = Int32Array::from(vec![Some(1), None, None, None, None, None]); + let float_array = Float64Array::from(vec![None, None, Some(3.2), None, None, None]); + let str_array = StringArray::from(vec![None, None, None, None, Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let result = is_null(&array).unwrap(); + + let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); + assert_eq!(expected, &result); + } + + fn union_fields() -> UnionFields { + [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + (2, Arc::new(Field::new("C", DataType::Utf8, true))), + ] + .into_iter() + .collect() + } } diff --git a/arrow-array/Cargo.toml b/arrow-array/Cargo.toml index d993d36b8d74..4c967df03afa 100644 --- a/arrow-array/Cargo.toml +++ b/arrow-array/Cargo.toml @@ -75,3 +75,7 @@ harness = false [[bench]] name = "decimal_overflow" harness = false + +[[bench]] +name = "union_array" +harness = false diff --git a/arrow-array/benches/union_array.rs b/arrow-array/benches/union_array.rs new file mode 100644 index 000000000000..c5b2ec0f7752 --- /dev/null +++ b/arrow-array/benches/union_array.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + iter::{repeat, repeat_with}, + sync::Arc, +}; + +use arrow_array::{Array, ArrayRef, Int32Array, UnionArray}; +use arrow_buffer::{NullBuffer, ScalarBuffer}; +use arrow_schema::{DataType, Field, UnionFields}; +use criterion::*; +use rand::{thread_rng, Rng}; + +fn array_with_nulls() -> ArrayRef { + let mut rng = thread_rng(); + + let values = ScalarBuffer::from_iter(repeat_with(|| rng.gen()).take(4096)); + + // nulls with at least one null and one valid + let nulls: NullBuffer = [true, false] + .into_iter() + .chain(repeat_with(|| rng.gen())) + .take(4096) + .collect(); + + Arc::new(Int32Array::new(values.clone(), Some(nulls))) +} + +fn array_without_nulls() -> ArrayRef { + let mut rng = thread_rng(); + + let values = ScalarBuffer::from_iter(repeat_with(|| rng.gen()).take(4096)); + + Arc::new(Int32Array::new(values.clone(), None)) +} + +fn criterion_benchmark(c: &mut Criterion) { + for with_nulls in 1..12 { + for without_nulls in [0, 1, 10] { + c.bench_function( + &format!("union logical nulls 4096 {with_nulls} children with nulls, {without_nulls} without nulls"), + |b| { + let type_ids = 0..with_nulls+without_nulls; + + let fields = UnionFields::new( + type_ids.clone(), + type_ids.clone().map(|i| Field::new(format!("f{i}"), DataType::Int32, true)), + ); + + let array = UnionArray::try_new( + fields, + type_ids.cycle().take(4096).collect(), + None, + repeat(array_with_nulls()) + .take(with_nulls as usize) + .chain(repeat(array_without_nulls()).take(without_nulls as usize)) + .collect(), + ) + .unwrap(); + + b.iter(|| black_box(array.logical_nulls())) + }, + ); + } + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 4b426e7eec80..296f5ae721b3 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -189,7 +189,7 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// /// The physical representation is efficient, but is sometimes non intuitive /// for certain array types such as those with nullable child arrays like - /// [`DictionaryArray::values`] or [`RunArray::values`], or without a + /// [`DictionaryArray::values`], [`RunArray::values`] or [`UnionArray`], or without a /// null buffer, such as [`NullArray`]. /// /// To determine if each element of such an array is "logically" null, @@ -209,6 +209,7 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// * [`DictionaryArray`] where [`DictionaryArray::values`] contains nulls /// * [`RunArray`] where [`RunArray::values`] contains nulls /// * [`NullArray`] where all indices are nulls + /// * [`UnionArray`] where the selected values contains nulls /// /// In these cases a logical [`NullBuffer`] will be computed, encoding the /// logical nullability of these arrays, beyond what is encoded in @@ -221,7 +222,7 @@ pub trait Array: std::fmt::Debug + Send + Sync { /// /// Note: For performance reasons, this method returns nullability solely as determined by the /// null buffer. This difference can lead to surprising results, for example, [`NullArray::is_null`] always - /// returns `false` as the array lacks a null buffer. Similarly [`DictionaryArray`] and [`RunArray`] may + /// returns `false` as the array lacks a null buffer. Similarly [`DictionaryArray`], [`RunArray`] and [`UnionArray`] may /// encode nullability in their children. See [`Self::logical_nulls`] for more information. /// /// # Example: diff --git a/arrow-array/src/array/union_array.rs b/arrow-array/src/array/union_array.rs index ea4853cd1528..1feef8c56755 100644 --- a/arrow-array/src/array/union_array.rs +++ b/arrow-array/src/array/union_array.rs @@ -14,15 +14,18 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +#![allow(clippy::enum_clike_unportable_variant)] use crate::{make_array, Array, ArrayRef}; +use arrow_buffer::bit_chunk_iterator::{BitChunkIterator, BitChunks}; use arrow_buffer::buffer::NullBuffer; -use arrow_buffer::ScalarBuffer; +use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer}; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::{ArrowError, DataType, UnionFields, UnionMode}; /// Contains the `UnionArray` type. /// use std::any::Any; +use std::collections::HashSet; use std::sync::Arc; /// An array of [values of varying types](https://arrow.apache.org/docs/format/Columnar.html#union-layout) @@ -184,13 +187,22 @@ impl UnionArray { )); } - // There must be an offset value for every type id value. if let Some(offsets) = &offsets { + // There must be an offset value for every type id value. if offsets.len() != type_ids.len() { return Err(ArrowError::InvalidArgumentError( "Type Ids and Offsets lengths must match".to_string(), )); } + } else { + // Sparse union child arrays must be equal in length to the length of the union + for child in &children { + if child.len() != type_ids.len() { + return Err(ArrowError::InvalidArgumentError( + "Sparse union child arrays must be equal in length to the length of the union".to_string(), + )); + } + } } // Create mapping from type id to array lengths. @@ -380,6 +392,267 @@ impl UnionArray { _ => unreachable!(), } } + + /// Computes the logical nulls for a sparse union, optimized for when there's a lot of fields without nulls + fn mask_sparse_skip_without_nulls(&self, nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer { + // Example logic for a union with 5 fields, a, b & c with nulls, d & e without nulls: + // let [a_nulls, b_nulls, c_nulls] = nulls; + // let [is_a, is_b, is_c] = masks; + // let is_d_or_e = !(is_a | is_b | is_c) + // let union_chunk_nulls = is_d_or_e | (is_a & a_nulls) | (is_b & b_nulls) | (is_c & c_nulls) + let fold = |(with_nulls_selected, union_nulls), (is_field, field_nulls)| { + ( + with_nulls_selected | is_field, + union_nulls | (is_field & field_nulls), + ) + }; + + self.mask_sparse_helper( + nulls, + |type_ids_chunk_array, nulls_masks_iters| { + let (with_nulls_selected, union_nulls) = nulls_masks_iters + .iter_mut() + .map(|(field_type_id, field_nulls)| { + let field_nulls = field_nulls.next().unwrap(); + let is_field = selection_mask(type_ids_chunk_array, *field_type_id); + + (is_field, field_nulls) + }) + .fold((0, 0), fold); + + // In the example above, this is the is_d_or_e = !(is_a | is_b) part + let without_nulls_selected = !with_nulls_selected; + + // if a field without nulls is selected, the value is always true(set bit) + // otherwise, the true/set bits have been computed above + without_nulls_selected | union_nulls + }, + |type_ids_remainder, bit_chunks| { + let (with_nulls_selected, union_nulls) = bit_chunks + .iter() + .map(|(field_type_id, field_bit_chunks)| { + let field_nulls = field_bit_chunks.remainder_bits(); + let is_field = selection_mask(type_ids_remainder, *field_type_id); + + (is_field, field_nulls) + }) + .fold((0, 0), fold); + + let without_nulls_selected = !with_nulls_selected; + + without_nulls_selected | union_nulls + }, + ) + } + + /// Computes the logical nulls for a sparse union, optimized for when there's a lot of fields fully null + fn mask_sparse_skip_fully_null(&self, mut nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer { + let fields = match self.data_type() { + DataType::Union(fields, _) => fields, + _ => unreachable!("Union array's data type is not a union!"), + }; + + let type_ids = fields.iter().map(|(id, _)| id).collect::>(); + let with_nulls = nulls.iter().map(|(id, _)| *id).collect::>(); + + let without_nulls_ids = type_ids + .difference(&with_nulls) + .copied() + .collect::>(); + + nulls.retain(|(_, nulls)| nulls.null_count() < nulls.len()); + + // Example logic for a union with 6 fields, a, b & c with nulls, d & e without nulls, and f fully_null: + // let [a_nulls, b_nulls, c_nulls] = nulls; + // let [is_a, is_b, is_c, is_d, is_e] = masks; + // let union_chunk_nulls = is_d | is_e | (is_a & a_nulls) | (is_b & b_nulls) | (is_c & c_nulls) + self.mask_sparse_helper( + nulls, + |type_ids_chunk_array, nulls_masks_iters| { + let union_nulls = nulls_masks_iters.iter_mut().fold( + 0, + |union_nulls, (field_type_id, nulls_iter)| { + let field_nulls = nulls_iter.next().unwrap(); + + if field_nulls == 0 { + union_nulls + } else { + let is_field = selection_mask(type_ids_chunk_array, *field_type_id); + + union_nulls | (is_field & field_nulls) + } + }, + ); + + // Given the example above, this is the is_d_or_e = (is_d | is_e) part + let without_nulls_selected = + without_nulls_selected(type_ids_chunk_array, &without_nulls_ids); + + // if a field without nulls is selected, the value is always true(set bit) + // otherwise, the true/set bits have been computed above + union_nulls | without_nulls_selected + }, + |type_ids_remainder, bit_chunks| { + let union_nulls = + bit_chunks + .iter() + .fold(0, |union_nulls, (field_type_id, field_bit_chunks)| { + let is_field = selection_mask(type_ids_remainder, *field_type_id); + let field_nulls = field_bit_chunks.remainder_bits(); + + union_nulls | is_field & field_nulls + }); + + union_nulls | without_nulls_selected(type_ids_remainder, &without_nulls_ids) + }, + ) + } + + /// Computes the logical nulls for a sparse union, optimized for when all fields contains nulls + fn mask_sparse_all_with_nulls_skip_one(&self, nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer { + // Example logic for a union with 3 fields, a, b & c, all containing nulls: + // let [a_nulls, b_nulls, c_nulls] = nulls; + // We can skip the first field: it's selection mask is the negation of all others selection mask + // let [is_b, is_c] = selection_masks; + // let is_a = !(is_b | is_c) + // let union_chunk_nulls = (is_a & a_nulls) | (is_b & b_nulls) | (is_c & c_nulls) + self.mask_sparse_helper( + nulls, + |type_ids_chunk_array, nulls_masks_iters| { + let (is_not_first, union_nulls) = nulls_masks_iters[1..] // skip first + .iter_mut() + .fold( + (0, 0), + |(is_not_first, union_nulls), (field_type_id, nulls_iter)| { + let field_nulls = nulls_iter.next().unwrap(); + let is_field = selection_mask(type_ids_chunk_array, *field_type_id); + + ( + is_not_first | is_field, + union_nulls | (is_field & field_nulls), + ) + }, + ); + + let is_first = !is_not_first; + let first_nulls = nulls_masks_iters[0].1.next().unwrap(); + + (is_first & first_nulls) | union_nulls + }, + |type_ids_remainder, bit_chunks| { + bit_chunks + .iter() + .fold(0, |union_nulls, (field_type_id, field_bit_chunks)| { + let field_nulls = field_bit_chunks.remainder_bits(); + // The same logic as above, except that since this runs at most once, + // it doesn't make difference to speed-up the first selection mask + let is_field = selection_mask(type_ids_remainder, *field_type_id); + + union_nulls | (is_field & field_nulls) + }) + }, + ) + } + + /// Maps `nulls` to `BitChunk's` and then to `BitChunkIterator's`, then divides `self.type_ids` into exact chunks of 64 values, + /// calling `mask_chunk` for every exact chunk, and `mask_remainder` for the remainder, if any, collecting the result in a `BooleanBuffer` + fn mask_sparse_helper( + &self, + nulls: Vec<(i8, NullBuffer)>, + mut mask_chunk: impl FnMut(&[i8; 64], &mut [(i8, BitChunkIterator)]) -> u64, + mask_remainder: impl FnOnce(&[i8], &[(i8, BitChunks)]) -> u64, + ) -> BooleanBuffer { + let bit_chunks = nulls + .iter() + .map(|(type_id, nulls)| (*type_id, nulls.inner().bit_chunks())) + .collect::>(); + + let mut nulls_masks_iter = bit_chunks + .iter() + .map(|(type_id, bit_chunks)| (*type_id, bit_chunks.iter())) + .collect::>(); + + let chunks_exact = self.type_ids.chunks_exact(64); + let remainder = chunks_exact.remainder(); + + let chunks = chunks_exact.map(|type_ids_chunk| { + let type_ids_chunk_array = <&[i8; 64]>::try_from(type_ids_chunk).unwrap(); + + mask_chunk(type_ids_chunk_array, &mut nulls_masks_iter) + }); + + // SAFETY: + // chunks is a ChunksExact iterator, which implements TrustedLen, and correctly reports its length + let mut buffer = unsafe { MutableBuffer::from_trusted_len_iter(chunks) }; + + if !remainder.is_empty() { + buffer.push(mask_remainder(remainder, &bit_chunks)); + } + + BooleanBuffer::new(buffer.into(), 0, self.type_ids.len()) + } + + /// Computes the logical nulls for a sparse or dense union, by gathering individual bits from the null buffer of the selected field + fn gather_nulls(&self, nulls: Vec<(i8, NullBuffer)>) -> BooleanBuffer { + let one_null = NullBuffer::new_null(1); + let one_valid = NullBuffer::new_valid(1); + + // Unsafe code below depend on it: + // To remove one branch from the loop, if the a type_id is not utilized, or it's logical_nulls is None/all set, + // we use a null buffer of len 1 and a index_mask of 0, or the true null buffer and usize::MAX otherwise. + // We then unconditionally access the null buffer with index & index_mask, + // which always return 0 for the 1-len buffer, or the true index unchanged otherwise + // We also use a 256 array, so llvm knows that `type_id as u8 as usize` is always in bounds + let mut logical_nulls_array = [(&one_valid, Mask::Zero); 256]; + + for (type_id, nulls) in &nulls { + if nulls.null_count() == nulls.len() { + // Similarly, if all values are null, use a 1-null null-buffer to reduce cache pressure a bit + logical_nulls_array[*type_id as u8 as usize] = (&one_null, Mask::Zero); + } else { + logical_nulls_array[*type_id as u8 as usize] = (nulls, Mask::Max); + } + } + + match &self.offsets { + Some(offsets) => { + assert_eq!(self.type_ids.len(), offsets.len()); + + BooleanBuffer::collect_bool(self.type_ids.len(), |i| unsafe { + // SAFETY: BooleanBuffer::collect_bool calls us 0..self.type_ids.len() + let type_id = *self.type_ids.get_unchecked(i); + // SAFETY: We asserted that offsets len and self.type_ids len are equal + let offset = *offsets.get_unchecked(i); + + let (nulls, offset_mask) = &logical_nulls_array[type_id as u8 as usize]; + + // SAFETY: + // If offset_mask is Max + // 1. Offset validity is checked at union creation + // 2. If the null buffer len equals it's array len is checked at array creation + // If offset_mask is Zero, the null buffer len is 1 + nulls + .inner() + .value_unchecked(offset as usize & *offset_mask as usize) + }) + } + None => { + BooleanBuffer::collect_bool(self.type_ids.len(), |index| unsafe { + // SAFETY: BooleanBuffer::collect_bool calls us 0..self.type_ids.len() + let type_id = *self.type_ids.get_unchecked(index); + + let (nulls, index_mask) = &logical_nulls_array[type_id as u8 as usize]; + + // SAFETY: + // If index_mask is Max + // 1. On sparse union, every child len match it's parent, this is checked at union creation + // 2. If the null buffer len equals it's array len is checked at array creation + // If index_mask is Zero, the null buffer len is 1 + nulls.inner().value_unchecked(index & *index_mask as usize) + }) + } + } + } } impl From for UnionArray { @@ -479,22 +752,130 @@ impl Array for UnionArray { None } - /// Union types always return non null as there is no validity buffer. - /// To check validity correctly you must check the underlying vector. - fn is_null(&self, _index: usize) -> bool { - false - } + fn logical_nulls(&self) -> Option { + let fields = match self.data_type() { + DataType::Union(fields, _) => fields, + _ => unreachable!(), + }; - /// Union types always return non null as there is no validity buffer. - /// To check validity correctly you must check the underlying vector. - fn is_valid(&self, _index: usize) -> bool { - true + if fields.len() <= 1 { + return self + .fields + .iter() + .flatten() + .map(Array::logical_nulls) + .next() + .flatten(); + } + + let logical_nulls = fields + .iter() + .filter_map(|(type_id, _)| Some((type_id, self.child(type_id).logical_nulls()?))) + .filter(|(_, nulls)| nulls.null_count() > 0) + .collect::>(); + + if logical_nulls.is_empty() { + return None; + } + + let fully_null_count = logical_nulls + .iter() + .filter(|(_, nulls)| nulls.null_count() == nulls.len()) + .count(); + + if fully_null_count == fields.len() { + if let Some((_, exactly_sized)) = logical_nulls + .iter() + .find(|(_, nulls)| nulls.len() == self.len()) + { + return Some(exactly_sized.clone()); + } + + if let Some((_, bigger)) = logical_nulls + .iter() + .find(|(_, nulls)| nulls.len() > self.len()) + { + return Some(bigger.slice(0, self.len())); + } + + return Some(NullBuffer::new_null(self.len())); + } + + let boolean_buffer = match &self.offsets { + Some(_) => self.gather_nulls(logical_nulls), + None => { + // Choose the fastest way to compute the logical nulls + // Gather computes one null per iteration, while the others work on 64 nulls chunks, + // but must also compute selection masks, which is expensive, + // so it's cost is the number of selection masks computed per chunk + // Since computing the selection mask gets auto-vectorized, it's performance depends on which simd feature is enabled + // For gather, the cost is the threshold where masking becomes slower than gather, which is determined with benchmarks + // TODO: bench on avx512f(feature is still unstable) + let gather_relative_cost = if cfg!(target_feature = "avx2") { + 10 + } else if cfg!(target_feature = "sse4.1") { + 3 + } else if cfg!(target_arch = "x86") || cfg!(target_arch = "x86_64") { + // x86 baseline includes sse2 + 2 + } else { + // TODO: bench on non x86 + // Always use gather on non benchmarked archs because even though it may slower on some cases, + // it's performance depends only on the union length, without being affected by the number of fields + 0 + }; + + let strategies = [ + (SparseStrategy::Gather, gather_relative_cost, true), + ( + SparseStrategy::MaskAllFieldsWithNullsSkipOne, + fields.len() - 1, + fields.len() == logical_nulls.len(), + ), + ( + SparseStrategy::MaskSkipWithoutNulls, + logical_nulls.len(), + true, + ), + ( + SparseStrategy::MaskSkipFullyNull, + fields.len() - fully_null_count, + true, + ), + ]; + + let (strategy, _, _) = strategies + .iter() + .filter(|(_, _, applicable)| *applicable) + .min_by_key(|(_, cost, _)| cost) + .unwrap(); + + match strategy { + SparseStrategy::Gather => self.gather_nulls(logical_nulls), + SparseStrategy::MaskAllFieldsWithNullsSkipOne => { + self.mask_sparse_all_with_nulls_skip_one(logical_nulls) + } + SparseStrategy::MaskSkipWithoutNulls => { + self.mask_sparse_skip_without_nulls(logical_nulls) + } + SparseStrategy::MaskSkipFullyNull => { + self.mask_sparse_skip_fully_null(logical_nulls) + } + } + } + }; + + let null_buffer = NullBuffer::from(boolean_buffer); + + if null_buffer.null_count() > 0 { + Some(null_buffer) + } else { + None + } } - /// Union types always return 0 null count as there is no validity buffer. - /// To get null count correctly you must check the underlying vector. - fn null_count(&self) -> usize { - 0 + fn is_nullable(&self) -> bool { + true } fn get_buffer_memory_size(&self) -> usize { @@ -562,6 +943,49 @@ impl std::fmt::Debug for UnionArray { } } +/// How to compute the logical nulls of a sparse union. All strategies return the same result. +/// Those starting with Mask perform bitwise masking for each chunk of 64 values, including +/// computing expensive selection masks of fields: which fields masks must be computed is the +/// difference between them +enum SparseStrategy { + /// Gather individual bits from the null buffer of the selected field + Gather, + /// All fields contains nulls, so we can skip the selection mask computation of one field by negating the others + MaskAllFieldsWithNullsSkipOne, + /// Skip the selection mask computation of the fields without nulls + MaskSkipWithoutNulls, + /// Skip the selection mask computation of the fully nulls fields + MaskSkipFullyNull, +} + +#[derive(Copy, Clone)] +#[repr(usize)] +enum Mask { + Zero = 0, + // false positive, see https://github.com/rust-lang/rust-clippy/issues/8043 + #[allow(clippy::enum_clike_unportable_variant)] + Max = usize::MAX, +} + +fn selection_mask(type_ids_chunk: &[i8], type_id: i8) -> u64 { + type_ids_chunk + .iter() + .copied() + .enumerate() + .fold(0, |packed, (bit_idx, v)| { + packed | ((v == type_id) as u64) << bit_idx + }) +} + +/// Returns a bitmask where bits indicate if any id from `without_nulls_ids` exist in `type_ids_chunk`. +fn without_nulls_selected(type_ids_chunk: &[i8], without_nulls_ids: &[i8]) -> u64 { + without_nulls_ids + .iter() + .fold(0, |fully_valid_selected, field_type_id| { + fully_valid_selected | selection_mask(type_ids_chunk, *field_type_id) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -571,8 +995,8 @@ mod tests { use crate::builder::UnionBuilder; use crate::cast::AsArray; use crate::types::{Float32Type, Float64Type, Int32Type, Int64Type}; - use crate::RecordBatch; use crate::{Float64Array, Int32Array, Int64Array, StringArray}; + use crate::{Int8Array, RecordBatch}; use arrow_buffer::Buffer; use arrow_schema::{Field, Schema}; @@ -1363,7 +1787,12 @@ mod tests { ]; let type_ids = vec![3, 3, 2].into(); - UnionArray::try_new(fields.clone(), type_ids, None, children.clone()).unwrap(); + let err = + UnionArray::try_new(fields.clone(), type_ids, None, children.clone()).unwrap_err(); + assert_eq!( + err.to_string(), + "Invalid argument error: Sparse union child arrays must be equal in length to the length of the union" + ); let type_ids = vec![1, 2].into(); let err = @@ -1413,4 +1842,300 @@ mod tests { "Invalid argument error: Union fields length must match child arrays length" ); } + + #[test] + fn test_logical_nulls_fast_paths() { + // fields.len() <= 1 + let array = UnionArray::try_new(UnionFields::empty(), vec![].into(), None, vec![]).unwrap(); + + assert_eq!(array.logical_nulls(), None); + + let fields = UnionFields::new( + [1, 3], + [ + Field::new("a", DataType::Int8, false), // non nullable + Field::new("b", DataType::Int8, false), // non nullable + ], + ); + let array = UnionArray::try_new( + fields, + vec![1].into(), + None, + vec![ + Arc::new(Int8Array::from_value(5, 1)), + Arc::new(Int8Array::from_value(5, 1)), + ], + ) + .unwrap(); + + assert_eq!(array.logical_nulls(), None); + + let nullable_fields = UnionFields::new( + [1, 3], + [ + Field::new("a", DataType::Int8, true), // nullable but without nulls + Field::new("b", DataType::Int8, true), // nullable but without nulls + ], + ); + let array = UnionArray::try_new( + nullable_fields.clone(), + vec![1, 1].into(), + None, + vec![ + Arc::new(Int8Array::from_value(-5, 2)), // nullable but without nulls + Arc::new(Int8Array::from_value(-5, 2)), // nullable but without nulls + ], + ) + .unwrap(); + + assert_eq!(array.logical_nulls(), None); + + let array = UnionArray::try_new( + nullable_fields.clone(), + vec![1, 1].into(), + None, + vec![ + // every children is completly null + Arc::new(Int8Array::new_null(2)), // all null, same len as it's parent + Arc::new(Int8Array::new_null(2)), // all null, same len as it's parent + ], + ) + .unwrap(); + + assert_eq!(array.logical_nulls(), Some(NullBuffer::new_null(2))); + + let array = UnionArray::try_new( + nullable_fields.clone(), + vec![1, 1].into(), + Some(vec![0, 1].into()), + vec![ + // every children is completly null + Arc::new(Int8Array::new_null(3)), // bigger that parent + Arc::new(Int8Array::new_null(3)), // bigger that parent + ], + ) + .unwrap(); + + assert_eq!(array.logical_nulls(), Some(NullBuffer::new_null(2))); + } + + #[test] + fn test_dense_union_logical_nulls_gather() { + // union of [{A=1}, {A=2}, {B=3.2}, {B=}, {C=}, {C=}] + let int_array = Int32Array::from(vec![1, 2]); + let float_array = Float64Array::from(vec![Some(3.2), None]); + let str_array = StringArray::new_null(1); + let type_ids = [1, 1, 3, 3, 4, 4].into_iter().collect::>(); + let offsets = [0, 1, 0, 1, 0, 0] + .into_iter() + .collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap(); + + let result = array.logical_nulls(); + + let expected = NullBuffer::from(vec![true, true, true, false, false, false]); + assert_eq!(Some(expected), result); + } + + #[test] + fn test_sparse_union_logical_nulls_mask_all_nulls_skip_one() { + // If we used union_fields() (3 fields with nulls), the choosen strategy would be Gather on x86 without any specified target feature e.g CI runtime + let fields: UnionFields = [ + (1, Arc::new(Field::new("A", DataType::Int32, true))), + (3, Arc::new(Field::new("B", DataType::Float64, true))), + ] + .into_iter() + .collect(); + + // union of [{A=}, {A=}, {B=3.2}, {B=}] + let int_array = Int32Array::new_null(4); + let float_array = Float64Array::from(vec![None, None, Some(3.2), None]); + let type_ids = [1, 1, 3, 3].into_iter().collect::>(); + + let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; + + let array = UnionArray::try_new(fields.clone(), type_ids, None, children).unwrap(); + + let result = array.logical_nulls(); + + let expected = NullBuffer::from(vec![false, false, true, false]); + assert_eq!(Some(expected), result); + + //like above, but repeated to genereate two exact bitmasks and a non empty remainder + let len = 2 * 64 + 32; + + let int_array = Int32Array::new_null(len); + let float_array = Float64Array::from_iter([Some(3.2), None].into_iter().cycle().take(len)); + let type_ids = ScalarBuffer::from_iter([1, 1, 3, 3].into_iter().cycle().take(len)); + + let array = UnionArray::try_new( + fields, + type_ids, + None, + vec![Arc::new(int_array), Arc::new(float_array)], + ) + .unwrap(); + + let result = array.logical_nulls(); + + let expected = + NullBuffer::from_iter([false, false, true, false].into_iter().cycle().take(len)); + assert_eq!(array.len(), len); + assert_eq!(Some(expected), result); + } + + #[test] + fn test_sparse_union_logical_mask_mixed_nulls_skip_fully_valid() { + // union of [{A=2}, {A=2}, {B=3.2}, {B=}, {C=}, {C=}] + let int_array = Int32Array::from_value(2, 6); + let float_array = Float64Array::from_value(4.2, 6); + let str_array = StringArray::new_null(6); + let type_ids = [1, 1, 3, 3, 4, 4].into_iter().collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let result = array.logical_nulls(); + + let expected = NullBuffer::from(vec![true, true, true, true, false, false]); + assert_eq!(Some(expected), result); + + //like above, but repeated to genereate two exact bitmasks and a non empty remainder + let len = 2 * 64 + 32; + + let int_array = Int32Array::from_value(2, len); + let float_array = Float64Array::from_value(4.2, len); + let str_array = StringArray::from_iter([None, Some("a")].into_iter().cycle().take(len)); + let type_ids = ScalarBuffer::from_iter([1, 1, 3, 3, 4, 4].into_iter().cycle().take(len)); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let result = array.logical_nulls(); + + let expected = NullBuffer::from_iter( + [true, true, true, true, false, true] + .into_iter() + .cycle() + .take(len), + ); + assert_eq!(array.len(), len); + assert_eq!(Some(expected), result); + } + + #[test] + fn test_sparse_union_logical_mask_mixed_nulls_skip_fully_null() { + // union of [{A=}, {A=}, {B=4.2}, {B=4.2}, {C=}, {C=}] + let int_array = Int32Array::new_null(6); + let float_array = Float64Array::from_value(4.2, 6); + let str_array = StringArray::new_null(6); + let type_ids = [1, 1, 3, 3, 4, 4].into_iter().collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let result = array.logical_nulls(); + + let expected = NullBuffer::from(vec![false, false, true, true, false, false]); + assert_eq!(Some(expected), result); + + //like above, but repeated to genereate two exact bitmasks and a non empty remainder + let len = 2 * 64 + 32; + + let int_array = Int32Array::new_null(len); + let float_array = Float64Array::from_value(4.2, len); + let str_array = StringArray::new_null(len); + let type_ids = ScalarBuffer::from_iter([1, 1, 3, 3, 4, 4].into_iter().cycle().take(len)); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let result = array.logical_nulls(); + + let expected = NullBuffer::from_iter( + [false, false, true, true, false, false] + .into_iter() + .cycle() + .take(len), + ); + assert_eq!(array.len(), len); + assert_eq!(Some(expected), result); + } + + #[test] + fn test_sparse_union_logical_nulls_gather() { + let n_fields = 50; + + let non_null = Int32Array::from_value(2, 4); + let mixed = Int32Array::from(vec![None, None, Some(1), None]); + let fully_null = Int32Array::new_null(4); + + let array = UnionArray::try_new( + (1..) + .step_by(2) + .map(|i| { + ( + i, + Arc::new(Field::new(format!("f{i}"), DataType::Int32, true)), + ) + }) + .take(n_fields) + .collect(), + vec![1, 3, 3, 5].into(), + None, + [ + Arc::new(non_null) as ArrayRef, + Arc::new(mixed), + Arc::new(fully_null), + ] + .into_iter() + .cycle() + .take(n_fields) + .collect(), + ) + .unwrap(); + + let result = array.logical_nulls(); + + let expected = NullBuffer::from(vec![true, false, true, false]); + + assert_eq!(Some(expected), result); + } + + fn union_fields() -> UnionFields { + [ + (1, Arc::new(Field::new("A", DataType::Int32, true))), + (3, Arc::new(Field::new("B", DataType::Float64, true))), + (4, Arc::new(Field::new("C", DataType::Utf8, true))), + ] + .into_iter() + .collect() + } }