Skip to content

Commit

Permalink
perf: Speedup writing of Parquet primitive values (#18020)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Aug 4, 2024
1 parent 6259ea4 commit 8adadf6
Show file tree
Hide file tree
Showing 8 changed files with 373 additions and 89 deletions.
64 changes: 44 additions & 20 deletions crates/polars-compute/src/min_max/dyn_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,55 @@ macro_rules! call_op {
$op(arr)
.map(|v| Box::new(<$scalar>::new(arr.data_type().clone(), Some(v))) as Box<dyn Scalar>)
}};
($T:ty, $scalar:ty, $arr:expr, $op:path, ret_two) => {{
let arr: &$T = $arr.as_any().downcast_ref().unwrap();
$op(arr).map(|(l, r)| {
(
Box::new(<$scalar>::new(Some(l))) as Box<dyn Scalar>,
Box::new(<$scalar>::new(Some(r))) as Box<dyn Scalar>,
)
})
}};
(dt: $T:ty, $scalar:ty, $arr:expr, $op:path, ret_two) => {{
let arr: &$T = $arr.as_any().downcast_ref().unwrap();
$op(arr).map(|(l, r)| {
(
Box::new(<$scalar>::new(arr.data_type().clone(), Some(l))) as Box<dyn Scalar>,
Box::new(<$scalar>::new(arr.data_type().clone(), Some(r))) as Box<dyn Scalar>,
)
})
}};
}

macro_rules! call {
($arr:expr, $op:path) => {{
($arr:expr, $op:path$(, $variant:ident)?) => {{
let arr = $arr;

use arrow::datatypes::{PhysicalType as PH, PrimitiveType as PR};
use PrimitiveArray as PArr;
use PrimitiveScalar as PScalar;
match arr.data_type().to_physical_type() {
PH::Boolean => call_op!(BooleanArray, BooleanScalar, arr, $op),
PH::Primitive(PR::Int8) => call_op!(dt: PArr<i8>, PScalar<i8>, arr, $op),
PH::Primitive(PR::Int16) => call_op!(dt: PArr<i16>, PScalar<i16>, arr, $op),
PH::Primitive(PR::Int32) => call_op!(dt: PArr<i32>, PScalar<i32>, arr, $op),
PH::Primitive(PR::Int64) => call_op!(dt: PArr<i64>, PScalar<i64>, arr, $op),
PH::Primitive(PR::Int128) => call_op!(dt: PArr<i128>, PScalar<i128>, arr, $op),
PH::Primitive(PR::UInt8) => call_op!(dt: PArr<u8>, PScalar<u8>, arr, $op),
PH::Primitive(PR::UInt16) => call_op!(dt: PArr<u16>, PScalar<u16>, arr, $op),
PH::Primitive(PR::UInt32) => call_op!(dt: PArr<u32>, PScalar<u32>, arr, $op),
PH::Primitive(PR::UInt64) => call_op!(dt: PArr<u64>, PScalar<u64>, arr, $op),
PH::Primitive(PR::UInt128) => call_op!(dt: PArr<u128>, PScalar<u128>, arr, $op),
PH::Primitive(PR::Float32) => call_op!(dt: PArr<f32>, PScalar<f32>, arr, $op),
PH::Primitive(PR::Float64) => call_op!(dt: PArr<f64>, PScalar<f64>, arr, $op),
PH::Boolean => call_op!(BooleanArray, BooleanScalar, arr, $op$(, $variant)?),
PH::Primitive(PR::Int8) => call_op!(dt: PArr<i8>, PScalar<i8>, arr, $op$(, $variant)?),
PH::Primitive(PR::Int16) => call_op!(dt: PArr<i16>, PScalar<i16>, arr, $op$(, $variant)?),
PH::Primitive(PR::Int32) => call_op!(dt: PArr<i32>, PScalar<i32>, arr, $op$(, $variant)?),
PH::Primitive(PR::Int64) => call_op!(dt: PArr<i64>, PScalar<i64>, arr, $op$(, $variant)?),
PH::Primitive(PR::Int128) => call_op!(dt: PArr<i128>, PScalar<i128>, arr, $op$(, $variant)?),
PH::Primitive(PR::UInt8) => call_op!(dt: PArr<u8>, PScalar<u8>, arr, $op$(, $variant)?),
PH::Primitive(PR::UInt16) => call_op!(dt: PArr<u16>, PScalar<u16>, arr, $op$(, $variant)?),
PH::Primitive(PR::UInt32) => call_op!(dt: PArr<u32>, PScalar<u32>, arr, $op$(, $variant)?),
PH::Primitive(PR::UInt64) => call_op!(dt: PArr<u64>, PScalar<u64>, arr, $op$(, $variant)?),
PH::Primitive(PR::UInt128) => call_op!(dt: PArr<u128>, PScalar<u128>, arr, $op$(, $variant)?),
PH::Primitive(PR::Float32) => call_op!(dt: PArr<f32>, PScalar<f32>, arr, $op$(, $variant)?),
PH::Primitive(PR::Float64) => call_op!(dt: PArr<f64>, PScalar<f64>, arr, $op$(, $variant)?),

PH::BinaryView => call_op!(BinaryViewArray, BinaryViewScalar<[u8]>, arr, $op),
PH::Utf8View => call_op!(Utf8ViewArray, BinaryViewScalar<str>, arr, $op),
PH::BinaryView => call_op!(BinaryViewArray, BinaryViewScalar<[u8]>, arr, $op$(, $variant)?),
PH::Utf8View => call_op!(Utf8ViewArray, BinaryViewScalar<str>, arr, $op$(, $variant)?),

PH::Binary => call_op!(BinaryArray<i32>, BinaryScalar<i32>, arr, $op),
PH::LargeBinary => call_op!(BinaryArray<i64>, BinaryScalar<i64>, arr, $op),
PH::Utf8 => call_op!(Utf8Array<i32>, BinaryScalar<i32>, arr, $op),
PH::LargeUtf8 => call_op!(Utf8Array<i64>, BinaryScalar<i64>, arr, $op),
PH::Binary => call_op!(BinaryArray<i32>, BinaryScalar<i32>, arr, $op$(, $variant)?),
PH::LargeBinary => call_op!(BinaryArray<i64>, BinaryScalar<i64>, arr, $op$(, $variant)?),
PH::Utf8 => call_op!(Utf8Array<i32>, BinaryScalar<i32>, arr, $op$(, $variant)?),
PH::LargeUtf8 => call_op!(Utf8Array<i64>, BinaryScalar<i64>, arr, $op$(, $variant)?),

_ => todo!("Dynamic MinMax is not yet implemented for {:?}", arr.data_type()),
}
Expand All @@ -67,3 +85,9 @@ pub fn dyn_array_min_propagate_nan(arr: &dyn Array) -> Option<Box<dyn Scalar>> {
/// Compute the maximum of a type-erased [`Array`], propagating NaN
/// (a NaN anywhere in the input yields a NaN result), boxed as a dynamic [`Scalar`].
///
/// Dispatches on the array's physical type via the `call!` macro; `None` when the
/// kernel produces no value (e.g. no valid elements). Unsupported physical types
/// hit the `todo!` arm inside `call!` and panic.
pub fn dyn_array_max_propagate_nan(arr: &dyn Array) -> Option<Box<dyn Scalar>> {
call!(arr, MinMaxKernel::max_propagate_nan_kernel)
}

/// Compute both the minimum and maximum of a type-erased [`Array`] in a single
/// pass, propagating NaN, returned as boxed dynamic [`Scalar`]s `(min, max)`.
///
/// Uses the `ret_two` variant of the `call!`/`call_op!` macros so both bounds of
/// the kernel's tuple result are boxed. `None` when the kernel produces no value.
pub fn dyn_array_min_max_propagate_nan(
arr: &dyn Array,
) -> Option<(Box<dyn Scalar>, Box<dyn Scalar>)> {
call!(arr, MinMaxKernel::min_max_propagate_nan_kernel, ret_two)
}
2 changes: 1 addition & 1 deletion crates/polars-compute/src/min_max/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use polars_utils::min_max::MinMax;

pub use self::dyn_array::{
dyn_array_max_ignore_nan, dyn_array_max_propagate_nan, dyn_array_min_ignore_nan,
dyn_array_min_propagate_nan,
dyn_array_min_max_propagate_nan, dyn_array_min_propagate_nan,
};

/// Low-level min/max kernel.
Expand Down
48 changes: 48 additions & 0 deletions crates/polars-compute/src/min_max/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ use polars_utils::min_max::MinMax;

use super::MinMaxKernel;

/// Combine a running `(min, max)` pair with a candidate pair, ignoring NaNs:
/// each bound is updated independently via the `MinMax` ignore-NaN combinators.
fn min_max_ignore_nan<T: NativeType>((cur_min, cur_max): (T, T), (min, max): (T, T)) -> (T, T) {
    let lo = MinMax::min_ignore_nan(cur_min, min);
    let hi = MinMax::max_ignore_nan(cur_max, max);
    (lo, hi)
}

/// Combine a running `(min, max)` pair with a candidate pair, propagating NaNs:
/// each bound is updated independently via the `MinMax` propagate-NaN combinators.
fn min_max_propagate_nan<T: NativeType>((cur_min, cur_max): (T, T), (min, max): (T, T)) -> (T, T) {
    let lo = MinMax::min_propagate_nan(cur_min, min);
    let hi = MinMax::max_propagate_nan(cur_max, max);
    (lo, hi)
}

fn reduce_vals<T, F>(v: &PrimitiveArray<T>, f: F) -> Option<T>
where
T: NativeType,
Expand All @@ -18,6 +32,18 @@ where
}
}

/// Reduce every valid value of `v` into a pair (e.g. `(min, max)`) using `f`.
///
/// Each element is seeded as a `(v, v)` pair before reduction. Returns `None`
/// when the array has no valid values.
fn reduce_tuple_vals<T, F>(v: &PrimitiveArray<T>, f: F) -> Option<(T, T)>
where
    T: NativeType,
    F: Fn((T, T), (T, T)) -> (T, T),
{
    // Fast path: with zero nulls the raw values buffer can be walked directly,
    // skipping the validity bitmap entirely.
    match v.null_count() {
        0 => v.values_iter().copied().map(|x| (x, x)).reduce(f),
        _ => v.non_null_values_iter().map(|x| (x, x)).reduce(f),
    }
}

impl<T: NativeType + MinMax + super::NotSimdPrimitive> MinMaxKernel for PrimitiveArray<T> {
type Scalar<'a> = T;

Expand All @@ -29,13 +55,21 @@ impl<T: NativeType + MinMax + super::NotSimdPrimitive> MinMaxKernel for Primitiv
reduce_vals(self, MinMax::max_ignore_nan)
}

// Single-pass min+max over all valid values, ignoring NaNs.
fn min_max_ignore_nan_kernel(&self) -> Option<(Self::Scalar<'_>, Self::Scalar<'_>)> {
reduce_tuple_vals(self, min_max_ignore_nan)
}

// Minimum over valid values, with NaN propagation.
fn min_propagate_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
reduce_vals(self, MinMax::min_propagate_nan)
}

// Maximum over valid values, with NaN propagation.
fn max_propagate_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
reduce_vals(self, MinMax::max_propagate_nan)
}

// Single-pass min+max over all valid values, with NaN propagation on both bounds.
fn min_max_propagate_nan_kernel(&self) -> Option<(Self::Scalar<'_>, Self::Scalar<'_>)> {
reduce_tuple_vals(self, min_max_propagate_nan)
}
}

impl<T: NativeType + MinMax + super::NotSimdPrimitive> MinMaxKernel for [T] {
Expand All @@ -49,13 +83,27 @@ impl<T: NativeType + MinMax + super::NotSimdPrimitive> MinMaxKernel for [T] {
self.iter().copied().reduce(MinMax::max_ignore_nan)
}

fn min_max_ignore_nan_kernel(&self) -> Option<(Self::Scalar<'_>, Self::Scalar<'_>)> {
self.iter()
.copied()
.map(|v| (v, v))
.reduce(min_max_ignore_nan)
}

// Minimum of the slice with NaN propagation; `None` for an empty slice.
fn min_propagate_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
self.iter().copied().reduce(MinMax::min_propagate_nan)
}

// Maximum of the slice with NaN propagation; `None` for an empty slice.
fn max_propagate_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
self.iter().copied().reduce(MinMax::max_propagate_nan)
}

fn min_max_propagate_nan_kernel(&self) -> Option<(Self::Scalar<'_>, Self::Scalar<'_>)> {
self.iter()
.copied()
.map(|v| (v, v))
.reduce(min_max_propagate_nan)
}
}

impl MinMaxKernel for BooleanArray {
Expand Down
Loading

0 comments on commit 8adadf6

Please sign in to comment.