diff --git a/polars/polars-time/src/groupby/dynamic.rs b/polars/polars-time/src/groupby/dynamic.rs index b955a2d23131..c0992164d2c8 100644 --- a/polars/polars-time/src/groupby/dynamic.rs +++ b/polars/polars-time/src/groupby/dynamic.rs @@ -653,4 +653,26 @@ mod test { }, ); } + + #[test] + fn test_foo() { + let s = Series::new("a", (0..20i32).collect::>()); + let df = df![ + "ints"=> s + ] + .unwrap(); + let (_, _, groups) = df + .groupby_rolling( + vec![], + &RollingGroupOptions { + index_column: "ints".into(), + period: Duration::parse("2i"), + offset: Duration::parse("-5i"), + closed_window: ClosedWindow::Both, + }, + ) + .unwrap(); + + dbg!(groups); + } } diff --git a/polars/polars-time/src/windows/groupby.rs b/polars/polars-time/src/windows/groupby.rs index c67727c14b5e..1be8d7299182 100644 --- a/polars/polars-time/src/windows/groupby.rs +++ b/polars/polars-time/src/windows/groupby.rs @@ -132,6 +132,7 @@ pub fn groupby_windows( (groups, lower_bound, upper_bound) } +// this assumes that the starting point is alwa pub(crate) fn groupby_values_iter_full_lookbehind( period: Duration, offset: Duration, @@ -193,6 +194,60 @@ pub(crate) fn groupby_values_iter_full_lookbehind( }) } +// this one is correct for all lookbehind/lookaheads, but is slower +pub(crate) fn groupby_values_iter_window_behind_t( + period: Duration, + offset: Duration, + time: &[i64], + closed_window: ClosedWindow, + tu: TimeUnit, +) -> impl Iterator + TrustedLen + '_ { + let add = match tu { + TimeUnit::Nanoseconds => Duration::add_ns, + TimeUnit::Microseconds => Duration::add_us, + TimeUnit::Milliseconds => Duration::add_ms, + }; + + let mut lagging_offset = 0; + let mut last = i64::MIN; + time.iter().enumerate().map(move |(i, lower)| { + if *lower < last { + panic!("index column of 'groupby_rolling' must be sorted!") + } + last = *lower; + let lower = add(&offset, *lower); + let upper = add(&period, lower); + + let b = Bounds::new(lower, upper); + if b.is_future(time[0], closed_window) { + (0, 0) + } else { + // find starting point of window + // we can start searching from lagging offset as that is the minimum boundary because data is sorted + // and every iteration this boundary shifts right + // we cannot use binary search as a window is not binary, + // it is false left from the window, true inside, and false right of the window + let mut count = 0; + for &t in &time[lagging_offset..] { + if b.is_member(t, closed_window) || lagging_offset + count == i { + break; + } + count += 1 + } + if lagging_offset + count != i { + lagging_offset += count; + } + + // Safety + // we just iterated over value i. + let slice = unsafe { time.get_unchecked(lagging_offset..) }; + let len = slice.partition_point(|v| b.is_member(*v, closed_window)); + + (lagging_offset as IdxSize, len as IdxSize) + } + }) +} + // this one is correct for all lookbehind/lookaheads, but is slower pub(crate) fn groupby_values_iter_partial_lookbehind( period: Duration, @@ -333,30 +388,46 @@ pub fn groupby_values( // we have a (partial) lookbehind window if offset.negative { - // only lookbehind if offset.nanoseconds() >= period.nanoseconds() { - let vals = thread_offsets - .par_iter() - .copied() - .map(|(base_offset, len)| { - let upper_bound = base_offset + len; - let iter = groupby_values_iter_full_lookbehind( - period, - offset, - &time[..upper_bound], - closed_window, - tu, - base_offset, - ); - iter.map(|(offset, len)| [offset as IdxSize, len]) - .collect_trusted::>() - }) - .collect::>(); - flatten(&vals, Some(time.len())) + // lookbehind + // window is within 2 periods length of t + // ------t--- + // [------] + if offset.nanoseconds() < period.nanoseconds() * 2 { + let vals = thread_offsets + .par_iter() + .copied() + .map(|(base_offset, len)| { + let upper_bound = base_offset + len; + let iter = groupby_values_iter_full_lookbehind( + period, + offset, + &time[..upper_bound], + closed_window, + tu, + base_offset, + ); + iter.map(|(offset, len)| [offset as IdxSize, len]) + .collect_trusted::>() + }) + .collect::>(); + flatten(&vals, Some(time.len())) + } + // window is completely behind t and t itself is not a member + // ---------------t--- + // [---] + else { + let iter = + groupby_values_iter_window_behind_t(period, offset, time, closed_window, tu); + iter.map(|(offset, len)| [offset, len]).collect_trusted() + } } // partial lookbehind // this one is still single threaded // can make it parallel later, its a bit more complicated because the boundaries are unknown + // window is with -1 periods of t + // ----t--- + // [---] else { let iter = groupby_values_iter_partial_lookbehind(period, offset, time, closed_window, tu); diff --git a/py-polars/tests/test_groupby.py b/py-polars/tests/test_groupby.py index e07fcea7cf20..078270230563 100644 --- a/py-polars/tests/test_groupby.py +++ b/py-polars/tests/test_groupby.py @@ -65,4 +65,35 @@ def test_groupby_rolling_negative_offset_3914() -> None: ) assert df.groupby_rolling(index_column="datetime", period="2d", offset="-4d").agg( pl.count().alias("count") - )["count"].to_list() == [1, 1, 1, 1, 1] + )["count"].to_list() == [0, 0, 1, 2, 2] + + df = pl.DataFrame( + { + "ints": range(0, 20), + } + ) + + assert df.groupby_rolling(index_column="ints", period="2i", offset="-5i",).agg( + [pl.col("ints").alias("matches")] + )["matches"].to_list() == [ + [], + [], + [], + [0], + [0, 1], + [1, 2], + [2, 3], + [3, 4], + [4, 5], + [5, 6], + [6, 7], + [7, 8], + [8, 9], + [9, 10], + [10, 11], + [11, 12], + [12, 13], + [13, 14], + [14, 15], + [15, 16], + ]