Skip to content

Commit

Permalink
Update union array to new null handling
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 20, 2021
1 parent f3e452c commit a094eb7
Show file tree
Hide file tree
Showing 15 changed files with 256 additions and 83 deletions.
11 changes: 9 additions & 2 deletions arrow-flight/src/arrow.flight.protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub mod flight_service_client {
impl<T> FlightServiceClient<T>
where
T: tonic::client::GrpcService<tonic::body::BoxBody>,
T::ResponseBody: Body + Send + 'static,
T::ResponseBody: Body + Send + Sync + 'static,
T::Error: Into<StdError>,
<T::ResponseBody as Body>::Error: Into<StdError> + Send,
{
Expand Down Expand Up @@ -513,6 +513,7 @@ pub mod flight_service_server {
#[doc = "Server streaming response type for the Handshake method."]
type HandshakeStream: futures_core::Stream<Item = Result<super::HandshakeResponse, tonic::Status>>
+ Send
+ Sync
+ 'static;
#[doc = ""]
#[doc = " Handshake between client and server. Depending on the server, the"]
Expand All @@ -526,6 +527,7 @@ pub mod flight_service_server {
#[doc = "Server streaming response type for the ListFlights method."]
type ListFlightsStream: futures_core::Stream<Item = Result<super::FlightInfo, tonic::Status>>
+ Send
+ Sync
+ 'static;
#[doc = ""]
#[doc = " Get a list of available streams given a particular criteria. Most flight"]
Expand Down Expand Up @@ -565,6 +567,7 @@ pub mod flight_service_server {
#[doc = "Server streaming response type for the DoGet method."]
type DoGetStream: futures_core::Stream<Item = Result<super::FlightData, tonic::Status>>
+ Send
+ Sync
+ 'static;
#[doc = ""]
#[doc = " Retrieve a single stream associated with a particular descriptor"]
Expand All @@ -578,6 +581,7 @@ pub mod flight_service_server {
#[doc = "Server streaming response type for the DoPut method."]
type DoPutStream: futures_core::Stream<Item = Result<super::PutResult, tonic::Status>>
+ Send
+ Sync
+ 'static;
#[doc = ""]
#[doc = " Push a stream to the flight service associated with a particular"]
Expand All @@ -593,6 +597,7 @@ pub mod flight_service_server {
#[doc = "Server streaming response type for the DoExchange method."]
type DoExchangeStream: futures_core::Stream<Item = Result<super::FlightData, tonic::Status>>
+ Send
+ Sync
+ 'static;
#[doc = ""]
#[doc = " Open a bidirectional data channel for a given descriptor. This"]
Expand All @@ -607,6 +612,7 @@ pub mod flight_service_server {
#[doc = "Server streaming response type for the DoAction method."]
type DoActionStream: futures_core::Stream<Item = Result<super::Result, tonic::Status>>
+ Send
+ Sync
+ 'static;
#[doc = ""]
#[doc = " Flight services can support an arbitrary number of simple actions in"]
Expand All @@ -622,6 +628,7 @@ pub mod flight_service_server {
#[doc = "Server streaming response type for the ListActions method."]
type ListActionsStream: futures_core::Stream<Item = Result<super::ActionType, tonic::Status>>
+ Send
+ Sync
+ 'static;
#[doc = ""]
#[doc = " A flight service exposes all of the available action types that it has"]
Expand Down Expand Up @@ -667,7 +674,7 @@ pub mod flight_service_server {
impl<T, B> tonic::codegen::Service<http::Request<B>> for FlightServiceServer<T>
where
T: FlightService,
B: Body + Send + 'static,
B: Body + Send + Sync + 'static,
B::Error: Into<StdError> + Send + 'static,
{
type Response = http::Response<tonic::body::BoxBody>;
Expand Down
4 changes: 2 additions & 2 deletions arrow/src/array/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ pub fn make_array(data: ArrayData) -> ArrayRef {
DataType::LargeList(_) => Arc::new(LargeListArray::from(data)) as ArrayRef,
DataType::Struct(_) => Arc::new(StructArray::from(data)) as ArrayRef,
DataType::Map(_, _) => Arc::new(MapArray::from(data)) as ArrayRef,
DataType::Union(_) => Arc::new(UnionArray::from(data)) as ArrayRef,
DataType::Union(_, _) => Arc::new(UnionArray::from(data)) as ArrayRef,
DataType::FixedSizeList(_, _) => {
Arc::new(FixedSizeListArray::from(data)) as ArrayRef
}
Expand Down Expand Up @@ -472,7 +472,7 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
DataType::Map(field, _keys_sorted) => {
new_null_list_array::<i32>(data_type, field.data_type(), length)
}
DataType::Union(_) => {
DataType::Union(_, _) => {
unimplemented!("Creating null Union array not yet supported")
}
DataType::Dictionary(key, value) => {
Expand Down
53 changes: 29 additions & 24 deletions arrow/src/array/array_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

/// Contains the `UnionArray` type.
///
use crate::array::{data::count_nulls, make_array, Array, ArrayData, ArrayRef};
use crate::array::{make_array, Array, ArrayData, ArrayRef};
use crate::buffer::Buffer;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
Expand Down Expand Up @@ -48,15 +48,15 @@ impl UnionArray {
/// caller and assumes that each of the components are correct and consistent with each other.
/// See `try_new` for an alternative that validates the data provided.
///
/// # Data Consistency
/// # Safety
///
/// The `type_ids` `Buffer` should contain `i8` values. These values should be greater than
/// zero and must be less than the number of children provided in `child_arrays`. These values
/// are used to index into the `child_arrays`.
///
/// The `value_offsets` `Buffer` is only provided in the case of a dense union, sparse unions
/// should use `None`. If provided the `value_offsets` `Buffer` should contain `i32` values.
/// These values should be greater than zero and must be less than the length of the overall
/// should use `None`. If provided the `value_offsets` `Buffer` should contain `i32` values
/// Thee values in this array should be greater than zero and must be less than the length of the overall
/// array.
///
/// In both cases above we use signed integer types to maintain compatibility with other
Expand All @@ -65,7 +65,7 @@ impl UnionArray {
/// In both of the cases above we are accepting `Buffer`'s which are assumed to be representing
/// `i8` and `i32` values respectively. `Buffer` objects are untyped and no attempt is made
/// to ensure that the data provided is valid.
pub fn new(
pub unsafe fn new_unchecked(
type_ids: Buffer,
value_offsets: Option<Buffer>,
child_arrays: Vec<(Field, ArrayRef)>,
Expand All @@ -74,31 +74,36 @@ impl UnionArray {
let (field_types, field_values): (Vec<_>, Vec<_>) =
child_arrays.into_iter().unzip();
let len = type_ids.len();
let mut builder = ArrayData::builder(DataType::Union(field_types))

let mode = if value_offsets.is_some() {
UnionMode::Dense
} else {
UnionMode::Sparse
};

let mut builder = ArrayData::builder(DataType::Union(field_types, mode))
.add_buffer(type_ids)
.child_data(field_values.into_iter().map(|a| a.data().clone()).collect())
.len(len);
if let Some(bitmap) = bitmap_data {
builder = builder.null_bit_buffer(bitmap)
}
let data = unsafe {
match value_offsets {
Some(b) => builder.add_buffer(b).build_unchecked(),
None => builder.build_unchecked(),
}
let data = match value_offsets {
Some(b) => builder.add_buffer(b).build_unchecked(),
None => builder.build_unchecked(),
};
Self::from(data)
}
/// Attempts to create a new `UnionArray` and validates the inputs provided.

/// Attempts to create a new `UnionArray`, validating the inputs provided.
pub fn try_new(
type_ids: Buffer,
value_offsets: Option<Buffer>,
child_arrays: Vec<(Field, ArrayRef)>,
bitmap: Option<Buffer>,
) -> Result<Self> {
if let Some(b) = &value_offsets {
let nulls = count_nulls(bitmap.as_ref(), 0, type_ids.len());
if ((type_ids.len() - nulls) * 4) != b.len() {
if ((type_ids.len()) * 4) != b.len() {
return Err(ArrowError::InvalidArgumentError(
"Type Ids and Offsets represent a different number of array slots."
.to_string(),
Expand Down Expand Up @@ -137,7 +142,10 @@ impl UnionArray {
}
}

let new_self = Self::new(type_ids, value_offsets, child_arrays, bitmap);
// Unsafe Justification: arguments were validated above (and
// re-revalidated as part of data().validate() below)
let new_self =
unsafe { Self::new_unchecked(type_ids, value_offsets, child_arrays, bitmap) };
new_self.data().validate()?;

Ok(new_self)
Expand Down Expand Up @@ -173,15 +181,9 @@ impl UnionArray {
pub fn value_offset(&self, index: usize) -> i32 {
assert!(index - self.offset() < self.len());
if self.is_dense() {
// In format v4 unions had their own validity bitmap and offsets are compressed by omitting null values
// Starting with v5 unions don't have a validity bitmap and it's possible to directly index into the offsets buffer
let valid_slots = match self.data.null_buffer() {
Some(b) => b.count_set_bits_offset(0, index),
None => index,
};
// safety: reinterpreting is safe since the offset buffer contains `i32` values and is
// properly aligned.
unsafe { self.data().buffers()[1].typed_data::<i32>()[valid_slots] }
unsafe { self.data().buffers()[1].typed_data::<i32>()[index] }
} else {
index as i32
}
Expand All @@ -202,7 +204,7 @@ impl UnionArray {
/// Returns the names of the types in the union.
pub fn type_names(&self) -> Vec<&str> {
match self.data.data_type() {
DataType::Union(fields) => fields
DataType::Union(fields, _) => fields
.iter()
.map(|f| f.name().as_str())
.collect::<Vec<&str>>(),
Expand All @@ -212,7 +214,10 @@ impl UnionArray {

/// Returns whether the `UnionArray` is dense (or sparse if `false`).
fn is_dense(&self) -> bool {
self.data().buffers().len() == 2
match self.data.data_type() {
DataType::Union(_, mode) => mode == &UnionMode::Dense,
_ => unreachable!("Union array's data type is not a union!"),
}
}
}

Expand Down
14 changes: 9 additions & 5 deletions arrow/src/array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2143,12 +2143,16 @@ impl UnionBuilder {

self.type_id_builder.append(i8::default());

// Handle sparse union
if self.value_offset_builder.is_none() {
for (_, fd) in self.fields.iter_mut() {
fd.append_null_dynamic()?;
match &mut self.value_offset_builder {
// Handle dense union
Some(value_offset_builder) => value_offset_builder.append(i32::default()),
// Handle sparse union
None => {
for (_, fd) in self.fields.iter_mut() {
fd.append_null_dynamic()?;
}
}
}
};
self.len += 1;
Ok(())
}
Expand Down
Loading

0 comments on commit a094eb7

Please sign in to comment.