Skip to content

Commit

Permalink
Implement UnionArray logical_nulls (apache#6303)
Browse files Browse the repository at this point in the history
* fix: implement UnionArray logical_nulls

* fix: allow false positive clippy warning

* fix: replace unstable functions

* improve docs, perf, validate sparse, revert msrv

* simplify benches

* fine tune fast path threshold

* update docs

* fix fast path check, improve perf and docs

* fix: remove rust 1.65 code

* simplify mask_sparse_

* apply suggestions from review

* Update arrow-array/src/array/union_array.rs

Co-authored-by: wiedld <[email protected]>

---------

Co-authored-by: wiedld <[email protected]>
  • Loading branch information
gstvg and wiedld authored Oct 2, 2024
1 parent 2c524f1 commit a303fef
Show file tree
Hide file tree
Showing 5 changed files with 893 additions and 19 deletions.
60 changes: 60 additions & 0 deletions arrow-arith/src/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,9 @@ pub fn is_not_null(input: &dyn Array) -> Result<BooleanArray, ArrowError> {

#[cfg(test)]
mod tests {
use arrow_buffer::ScalarBuffer;
use arrow_schema::{DataType, Field, UnionFields};

use super::*;
use std::sync::Arc;

Expand Down Expand Up @@ -911,4 +914,61 @@ mod tests {
assert_eq!(expected, res);
assert!(res.nulls().is_none());
}

#[test]
fn test_dense_union_is_null() {
// union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]
let int_array = Int32Array::from(vec![Some(1), None]);
let float_array = Float64Array::from(vec![Some(3.2), None]);
let str_array = StringArray::from(vec![Some("a"), None]);
let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::<ScalarBuffer<i8>>();
let offsets = [0, 1, 0, 1, 0, 1]
.into_iter()
.collect::<ScalarBuffer<i32>>();

let children = vec![
Arc::new(int_array) as Arc<dyn Array>,
Arc::new(float_array),
Arc::new(str_array),
];

let array = UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap();

let result = is_null(&array).unwrap();

let expected = &BooleanArray::from(vec![false, true, false, true, false, true]);
assert_eq!(expected, &result);
}

#[test]
fn test_sparse_union_is_null() {
// union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}]
let int_array = Int32Array::from(vec![Some(1), None, None, None, None, None]);
let float_array = Float64Array::from(vec![None, None, Some(3.2), None, None, None]);
let str_array = StringArray::from(vec![None, None, None, None, Some("a"), None]);
let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::<ScalarBuffer<i8>>();

let children = vec![
Arc::new(int_array) as Arc<dyn Array>,
Arc::new(float_array),
Arc::new(str_array),
];

let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap();

let result = is_null(&array).unwrap();

let expected = &BooleanArray::from(vec![false, true, false, true, false, true]);
assert_eq!(expected, &result);
}

fn union_fields() -> UnionFields {
[
(0, Arc::new(Field::new("A", DataType::Int32, true))),
(1, Arc::new(Field::new("B", DataType::Float64, true))),
(2, Arc::new(Field::new("C", DataType::Utf8, true))),
]
.into_iter()
.collect()
}
}
4 changes: 4 additions & 0 deletions arrow-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,7 @@ harness = false
[[bench]]
name = "decimal_overflow"
harness = false

[[bench]]
name = "union_array"
harness = false
84 changes: 84 additions & 0 deletions arrow-array/benches/union_array.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{
iter::{repeat, repeat_with},
sync::Arc,
};

use arrow_array::{Array, ArrayRef, Int32Array, UnionArray};
use arrow_buffer::{NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field, UnionFields};
use criterion::*;
use rand::{thread_rng, Rng};

fn array_with_nulls() -> ArrayRef {
let mut rng = thread_rng();

let values = ScalarBuffer::from_iter(repeat_with(|| rng.gen()).take(4096));

// nulls with at least one null and one valid
let nulls: NullBuffer = [true, false]
.into_iter()
.chain(repeat_with(|| rng.gen()))
.take(4096)
.collect();

Arc::new(Int32Array::new(values.clone(), Some(nulls)))
}

fn array_without_nulls() -> ArrayRef {
let mut rng = thread_rng();

let values = ScalarBuffer::from_iter(repeat_with(|| rng.gen()).take(4096));

Arc::new(Int32Array::new(values.clone(), None))
}

fn criterion_benchmark(c: &mut Criterion) {
for with_nulls in 1..12 {
for without_nulls in [0, 1, 10] {
c.bench_function(
&format!("union logical nulls 4096 {with_nulls} children with nulls, {without_nulls} without nulls"),
|b| {
let type_ids = 0..with_nulls+without_nulls;

let fields = UnionFields::new(
type_ids.clone(),
type_ids.clone().map(|i| Field::new(format!("f{i}"), DataType::Int32, true)),
);

let array = UnionArray::try_new(
fields,
type_ids.cycle().take(4096).collect(),
None,
repeat(array_with_nulls())
.take(with_nulls as usize)
.chain(repeat(array_without_nulls()).take(without_nulls as usize))
.collect(),
)
.unwrap();

b.iter(|| black_box(array.logical_nulls()))
},
);
}
}
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
5 changes: 3 additions & 2 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub trait Array: std::fmt::Debug + Send + Sync {
///
/// The physical representation is efficient, but is sometimes non intuitive
/// for certain array types such as those with nullable child arrays like
/// [`DictionaryArray::values`] or [`RunArray::values`], or without a
/// [`DictionaryArray::values`], [`RunArray::values`] or [`UnionArray`], or without a
/// null buffer, such as [`NullArray`].
///
/// To determine if each element of such an array is "logically" null,
Expand All @@ -209,6 +209,7 @@ pub trait Array: std::fmt::Debug + Send + Sync {
/// * [`DictionaryArray`] where [`DictionaryArray::values`] contains nulls
/// * [`RunArray`] where [`RunArray::values`] contains nulls
/// * [`NullArray`] where all indices are nulls
/// * [`UnionArray`] where the selected values contains nulls
///
/// In these cases a logical [`NullBuffer`] will be computed, encoding the
/// logical nullability of these arrays, beyond what is encoded in
Expand All @@ -221,7 +222,7 @@ pub trait Array: std::fmt::Debug + Send + Sync {
///
/// Note: For performance reasons, this method returns nullability solely as determined by the
/// null buffer. This difference can lead to surprising results, for example, [`NullArray::is_null`] always
/// returns `false` as the array lacks a null buffer. Similarly [`DictionaryArray`] and [`RunArray`] may
/// returns `false` as the array lacks a null buffer. Similarly [`DictionaryArray`], [`RunArray`] and [`UnionArray`] may
/// encode nullability in their children. See [`Self::logical_nulls`] for more information.
///
/// # Example:
Expand Down
Loading

0 comments on commit a303fef

Please sign in to comment.