Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix_ipc_nested_di…
Browse files Browse the repository at this point in the history
…ct_flight
  • Loading branch information
viirya committed May 8, 2022
2 parents 513389d + f74ad34 commit 3628b43
Show file tree
Hide file tree
Showing 34 changed files with 915 additions and 446 deletions.
5 changes: 3 additions & 2 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! Utilities to assist with reading and writing Arrow data as Flight messages

use crate::{FlightData, IpcMessage, SchemaAsIpc, SchemaResult};
use std::collections::HashMap;

use arrow::array::ArrayRef;
use arrow::datatypes::{Schema, SchemaRef};
Expand Down Expand Up @@ -49,7 +50,7 @@ pub fn flight_data_from_arrow_batch(
pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
dictionaries_by_field: &[Option<ArrayRef>],
dictionaries_by_id: &HashMap<i64, ArrayRef>,
) -> Result<RecordBatch> {
// check that the data_header is a record batch message
let message = arrow::ipc::root_as_message(&data.data_header[..]).map_err(|err| {
Expand All @@ -68,7 +69,7 @@ pub fn flight_data_to_arrow_batch(
&data.data_body,
batch,
schema,
dictionaries_by_field,
dictionaries_by_id,
None,
)
})?
Expand Down
17 changes: 11 additions & 6 deletions arrow/benches/string_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,27 @@ use arrow::array::*;
use arrow::compute::kernels::substring::substring;
use arrow::util::bench_util::*;

fn bench_substring(arr: &StringArray, start: i64, length: Option<u64>) {
fn bench_substring(arr: &dyn Array, start: i64, length: Option<u64>) {
substring(criterion::black_box(arr), start, length).unwrap();
}

fn add_benchmark(c: &mut Criterion) {
let size = 65536;
let str_len = 1000;
let val_len = 1000;

let arr_string = create_string_array_with_len::<i32>(size, 0.0, str_len);
let arr_string = create_string_array_with_len::<i32>(size, 0.0, val_len);
let arr_fsb = create_fsb_array(size, 0.0, val_len);

c.bench_function("substring (start = 0, length = None)", |b| {
c.bench_function("substring utf8 (start = 0, length = None)", |b| {
b.iter(|| bench_substring(&arr_string, 0, None))
});

c.bench_function("substring (start = 1, length = str_len - 1)", |b| {
b.iter(|| bench_substring(&arr_string, 1, Some((str_len - 1) as u64)))
c.bench_function("substring utf8 (start = 1, length = str_len - 1)", |b| {
b.iter(|| bench_substring(&arr_string, 1, Some((val_len - 1) as u64)))
});

c.bench_function("substring fixed size binary array", |b| {
b.iter(|| bench_substring(&arr_fsb, 1, Some((val_len - 1) as u64)))
});
}

Expand Down
22 changes: 19 additions & 3 deletions arrow/src/array/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,7 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef {
)
})
}
DataType::Decimal(_, _) => {
unimplemented!("Creating null Decimal array not yet supported")
}
DataType::Decimal(_, _) => new_null_sized_decimal(data_type, length),
}
}

Expand Down Expand Up @@ -620,6 +618,24 @@ fn new_null_sized_array<T: ArrowPrimitiveType>(
})
}

#[inline]
fn new_null_sized_decimal(data_type: &DataType, length: usize) -> ArrayRef {
make_array(unsafe {
ArrayData::new_unchecked(
data_type.clone(),
length,
Some(length),
Some(MutableBuffer::new_null(length).into()),
0,
vec![Buffer::from(vec![
0u8;
length * std::mem::size_of::<i128>()
])],
vec![],
)
})
}

/// Creates a new array from two FFI pointers. Used to import arrays from the C Data Interface
/// # Safety
/// Assumes that these pointers represent valid C Data Interfaces, both in memory
Expand Down
58 changes: 24 additions & 34 deletions arrow/src/array/array_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,23 @@ use crate::error::{ArrowError, Result};
use crate::util::bit_util;
use crate::{buffer::MutableBuffer, datatypes::DataType};

/// Like [`OffsetSizeTrait`], but specialized for Binary.
/// This allow us to expose a constant datatype for the [`GenericBinaryArray`].
pub trait BinaryOffsetSizeTrait: OffsetSizeTrait {
const DATA_TYPE: DataType;
}

impl BinaryOffsetSizeTrait for i32 {
const DATA_TYPE: DataType = DataType::Binary;
}

impl BinaryOffsetSizeTrait for i64 {
const DATA_TYPE: DataType = DataType::LargeBinary;
}

/// See [`BinaryArray`] and [`LargeBinaryArray`] for storing
/// binary data.
pub struct GenericBinaryArray<OffsetSize: BinaryOffsetSizeTrait> {
pub struct GenericBinaryArray<OffsetSize: OffsetSizeTrait> {
data: ArrayData,
value_offsets: RawPtrBox<OffsetSize>,
value_data: RawPtrBox<u8>,
}

impl<OffsetSize: BinaryOffsetSizeTrait> GenericBinaryArray<OffsetSize> {
impl<OffsetSize: OffsetSizeTrait> GenericBinaryArray<OffsetSize> {
pub fn get_data_type() -> DataType {
if OffsetSize::is_large() {
DataType::LargeBinary
} else {
DataType::Binary
}
}

/// Returns the length for value at index `i`.
#[inline]
pub fn value_length(&self, i: usize) -> OffsetSize {
Expand Down Expand Up @@ -155,7 +149,7 @@ impl<OffsetSize: BinaryOffsetSizeTrait> GenericBinaryArray<OffsetSize> {
"BinaryArray can only be created from List<u8> arrays, mismatched data types."
);

let mut builder = ArrayData::builder(OffsetSize::DATA_TYPE)
let mut builder = ArrayData::builder(Self::get_data_type())
.len(v.len())
.add_buffer(v.data_ref().buffers()[0].clone())
.add_buffer(v.data_ref().child_data()[0].buffers()[0].clone());
Expand Down Expand Up @@ -195,7 +189,7 @@ impl<OffsetSize: BinaryOffsetSizeTrait> GenericBinaryArray<OffsetSize> {
assert!(!offsets.is_empty()); // wrote at least one
let actual_len = (offsets.len() / std::mem::size_of::<OffsetSize>()) - 1;

let array_data = ArrayData::builder(OffsetSize::DATA_TYPE)
let array_data = ArrayData::builder(Self::get_data_type())
.len(actual_len)
.add_buffer(offsets.into())
.add_buffer(values.into());
Expand Down Expand Up @@ -223,14 +217,14 @@ impl<OffsetSize: BinaryOffsetSizeTrait> GenericBinaryArray<OffsetSize> {
}
}

impl<'a, T: BinaryOffsetSizeTrait> GenericBinaryArray<T> {
impl<'a, T: OffsetSizeTrait> GenericBinaryArray<T> {
/// constructs a new iterator
pub fn iter(&'a self) -> GenericBinaryIter<'a, T> {
GenericBinaryIter::<'a, T>::new(self)
}
}

impl<OffsetSize: BinaryOffsetSizeTrait> fmt::Debug for GenericBinaryArray<OffsetSize> {
impl<OffsetSize: OffsetSizeTrait> fmt::Debug for GenericBinaryArray<OffsetSize> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let prefix = if OffsetSize::is_large() { "Large" } else { "" };

Expand All @@ -242,7 +236,7 @@ impl<OffsetSize: BinaryOffsetSizeTrait> fmt::Debug for GenericBinaryArray<Offset
}
}

impl<OffsetSize: BinaryOffsetSizeTrait> Array for GenericBinaryArray<OffsetSize> {
impl<OffsetSize: OffsetSizeTrait> Array for GenericBinaryArray<OffsetSize> {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -252,13 +246,11 @@ impl<OffsetSize: BinaryOffsetSizeTrait> Array for GenericBinaryArray<OffsetSize>
}
}

impl<OffsetSize: BinaryOffsetSizeTrait> From<ArrayData>
for GenericBinaryArray<OffsetSize>
{
impl<OffsetSize: OffsetSizeTrait> From<ArrayData> for GenericBinaryArray<OffsetSize> {
fn from(data: ArrayData) -> Self {
assert_eq!(
data.data_type(),
&<OffsetSize as BinaryOffsetSizeTrait>::DATA_TYPE,
&Self::get_data_type(),
"[Large]BinaryArray expects Datatype::[Large]Binary"
);
assert_eq!(
Expand All @@ -276,7 +268,7 @@ impl<OffsetSize: BinaryOffsetSizeTrait> From<ArrayData>
}
}

impl<Ptr, OffsetSize: BinaryOffsetSizeTrait> FromIterator<Option<Ptr>>
impl<Ptr, OffsetSize: OffsetSizeTrait> FromIterator<Option<Ptr>>
for GenericBinaryArray<OffsetSize>
where
Ptr: AsRef<[u8]>,
Expand Down Expand Up @@ -309,7 +301,7 @@ where

// calculate actual data_len, which may be different from the iterator's upper bound
let data_len = offsets.len() - 1;
let array_data = ArrayData::builder(OffsetSize::DATA_TYPE)
let array_data = ArrayData::builder(Self::get_data_type())
.len(data_len)
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_buffer(Buffer::from_slice_ref(&values))
Expand Down Expand Up @@ -399,7 +391,7 @@ pub type BinaryArray = GenericBinaryArray<i32>;
///
pub type LargeBinaryArray = GenericBinaryArray<i64>;

impl<'a, T: BinaryOffsetSizeTrait> IntoIterator for &'a GenericBinaryArray<T> {
impl<'a, T: OffsetSizeTrait> IntoIterator for &'a GenericBinaryArray<T> {
type Item = Option<&'a [u8]>;
type IntoIter = GenericBinaryIter<'a, T>;

Expand All @@ -408,23 +400,21 @@ impl<'a, T: BinaryOffsetSizeTrait> IntoIterator for &'a GenericBinaryArray<T> {
}
}

impl<OffsetSize: BinaryOffsetSizeTrait> From<Vec<Option<&[u8]>>>
impl<OffsetSize: OffsetSizeTrait> From<Vec<Option<&[u8]>>>
for GenericBinaryArray<OffsetSize>
{
fn from(v: Vec<Option<&[u8]>>) -> Self {
Self::from_opt_vec(v)
}
}

impl<OffsetSize: BinaryOffsetSizeTrait> From<Vec<&[u8]>>
for GenericBinaryArray<OffsetSize>
{
impl<OffsetSize: OffsetSizeTrait> From<Vec<&[u8]>> for GenericBinaryArray<OffsetSize> {
fn from(v: Vec<&[u8]>) -> Self {
Self::from_iter_values(v)
}
}

impl<T: BinaryOffsetSizeTrait> From<GenericListArray<T>> for GenericBinaryArray<T> {
impl<T: OffsetSizeTrait> From<GenericListArray<T>> for GenericBinaryArray<T> {
fn from(v: GenericListArray<T>) -> Self {
Self::from_list(v)
}
Expand Down Expand Up @@ -1295,7 +1285,7 @@ mod tests {
}
}

fn test_generic_binary_array_from_opt_vec<T: BinaryOffsetSizeTrait>() {
fn test_generic_binary_array_from_opt_vec<T: OffsetSizeTrait>() {
let values: Vec<Option<&[u8]>> =
vec![Some(b"one"), Some(b"two"), None, Some(b""), Some(b"three")];
let array = GenericBinaryArray::<T>::from_opt_vec(values);
Expand Down
11 changes: 3 additions & 8 deletions arrow/src/array/array_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,11 @@ impl<OffsetSize: OffsetSizeTrait> GenericListArray<OffsetSize> {

#[inline]
fn get_type(data_type: &DataType) -> Option<&DataType> {
if OffsetSize::is_large() {
if let DataType::LargeList(child) = data_type {
match (OffsetSize::is_large(), data_type) {
(true, DataType::LargeList(child)) | (false, DataType::List(child)) => {
Some(child.data_type())
} else {
None
}
} else if let DataType::List(child) = data_type {
Some(child.data_type())
} else {
None
_ => None,
}
}

Expand Down
Loading

0 comments on commit 3628b43

Please sign in to comment.