Skip to content

Commit

Permalink
Remove ArrayData from Array (apache#3880)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Apr 12, 2023
1 parent 6ce332a commit c2c61cf
Show file tree
Hide file tree
Showing 18 changed files with 884 additions and 562 deletions.
7 changes: 3 additions & 4 deletions arrow-arith/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,13 +1241,12 @@ mod tests {
.into_iter()
.collect();
let sliced_input = sliced_input.slice(4, 2);
let sliced_input = sliced_input.as_boolean();

assert_eq!(sliced_input, &input);
assert_eq!(sliced_input, input);

let actual = min_boolean(sliced_input);
let actual = min_boolean(&sliced_input);
assert_eq!(actual, expected);
let actual = max_boolean(sliced_input);
let actual = max_boolean(&sliced_input);
assert_eq!(actual, expected);
}

Expand Down
82 changes: 56 additions & 26 deletions arrow-array/src/array/boolean_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::builder::BooleanBuilder;
use crate::iterator::BooleanIter;
use crate::{Array, ArrayAccessor, ArrayRef};
use arrow_buffer::{bit_util, BooleanBuffer, MutableBuffer, NullBuffer};
use arrow_data::ArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::DataType;
use std::any::Any;
use std::sync::Arc;
Expand Down Expand Up @@ -66,8 +66,8 @@ use std::sync::Arc;
/// ```
#[derive(Clone)]
pub struct BooleanArray {
data: ArrayData,
values: BooleanBuffer,
nulls: Option<NullBuffer>,
}

impl std::fmt::Debug for BooleanArray {
Expand All @@ -90,27 +90,25 @@ impl BooleanArray {
if let Some(n) = nulls.as_ref() {
assert_eq!(values.len(), n.len());
}

// TODO: Don't store ArrayData inside arrays (#3880)
let data = unsafe {
ArrayData::builder(DataType::Boolean)
.len(values.len())
.offset(values.offset())
.nulls(nulls)
.buffers(vec![values.inner().clone()])
.build_unchecked()
};
Self { data, values }
Self { values, nulls }
}

/// Returns the length of this array.
pub fn len(&self) -> usize {
self.data.len()
self.values.len()
}

/// Returns whether this array is empty.
pub fn is_empty(&self) -> bool {
self.data.is_empty()
self.values.is_empty()
}

/// Returns a zero-copy slice of this array with the indicated offset and length.
pub fn slice(&self, offset: usize, length: usize) -> Self {
Self {
values: self.values.slice(offset, length),
nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)),
}
}

/// Returns a new boolean array builder
Expand All @@ -125,7 +123,7 @@ impl BooleanArray {

/// Returns the number of non null, true values within this array
pub fn true_count(&self) -> usize {
match self.data.nulls() {
match self.nulls() {
Some(nulls) => {
let null_chunks = nulls.inner().bit_chunks().iter_padded();
let value_chunks = self.values().bit_chunks().iter_padded();
Expand Down Expand Up @@ -247,25 +245,48 @@ impl Array for BooleanArray {
self
}

fn data(&self) -> &ArrayData {
&self.data
}

fn to_data(&self) -> ArrayData {
self.data.clone()
self.clone().into()
}

fn into_data(self) -> ArrayData {
self.into()
}

fn data_type(&self) -> &DataType {
&DataType::Boolean
}

fn slice(&self, offset: usize, length: usize) -> ArrayRef {
// TODO: Slice buffers directly (#3880)
Arc::new(Self::from(self.data.slice(offset, length)))
Arc::new(self.slice(offset, length))
}

fn len(&self) -> usize {
self.values.len()
}

fn is_empty(&self) -> bool {
self.values.is_empty()
}

fn offset(&self) -> usize {
self.values.offset()
}

fn nulls(&self) -> Option<&NullBuffer> {
self.data.nulls()
self.nulls.as_ref()
}

fn get_buffer_memory_size(&self) -> usize {
let mut sum = self.values.inner().capacity();
if let Some(x) = &self.nulls {
sum += x.buffer().capacity()
}
sum
}

fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>() + self.get_buffer_memory_size()
}
}

Expand Down Expand Up @@ -324,13 +345,22 @@ impl From<ArrayData> for BooleanArray {
let values =
BooleanBuffer::new(data.buffers()[0].clone(), data.offset(), data.len());

Self { data, values }
Self {
values,
nulls: data.nulls().cloned(),
}
}
}

impl From<BooleanArray> for ArrayData {
fn from(array: BooleanArray) -> Self {
array.data
let builder = ArrayDataBuilder::new(DataType::Boolean)
.len(array.values.len())
.offset(array.values.offset())
.nulls(array.nulls)
.buffers(vec![array.values.into_inner()]);

unsafe { builder.build_unchecked() }
}
}

Expand Down
90 changes: 64 additions & 26 deletions arrow-array/src/array/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::types::ByteArrayType;
use crate::{Array, ArrayAccessor, ArrayRef, OffsetSizeTrait};
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_buffer::{NullBuffer, OffsetBuffer};
use arrow_data::ArrayData;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::DataType;
use std::any::Any;
use std::sync::Arc;
Expand All @@ -39,17 +39,19 @@ use std::sync::Arc;
/// [`BinaryArray`]: crate::BinaryArray
/// [`LargeBinaryArray`]: crate::LargeBinaryArray
pub struct GenericByteArray<T: ByteArrayType> {
data: ArrayData,
data_type: DataType,
value_offsets: OffsetBuffer<T::Offset>,
value_data: Buffer,
nulls: Option<NullBuffer>,
}

impl<T: ByteArrayType> Clone for GenericByteArray<T> {
fn clone(&self) -> Self {
Self {
data: self.data.clone(),
data_type: self.data_type.clone(),
value_offsets: self.value_offsets.clone(),
value_data: self.value_data.clone(),
nulls: self.nulls.clone(),
}
}
}
Expand Down Expand Up @@ -135,7 +137,7 @@ impl<T: ByteArrayType> GenericByteArray<T> {
/// Panics if index `i` is out of bounds.
pub fn value(&self, i: usize) -> &T::Native {
assert!(
i < self.data.len(),
i < self.len(),
"Trying to access an element at index {} from a {}{}Array of length {}",
i,
T::Offset::PREFIX,
Expand All @@ -154,29 +156,33 @@ impl<T: ByteArrayType> GenericByteArray<T> {

/// Returns a zero-copy slice of this array with the indicated offset and length.
pub fn slice(&self, offset: usize, length: usize) -> Self {
// TODO: Slice buffers directly (#3880)
self.data.slice(offset, length).into()
Self {
data_type: self.data_type.clone(),
value_offsets: self.value_offsets.slice(offset, length),
value_data: self.value_data.clone(),
nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)),
}
}

/// Returns `GenericByteBuilder` of this byte array for mutating its values if the underlying
/// offset and data buffers are not shared by others.
pub fn into_builder(self) -> Result<GenericByteBuilder<T>, Self> {
let len = self.len();
let null_bit_buffer = self.data.nulls().map(|b| b.inner().sliced());
let value_len =
T::Offset::as_usize(self.value_offsets()[len] - self.value_offsets()[0]);

let data = self.into_data();
let null_bit_buffer = data.nulls().map(|b| b.inner().sliced());

let element_len = std::mem::size_of::<T::Offset>();
let offset_buffer = self.data.buffers()[0]
.slice_with_length(self.data.offset() * element_len, (len + 1) * element_len);
let offset_buffer = data.buffers()[0]
.slice_with_length(data.offset() * element_len, (len + 1) * element_len);

let element_len = std::mem::size_of::<u8>();
let value_len =
T::Offset::as_usize(self.value_offsets()[len] - self.value_offsets()[0]);
let value_buffer = self.data.buffers()[1]
.slice_with_length(self.data.offset() * element_len, value_len * element_len);
let value_buffer = data.buffers()[1]
.slice_with_length(data.offset() * element_len, value_len * element_len);

drop(self.data);
drop(self.value_data);
drop(self.value_offsets);
drop(data);

let try_mutable_null_buffer = match null_bit_buffer {
None => Ok(None),
Expand Down Expand Up @@ -258,24 +264,49 @@ impl<T: ByteArrayType> Array for GenericByteArray<T> {
self
}

fn data(&self) -> &ArrayData {
&self.data
}

fn to_data(&self) -> ArrayData {
self.data.clone()
self.clone().into()
}

fn into_data(self) -> ArrayData {
self.into()
}

fn data_type(&self) -> &DataType {
&self.data_type
}

fn slice(&self, offset: usize, length: usize) -> ArrayRef {
Arc::new(self.slice(offset, length))
}

fn len(&self) -> usize {
self.value_offsets.len() - 1
}

fn is_empty(&self) -> bool {
self.value_offsets.len() <= 1
}

fn offset(&self) -> usize {
0
}

fn nulls(&self) -> Option<&NullBuffer> {
self.data.nulls()
self.nulls.as_ref()
}

fn get_buffer_memory_size(&self) -> usize {
let mut sum = self.value_offsets.inner().inner().capacity();
sum += self.value_data.capacity();
if let Some(x) = &self.nulls {
sum += x.buffer().capacity()
}
sum
}

fn get_array_memory_size(&self) -> usize {
std::mem::size_of::<Self>() + self.get_buffer_memory_size()
}
}

Expand Down Expand Up @@ -313,18 +344,25 @@ impl<T: ByteArrayType> From<ArrayData> for GenericByteArray<T> {
let value_offsets = unsafe { get_offsets(&data) };
let value_data = data.buffers()[1].clone();
Self {
data,
// SAFETY:
// ArrayData must be valid, and validated data type above
value_offsets,
value_data,
data_type: data.data_type().clone(),
nulls: data.nulls().cloned(),
}
}
}

impl<T: ByteArrayType> From<GenericByteArray<T>> for ArrayData {
fn from(array: GenericByteArray<T>) -> Self {
array.data
let len = array.len();

let offsets = array.value_offsets.into_inner().into_inner();
let builder = ArrayDataBuilder::new(array.data_type)
.len(len)
.buffers(vec![offsets, array.value_data])
.nulls(array.nulls);

unsafe { builder.build_unchecked() }
}
}

Expand Down
Loading

0 comments on commit c2c61cf

Please sign in to comment.