Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust, python): Add ddof option to rolling_var and rolling_std #8957

Merged
merged 20 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions polars/polars-arrow/src/kernels/rolling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ pub mod no_nulls;
pub mod nulls;
mod window;

use std::any::Any;
use std::cmp::Ordering;
use std::ops::{Add, AddAssign, Div, Mul, Sub, SubAssign};
use std::sync::Arc;

use arrow::array::PrimitiveArray;
use arrow::bitmap::{Bitmap, MutableBitmap};
Expand All @@ -20,6 +22,7 @@ type End = usize;
type Idx = usize;
type WindowSize = usize;
type Len = usize;
/// Optional, type-erased extra arguments for a rolling kernel.
///
/// `None` means no extra arguments are supplied. When present, the value is a
/// shared (`Arc`) `dyn Any` that is `Sync + Send`.
// NOTE(review): presumably the receiver downcasts this to a concrete
// parameter struct for its kernel — confirm against the window implementations.
pub type DynArgs = Option<Arc<dyn Any + Sync + Send>>;

#[inline]
/// NaN will be smaller than every valid value
Expand Down Expand Up @@ -133,3 +136,9 @@ where
unsafe { buf.sort_by(|a, b| a.partial_cmp(b).unwrap_unchecked()) };
}
}

/// Parameters allowed for rolling operations.
///
/// The type is `Copy`, so it can be passed around by value.
#[derive(Clone, Copy, Debug)]
pub struct RollingVarParams {
/// NOTE(review): presumably the "delta degrees of freedom" used as the
/// divisor adjustment (`n - ddof`) in rolling variance / standard
/// deviation — confirm against the variance kernel.
pub ddof: u8,
}
7 changes: 5 additions & 2 deletions polars/polars-arrow/src/kernels/rolling/no_nulls/mean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ impl<
T: NativeType + IsFloat + std::iter::Sum + AddAssign + SubAssign + Div<Output = T> + NumCast,
> RollingAggWindowNoNulls<'a, T> for MeanWindow<'a, T>
{
fn new(slice: &'a [T], start: usize, end: usize) -> Self {
fn new(slice: &'a [T], start: usize, end: usize, params: DynArgs) -> Self {
Self {
sum: SumWindow::new(slice, start, end),
sum: SumWindow::new(slice, start, end, params),
}
}

Expand All @@ -30,6 +30,7 @@ pub fn rolling_mean<T>(
min_periods: usize,
center: bool,
weights: Option<&[f64]>,
_params: DynArgs,
) -> ArrayRef
where
T: NativeType + Float + std::iter::Sum<T> + SubAssign + AddAssign + IsFloat,
Expand All @@ -40,12 +41,14 @@ where
window_size,
min_periods,
det_offsets_center,
None,
),
(false, None) => rolling_apply_agg_window::<MeanWindow<_>, _, _>(
values,
window_size,
min_periods,
det_offsets,
None,
),
(true, Some(weights)) => {
let weights = no_nulls::coerce_weights(weights);
Expand Down
30 changes: 20 additions & 10 deletions polars/polars-arrow/src/kernels/rolling/no_nulls/min_max.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub struct SortedMinMax<'a, T: NativeType> {
}

impl<'a, T: NativeType> RollingAggWindowNoNulls<'a, T> for SortedMinMax<'a, T> {
fn new(slice: &'a [T], _start: usize, _end: usize) -> Self {
fn new(slice: &'a [T], _start: usize, _end: usize, _params: DynArgs) -> Self {
Self { slice }
}

Expand All @@ -26,7 +26,7 @@ pub struct MinWindow<'a, T: NativeType + PartialOrd + IsFloat> {
}

impl<'a, T: NativeType + IsFloat + PartialOrd> RollingAggWindowNoNulls<'a, T> for MinWindow<'a, T> {
fn new(slice: &'a [T], start: usize, end: usize) -> Self {
fn new(slice: &'a [T], start: usize, end: usize, _params: DynArgs) -> Self {
let min = *slice[start..end]
.iter()
.min_by(|a, b| compare_fn_nan_min(*a, *b))
Expand Down Expand Up @@ -150,7 +150,7 @@ pub struct MaxWindow<'a, T: NativeType> {
}

impl<'a, T: NativeType + IsFloat + PartialOrd> RollingAggWindowNoNulls<'a, T> for MaxWindow<'a, T> {
fn new(slice: &'a [T], start: usize, end: usize) -> Self {
fn new(slice: &'a [T], start: usize, end: usize, _params: DynArgs) -> Self {
let max = *slice[start..end]
.iter()
.max_by(|a, b| compare_fn_nan_max(*a, *b))
Expand Down Expand Up @@ -313,6 +313,7 @@ pub fn rolling_max<T>(
min_periods: usize,
center: bool,
weights: Option<&[f64]>,
_params: DynArgs,
) -> ArrayRef
where
T: NativeType + PartialOrd + IsFloat + Bounded + NumCast + Mul<Output = T>,
Expand All @@ -326,13 +327,15 @@ where
window_size,
min_periods,
det_offsets_center,
None,
)
} else {
rolling_apply_agg_window::<MaxWindow<_>, _, _>(
values,
window_size,
min_periods,
det_offsets_center,
None,
)
}
}
Expand All @@ -343,13 +346,15 @@ where
window_size,
min_periods,
det_offsets,
None,
)
} else {
rolling_apply_agg_window::<MaxWindow<_>, _, _>(
values,
window_size,
min_periods,
det_offsets,
None,
)
}
}
Expand Down Expand Up @@ -408,6 +413,7 @@ pub fn rolling_min<T>(
min_periods: usize,
center: bool,
weights: Option<&[f64]>,
_params: DynArgs,
) -> ArrayRef
where
T: NativeType + PartialOrd + NumCast + Mul<Output = T> + Bounded + IsFloat,
Expand All @@ -421,13 +427,15 @@ where
window_size,
min_periods,
det_offsets_center,
None,
)
} else {
rolling_apply_agg_window::<MinWindow<_>, _, _>(
values,
window_size,
min_periods,
det_offsets_center,
None,
)
}
}
Expand All @@ -439,13 +447,15 @@ where
window_size,
min_periods,
det_offsets,
None,
)
} else {
rolling_apply_agg_window::<MinWindow<_>, _, _>(
values,
window_size,
min_periods,
det_offsets,
None,
)
}
}
Expand Down Expand Up @@ -496,32 +506,32 @@ mod test {
fn test_rolling_min_max() {
let values = &[1.0f64, 5.0, 3.0, 4.0];

let out = rolling_min(values, 2, 2, false, None);
let out = rolling_min(values, 2, 2, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[None, Some(1.0), Some(3.0), Some(3.0)]);
let out = rolling_max(values, 2, 2, false, None);
let out = rolling_max(values, 2, 2, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[None, Some(5.0), Some(5.0), Some(4.0)]);

let out = rolling_min(values, 2, 1, false, None);
let out = rolling_min(values, 2, 1, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[Some(1.0), Some(1.0), Some(3.0), Some(3.0)]);
let out = rolling_max(values, 2, 1, false, None);
let out = rolling_max(values, 2, 1, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[Some(1.0), Some(5.0), Some(5.0), Some(4.0)]);

let out = rolling_max(values, 3, 1, false, None);
let out = rolling_max(values, 3, 1, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[Some(1.0), Some(5.0), Some(5.0), Some(5.0)]);

// test nan handling.
let values = &[1.0, 2.0, 3.0, f64::nan(), 5.0, 6.0, 7.0];
let out = rolling_min(values, 3, 3, false, None);
let out = rolling_min(values, 3, 3, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
// we cannot compare nans, so we compare the string values
Expand All @@ -541,7 +551,7 @@ mod test {
)
);

let out = rolling_max(values, 3, 3, false, None);
let out = rolling_max(values, 3, 3, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(
Expand Down
5 changes: 3 additions & 2 deletions polars/polars-arrow/src/kernels/rolling/no_nulls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::*;
use crate::utils::CustomIterTools;

pub trait RollingAggWindowNoNulls<'a, T: NativeType> {
fn new(slice: &'a [T], start: usize, end: usize) -> Self;
fn new(slice: &'a [T], start: usize, end: usize, params: DynArgs) -> Self;

/// Update and recompute the window
/// # Safety
Expand All @@ -36,6 +36,7 @@ pub(super) fn rolling_apply_agg_window<'a, Agg, T, Fo>(
window_size: usize,
min_periods: usize,
det_offsets_fn: Fo,
params: DynArgs,
) -> ArrayRef
where
Fo: Fn(Idx, WindowSize, Len) -> (Start, End),
Expand All @@ -44,7 +45,7 @@ where
{
let len = values.len();
let (start, end) = det_offsets_fn(0, window_size, len);
let mut agg_window = Agg::new(values, start, end);
let mut agg_window = Agg::new(values, start, end, params);

let out = (0..len)
.map(|idx| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ pub fn rolling_median<T>(
min_periods: usize,
center: bool,
weights: Option<&[f64]>,
_params: DynArgs,
) -> ArrayRef
where
T: NativeType
Expand Down Expand Up @@ -378,15 +379,15 @@ mod test {
];

for interpol in interpol_options {
let out1 = rolling_min(values, 2, 2, false, None);
let out1 = rolling_min(values, 2, 2, false, None, None);
let out1 = out1.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out1 = out1.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
let out2 = rolling_quantile(values, 0.0, interpol, 2, 2, false, None);
let out2 = out2.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out2 = out2.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out1, out2);

let out1 = rolling_max(values, 2, 2, false, None);
let out1 = rolling_max(values, 2, 2, false, None, None);
let out1 = out1.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out1 = out1.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
let out2 = rolling_quantile(values, 1.0, interpol, 2, 2, false, None);
Expand Down
17 changes: 10 additions & 7 deletions polars/polars-arrow/src/kernels/rolling/no_nulls/sum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct SumWindow<'a, T> {
impl<'a, T: NativeType + IsFloat + std::iter::Sum + AddAssign + SubAssign>
RollingAggWindowNoNulls<'a, T> for SumWindow<'a, T>
{
fn new(slice: &'a [T], start: usize, end: usize) -> Self {
fn new(slice: &'a [T], start: usize, end: usize, _params: DynArgs) -> Self {
let sum = slice[start..end].iter().copied().sum::<T>();
Self {
slice,
Expand Down Expand Up @@ -73,6 +73,7 @@ pub fn rolling_sum<T>(
min_periods: usize,
center: bool,
weights: Option<&[f64]>,
_params: DynArgs,
) -> ArrayRef
where
T: NativeType + std::iter::Sum + NumCast + Mul<Output = T> + AddAssign + SubAssign + IsFloat,
Expand All @@ -83,12 +84,14 @@ where
window_size,
min_periods,
det_offsets_center,
None,
),
(false, None) => rolling_apply_agg_window::<SumWindow<_>, _, _>(
values,
window_size,
min_periods,
det_offsets,
None,
),
(true, Some(weights)) => {
let weights = no_nulls::coerce_weights(weights);
Expand Down Expand Up @@ -122,34 +125,34 @@ mod test {
fn test_rolling_sum() {
let values = &[1.0f64, 2.0, 3.0, 4.0];

let out = rolling_sum(values, 2, 2, false, None);
let out = rolling_sum(values, 2, 2, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[None, Some(3.0), Some(5.0), Some(7.0)]);

let out = rolling_sum(values, 2, 1, false, None);
let out = rolling_sum(values, 2, 1, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[Some(1.0), Some(3.0), Some(5.0), Some(7.0)]);

let out = rolling_sum(values, 4, 1, false, None);
let out = rolling_sum(values, 4, 1, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[Some(1.0), Some(3.0), Some(6.0), Some(10.0)]);

let out = rolling_sum(values, 4, 1, true, None);
let out = rolling_sum(values, 4, 1, true, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[Some(3.0), Some(6.0), Some(10.0), Some(9.0)]);

let out = rolling_sum(values, 4, 4, true, None);
let out = rolling_sum(values, 4, 4, true, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();
assert_eq!(out, &[None, None, Some(10.0), None]);

// test nan handling.
let values = &[1.0, 2.0, 3.0, f64::nan(), 5.0, 6.0, 7.0];
let out = rolling_sum(values, 3, 3, false, None);
let out = rolling_sum(values, 3, 3, false, None, None);
let out = out.as_any().downcast_ref::<PrimitiveArray<f64>>().unwrap();
let out = out.into_iter().map(|v| v.copied()).collect::<Vec<_>>();

Expand Down
Loading