Skip to content

Commit

Permalink
fix rolling groupby with negative windows (#4010)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jul 13, 2022
1 parent f794a07 commit 4ee4f7c
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 20 deletions.
22 changes: 22 additions & 0 deletions polars/polars-time/src/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,4 +653,26 @@ mod test {
},
);
}

#[test]
fn test_foo() {
    // Single integer column used as the rolling index.
    let ints = Series::new("a", (0..20i32).collect::<Vec<_>>());
    let df = df!["ints" => ints].unwrap();

    // Rolling window whose (negative) offset is larger in magnitude than its period.
    let options = RollingGroupOptions {
        index_column: "ints".into(),
        period: Duration::parse("2i"),
        offset: Duration::parse("-5i"),
        closed_window: ClosedWindow::Both,
    };

    // No additional `by` columns; only the resulting groups are inspected.
    let (_, _, groups) = df.groupby_rolling(vec![], &options).unwrap();

    // NOTE(review): this only prints the groups and asserts nothing about them.
    dbg!(groups);
}
}
109 changes: 90 additions & 19 deletions polars/polars-time/src/windows/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub fn groupby_windows(
(groups, lower_bound, upper_bound)
}

// this assumes that the starting point is alwa
pub(crate) fn groupby_values_iter_full_lookbehind(
period: Duration,
offset: Duration,
Expand Down Expand Up @@ -193,6 +194,60 @@ pub(crate) fn groupby_values_iter_full_lookbehind(
})
}

/// Yields one `(start, len)` pair per value in `time`, describing the run of `time`
/// that falls inside that value's window.
///
/// For every value `t` the window is `Bounds::new(t + offset, t + offset + period)`,
/// with both additions performed by the `Duration::add_*` function matching `tu`.
/// This variant is meant for windows that lie completely behind `t`, i.e. `t` itself
/// is not a member of its own window. The start of the window is found with a linear
/// scan, so it is slower than the full-lookbehind variant.
///
/// # Panics
///
/// Panics during iteration if a value in `time` is smaller than its predecessor,
/// i.e. if `time` is not sorted in ascending order.
pub(crate) fn groupby_values_iter_window_behind_t(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
) -> impl Iterator<Item = (IdxSize, IdxSize)> + TrustedLen + '_ {
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    // Index of the first member of the most recent non-empty window. Because `time`
    // is sorted, no later window can start before it. Invariant: `lagging_offset <= i`.
    let mut lagging_offset = 0;
    let mut last = i64::MIN;
    time.iter().enumerate().map(move |(i, lower)| {
        if *lower < last {
            panic!("index column of 'groupby_rolling' must be sorted!")
        }
        last = *lower;
        let lower = add(&offset, *lower);
        let upper = add(&period, lower);

        let b = Bounds::new(lower, upper);
        // Presumably true when the whole window ends before the first value, in which
        // case nothing can be a member and an empty group is emitted.
        if b.is_future(time[0], closed_window) {
            (0, 0)
        } else {
            // Find the starting point of the window.
            // We can start searching from `lagging_offset`, as that is the minimum
            // boundary, and we never need to look at or beyond the current row `i`.
            // We cannot use binary search here as a window is not binary: it is false
            // left of the window, true inside, and false right of the window.
            //
            // If no member is found before `i`, `lagging_offset` is deliberately left
            // untouched: the skipped values may still belong to a later window.
            if let Some(first_member) = time[lagging_offset..i]
                .iter()
                .position(|&t| b.is_member(t, closed_window))
            {
                lagging_offset += first_member;
            }

            // From the first member onwards membership is `true..., false...`, so the
            // window length can be found with a partition point. Checked slicing is
            // used on purpose: `lagging_offset <= i < time.len()` always holds, so an
            // unchecked access buys nothing here.
            let len = time[lagging_offset..].partition_point(|v| b.is_member(*v, closed_window));

            (lagging_offset as IdxSize, len as IdxSize)
        }
    })
}

// this one is correct for all lookbehind/lookaheads, but is slower
pub(crate) fn groupby_values_iter_partial_lookbehind(
period: Duration,
Expand Down Expand Up @@ -333,30 +388,46 @@ pub fn groupby_values(

// we have a (partial) lookbehind window
if offset.negative {
// only lookbehind
if offset.nanoseconds() >= period.nanoseconds() {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
let iter = groupby_values_iter_full_lookbehind(
period,
offset,
&time[..upper_bound],
closed_window,
tu,
base_offset,
);
iter.map(|(offset, len)| [offset as IdxSize, len])
.collect_trusted::<Vec<_>>()
})
.collect::<Vec<_>>();
flatten(&vals, Some(time.len()))
// lookbehind
// window is within 2 periods length of t
// ------t---
// [------]
if offset.nanoseconds() < period.nanoseconds() * 2 {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
let iter = groupby_values_iter_full_lookbehind(
period,
offset,
&time[..upper_bound],
closed_window,
tu,
base_offset,
);
iter.map(|(offset, len)| [offset as IdxSize, len])
.collect_trusted::<Vec<_>>()
})
.collect::<Vec<_>>();
flatten(&vals, Some(time.len()))
}
// window is completely behind t and t itself is not a member
// ---------------t---
// [---]
else {
let iter =
groupby_values_iter_window_behind_t(period, offset, time, closed_window, tu);
iter.map(|(offset, len)| [offset, len]).collect_trusted()
}
}
// partial lookbehind
// this one is still single threaded
// can make it parallel later, its a bit more complicated because the boundaries are unknown
// window is with -1 periods of t
// ----t---
// [---]
else {
let iter =
groupby_values_iter_partial_lookbehind(period, offset, time, closed_window, tu);
Expand Down
33 changes: 32 additions & 1 deletion py-polars/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,35 @@ def test_groupby_rolling_negative_offset_3914() -> None:
)
assert df.groupby_rolling(index_column="datetime", period="2d", offset="-4d").agg(
pl.count().alias("count")
)["count"].to_list() == [1, 1, 1, 1, 1]
)["count"].to_list() == [0, 0, 1, 2, 2]

df = pl.DataFrame(
{
"ints": range(0, 20),
}
)

assert df.groupby_rolling(index_column="ints", period="2i", offset="-5i",).agg(
[pl.col("ints").alias("matches")]
)["matches"].to_list() == [
[],
[],
[],
[0],
[0, 1],
[1, 2],
[2, 3],
[3, 4],
[4, 5],
[5, 6],
[6, 7],
[7, 8],
[8, 9],
[9, 10],
[10, 11],
[11, 12],
[12, 13],
[13, 14],
[14, 15],
[15, 16],
]

0 comments on commit 4ee4f7c

Please sign in to comment.