Skip to content

Commit

Permalink
Add RunEndBuffer (apache#1799) (apache#3817)
Browse files Browse the repository at this point in the history
* Add RunEndBuffer (apache#1799)

* Fix test

* Revert rename

* Format

* Clippy

* Remove unnecessary check

* Fix

* Tweak docs

* Add docs
  • Loading branch information
tustvold authored Mar 8, 2023
1 parent 81ed334 commit 36f2db3
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 177 deletions.
1 change: 1 addition & 0 deletions arrow-array/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ mod tests {
assert_eq!(array.null_count(), 0);
assert_eq!(array.values().len(), 1);
assert_eq!(array.values().null_count(), 1);
assert_eq!(array.run_ends().len(), 4);
assert_eq!(array.run_ends().values(), &[4]);

let idx = array.get_physical_indices(&[0, 1, 2, 3]).unwrap();
Expand Down
138 changes: 49 additions & 89 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::any::Any;

use arrow_buffer::buffer::RunEndBuffer;
use arrow_buffer::ArrowNativeType;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType, Field};
Expand Down Expand Up @@ -62,7 +63,7 @@ use crate::{

pub struct RunArray<R: RunEndIndexType> {
data: ArrayData,
run_ends: PrimitiveArray<R>,
run_ends: RunEndBuffer<R::Native>,
values: ArrayRef,
}

Expand Down Expand Up @@ -110,11 +111,8 @@ impl<R: RunEndIndexType> RunArray<R> {
Ok(array_data.into())
}

/// Returns a reference to run_ends array
///
/// Note: any slicing of this [`RunArray`] array is not applied to the returned array
/// and must be handled separately
pub fn run_ends(&self) -> &PrimitiveArray<R> {
/// Returns a reference to [`RunEndBuffer`]
pub fn run_ends(&self) -> &RunEndBuffer<R::Native> {
&self.run_ends
}

Expand All @@ -128,19 +126,12 @@ impl<R: RunEndIndexType> RunArray<R> {

/// Returns the physical index at which the array slice starts.
pub fn get_start_physical_index(&self) -> usize {
if self.offset() == 0 {
return 0;
}
self.get_zero_offset_physical_index(self.offset()).unwrap()
self.run_ends.get_start_physical_index()
}

/// Returns the physical index at which the array slice ends.
pub fn get_end_physical_index(&self) -> usize {
if self.offset() + self.len() == Self::logical_len(&self.run_ends) {
return self.run_ends.len() - 1;
}
self.get_zero_offset_physical_index(self.offset() + self.len() - 1)
.unwrap()
self.run_ends.get_end_physical_index()
}

/// Downcast this [`RunArray`] to a [`TypedRunArray`]
Expand All @@ -164,47 +155,13 @@ impl<R: RunEndIndexType> RunArray<R> {
})
}

/// Returns index to the physical array for the given index to the logical array.
/// The function does not adjust the input logical index based on `ArrayData::offset`.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_zero_offset_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= Self::logical_len(&self.run_ends) {
return None;
}
let mut st: usize = 0;
let mut en: usize = self.run_ends.len();
while st + 1 < en {
let mid: usize = (st + en) / 2;
if logical_index
< unsafe {
// Safety:
// The value of mid will always be between 1 and len - 1,
// where len is length of run ends array.
// This is based on the fact that `st` starts with 0 and
// `en` starts with len. The condition `st + 1 < en` ensures
// `st` and `en` differ by at least two. So the value of `mid`
// will never be either `st` or `en`
self.run_ends.value_unchecked(mid - 1).as_usize()
}
{
en = mid
} else {
st = mid
}
}
Some(st)
}

/// Returns index to the physical array for the given index to the logical array.
/// This function adjusts the input logical index based on `ArrayData::offset`.
/// Performs a binary search on the run_ends array for the input index.
#[inline]
pub fn get_physical_index(&self, logical_index: usize) -> Option<usize> {
if logical_index >= self.len() {
return None;
}
self.get_zero_offset_physical_index(logical_index + self.offset())
///
/// The result is arbitrary if `logical_index >= self.len()`
pub fn get_physical_index(&self, logical_index: usize) -> usize {
self.run_ends.get_physical_index(logical_index)
}

/// Returns the physical indices of the input logical indices. Returns error if any of the logical
Expand All @@ -222,6 +179,9 @@ impl<R: RunEndIndexType> RunArray<R> {
where
I: ArrowNativeType,
{
let len = self.run_ends().len();
let offset = self.run_ends().offset();

let indices_len = logical_indices.len();

if indices_len == 0 {
Expand All @@ -243,7 +203,7 @@ impl<R: RunEndIndexType> RunArray<R> {
// Return early if any logical index cannot be converted to a physical index.
let largest_logical_index =
logical_indices[*ordered_indices.last().unwrap()].as_usize();
if largest_logical_index >= self.len() {
if largest_logical_index >= len {
return Err(ArrowError::InvalidArgumentError(format!(
"Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
)));
Expand All @@ -259,7 +219,7 @@ impl<R: RunEndIndexType> RunArray<R> {
self.run_ends.values().iter().enumerate().skip(skip_value)
{
// Get the run end index (relative to offset) of current physical index
let run_end_value = run_end.as_usize() - self.offset();
let run_end_value = run_end.as_usize() - offset;

// All the `logical_indices` that are less than current run end index
// belongs to current physical index.
Expand Down Expand Up @@ -295,7 +255,15 @@ impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
}
}

let run_ends = PrimitiveArray::<R>::from(data.child_data()[0].clone());
// Safety
// ArrayData is valid
let child = &data.child_data()[0];
assert_eq!(child.data_type(), &R::DATA_TYPE, "Incorrect run ends type");
let run_ends = unsafe {
let scalar = child.buffers()[0].clone().into();
RunEndBuffer::new_unchecked(scalar, data.offset(), data.len())
};

let values = make_array(data.child_data()[1].clone());
Self {
data,
Expand Down Expand Up @@ -330,7 +298,8 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
writeln!(
f,
"RunArray {{run_ends: {:?}, values: {:?}}}",
self.run_ends, self.values
self.run_ends.values(),
self.values
)
}
}
Expand All @@ -347,7 +316,7 @@ impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
/// .map(|&x| if x == "b" { None } else { Some(x) })
/// .collect();
/// assert_eq!(
/// "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 5,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
/// "RunArray {run_ends: [2, 3, 5], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
/// format!("{:?}", array)
/// );
/// ```
Expand All @@ -374,7 +343,7 @@ impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>> for RunArray<T> {
/// let test = vec!["a", "a", "b", "c"];
/// let array: RunArray<Int16Type> = test.into_iter().collect();
/// assert_eq!(
/// "RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
/// "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
/// format!("{:?}", array)
/// );
/// ```
Expand All @@ -401,7 +370,7 @@ impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray<T> {
///
/// let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
/// assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int16RunArray = RunArray<Int16Type>;
Expand All @@ -416,7 +385,7 @@ pub type Int16RunArray = RunArray<Int16Type>;
///
/// let array: Int32RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
/// assert_eq!(array.run_ends(), &Int32Array::from(vec![2, 3, 5]));
/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int32RunArray = RunArray<Int32Type>;
Expand All @@ -431,7 +400,7 @@ pub type Int32RunArray = RunArray<Int32Type>;
///
/// let array: Int64RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
/// let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
/// assert_eq!(array.run_ends(), &Int64Array::from(vec![2, 3, 5]));
/// assert_eq!(array.run_ends().values(), &[2, 3, 5]);
/// assert_eq!(array.values(), &values);
/// ```
pub type Int64RunArray = RunArray<Int64Type>;
Expand Down Expand Up @@ -480,7 +449,7 @@ impl<'a, R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'a, R, V> {

impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
/// Returns the run_ends of this [`TypedRunArray`]
pub fn run_ends(&self) -> &'a PrimitiveArray<R> {
pub fn run_ends(&self) -> &'a RunEndBuffer<R::Native> {
self.run_array.run_ends()
}

Expand Down Expand Up @@ -531,7 +500,7 @@ where
}

unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item {
let physical_index = self.run_array.get_physical_index(logical_index).unwrap();
let physical_index = self.run_array.get_physical_index(logical_index);
self.values().value_unchecked(physical_index)
}
}
Expand Down Expand Up @@ -563,7 +532,7 @@ mod tests {
use crate::builder::PrimitiveRunBuilder;
use crate::cast::as_primitive_array;
use crate::types::{Int16Type, Int32Type, Int8Type, UInt32Type};
use crate::{Array, Int16Array, Int32Array, StringArray};
use crate::{Array, Int32Array, StringArray};

fn build_input_array(size: usize) -> Vec<Option<i32>> {
// The input array is created by shuffling and repeating
Expand Down Expand Up @@ -643,9 +612,10 @@ mod tests {
]);

// Construct a run_ends array:
let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values([
4_i16, 6, 7, 9, 13, 18, 20, 22,
]);
let run_ends_values = [4_i16, 6, 7, 9, 13, 18, 20, 22];
let run_ends_data = PrimitiveArray::<Int16Type>::from_iter_values(
run_ends_values.iter().copied(),
);

// Construct a run ends encoded array from the above two
let ree_array =
Expand All @@ -659,8 +629,7 @@ mod tests {
assert_eq!(&DataType::Int8, values.data_type());

let run_ends = ree_array.run_ends();
assert_eq!(&run_ends_data.into_data(), run_ends.data());
assert_eq!(&DataType::Int16, run_ends.data_type());
assert_eq!(run_ends.values(), &run_ends_values);
}

#[test]
Expand All @@ -671,7 +640,7 @@ mod tests {
builder.append_value(22345678);
let array = builder.finish();
assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 1,\n 2,\n 3,\n], values: PrimitiveArray<UInt32>\n[\n 12345678,\n null,\n 22345678,\n]}\n",
"RunArray {run_ends: [1, 2, 3], values: PrimitiveArray<UInt32>\n[\n 12345678,\n null,\n 22345678,\n]}\n",
format!("{array:?}")
);

Expand All @@ -685,7 +654,7 @@ mod tests {
assert_eq!(array.null_count(), 0);

assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 20,\n], values: PrimitiveArray<UInt32>\n[\n 1,\n]}\n",
"RunArray {run_ends: [20], values: PrimitiveArray<UInt32>\n[\n 1,\n]}\n",
format!("{array:?}")
);
}
Expand All @@ -698,7 +667,7 @@ mod tests {
.map(|&x| if x == "b" { None } else { Some(x) })
.collect();
assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
"RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n null,\n \"c\",\n]}\n",
format!("{array:?}")
);

Expand All @@ -707,7 +676,7 @@ mod tests {

let array: RunArray<Int16Type> = test.into_iter().collect();
assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int16>\n[\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
"RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n \"a\",\n \"b\",\n \"c\",\n]}\n",
format!("{array:?}")
);
}
Expand All @@ -721,8 +690,6 @@ mod tests {
assert_eq!(array.null_count(), 0);

let run_ends = array.run_ends();
assert_eq!(&DataType::Int16, run_ends.data_type());
assert_eq!(0, run_ends.null_count());
assert_eq!(&[1, 2, 3, 4], run_ends.values());
}

Expand All @@ -735,9 +702,6 @@ mod tests {
assert_eq!(array.null_count(), 0);

let run_ends = array.run_ends();
assert_eq!(&DataType::Int32, run_ends.data_type());
assert_eq!(0, run_ends.null_count());
assert_eq!(5, run_ends.len());
assert_eq!(&[1, 2, 3, 5, 6], run_ends.values());

let values_data = array.values();
Expand All @@ -754,7 +718,7 @@ mod tests {
assert_eq!(array.null_count(), 0);

let run_ends = array.run_ends();
assert_eq!(1, run_ends.len());
assert_eq!(3, run_ends.len());
assert_eq!(&[3], run_ends.values());

let values_data = array.values();
Expand All @@ -770,16 +734,14 @@ mod tests {
[Some(1), Some(2), Some(3), Some(4)].into_iter().collect();

let array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();
assert_eq!(array.run_ends().data_type(), &DataType::Int32);
assert_eq!(array.values().data_type(), &DataType::Utf8);

assert_eq!(array.null_count(), 0);
assert_eq!(array.len(), 4);
assert_eq!(array.run_ends.null_count(), 0);
assert_eq!(array.values().null_count(), 1);

assert_eq!(
"RunArray {run_ends: PrimitiveArray<Int32>\n[\n 1,\n 2,\n 3,\n 4,\n], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n",
"RunArray {run_ends: [1, 2, 3, 4], values: StringArray\n[\n \"foo\",\n \"bar\",\n null,\n \"baz\",\n]}\n",
format!("{array:?}")
);
}
Expand All @@ -788,15 +750,15 @@ mod tests {
fn test_run_array_int16_type_definition() {
let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));
assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 3, 5]));
assert_eq!(array.run_ends().values(), &[2, 3, 5]);
assert_eq!(array.values(), &values);
}

#[test]
fn test_run_array_empty_string() {
let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect();
let values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "", "c"]));
assert_eq!(array.run_ends(), &Int16Array::from(vec![2, 4, 5]));
assert_eq!(array.run_ends().values(), &[2, 4, 5]);
assert_eq!(array.values(), &values);
}

Expand Down Expand Up @@ -849,9 +811,7 @@ mod tests {
}

#[test]
#[should_panic(
expected = "PrimitiveArray expected ArrayData with type Int64 got Int32"
)]
#[should_panic(expected = "Incorrect run ends type")]
fn test_run_array_run_ends_data_type_mismatch() {
let a = RunArray::<Int32Type>::from_iter(["32"]);
let _ = RunArray::<Int64Type>::from(a.into_data());
Expand All @@ -874,7 +834,7 @@ mod tests {
let actual = typed.value(i);
assert_eq!(*val, actual)
} else {
let physical_ix = run_array.get_physical_index(i).unwrap();
let physical_ix = run_array.get_physical_index(i);
assert!(typed.values().is_null(physical_ix));
};
}
Expand Down
Loading

0 comments on commit 36f2db3

Please sign in to comment.