fix rolling groupby with negative windows #4010

Merged · 1 commit · Jul 13, 2022
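For context, a minimal sketch of the behaviour this PR targets, based on the Python test added below (the expected lists come from that test; `pl` is polars as used in `py-polars/tests/test_groupby.py`). With an offset whose magnitude exceeds the period, every rolling window lies completely behind its row, so the row itself must not be counted as a member of its own window:

```python
import polars as pl

df = pl.DataFrame({"ints": range(5)})

# period of 2 index steps, offset of -5 index steps: each window lies
# entirely behind t, so t itself is never a member of its own window
out = df.groupby_rolling(index_column="ints", period="2i", offset="-5i").agg(
    [pl.col("ints").alias("matches")]
)

# per the test added in this PR, the first five groups are:
# [[], [], [], [0], [0, 1]]
print(out["matches"].to_list())
```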
22 changes: 22 additions & 0 deletions polars/polars-time/src/groupby/dynamic.rs
@@ -653,4 +653,26 @@ mod test {
            },
        );
    }

    // rolling groupby where the window lies completely behind `t`
    // (offset magnitude larger than the period) should group correctly and not panic
    #[test]
    fn test_foo() {
        let s = Series::new("a", (0..20i32).collect::<Vec<_>>());
        let df = df![
            "ints" => s
        ]
        .unwrap();
        let (_, _, groups) = df
            .groupby_rolling(
                vec![],
                &RollingGroupOptions {
                    index_column: "ints".into(),
                    period: Duration::parse("2i"),
                    offset: Duration::parse("-5i"),
                    closed_window: ClosedWindow::Both,
                },
            )
            .unwrap();

        dbg!(groups);
    }
}
109 changes: 90 additions & 19 deletions polars/polars-time/src/windows/groupby.rs
@@ -132,6 +132,7 @@ pub fn groupby_windows(
    (groups, lower_bound, upper_bound)
}

// this assumes that the starting point of each window always lies at or behind `t` (a full lookbehind)
pub(crate) fn groupby_values_iter_full_lookbehind(
    period: Duration,
    offset: Duration,
@@ -193,6 +194,60 @@ pub(crate) fn groupby_values_iter_full_lookbehind(
    })
}

// this one is correct for all lookbehind/lookaheads, but is slower
pub(crate) fn groupby_values_iter_window_behind_t(
    period: Duration,
    offset: Duration,
    time: &[i64],
    closed_window: ClosedWindow,
    tu: TimeUnit,
) -> impl Iterator<Item = (IdxSize, IdxSize)> + TrustedLen + '_ {
    let add = match tu {
        TimeUnit::Nanoseconds => Duration::add_ns,
        TimeUnit::Microseconds => Duration::add_us,
        TimeUnit::Milliseconds => Duration::add_ms,
    };

    let mut lagging_offset = 0;
    let mut last = i64::MIN;
    time.iter().enumerate().map(move |(i, lower)| {
        if *lower < last {
            panic!("index column of 'groupby_rolling' must be sorted!")
        }
        last = *lower;
        let lower = add(&offset, *lower);
        let upper = add(&period, lower);

        let b = Bounds::new(lower, upper);
        if b.is_future(time[0], closed_window) {
            (0, 0)
        } else {
            // find starting point of window
            // we can start searching from lagging offset as that is the minimum boundary because data is sorted
            // and every iteration this boundary shifts right
            // we cannot use binary search as a window is not binary,
            // it is false left from the window, true inside, and false right of the window
            let mut count = 0;
            for &t in &time[lagging_offset..] {
                if b.is_member(t, closed_window) || lagging_offset + count == i {
                    break;
                }
                count += 1;
            }
            if lagging_offset + count != i {
                lagging_offset += count;
            }

            // Safety
            // we just iterated over value i.
            let slice = unsafe { time.get_unchecked(lagging_offset..) };
            let len = slice.partition_point(|v| b.is_member(*v, closed_window));

            (lagging_offset as IdxSize, len as IdxSize)
        }
    })
}
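A rough Python rendering of the search strategy above, for illustration only: the function below is hypothetical (not part of polars), `closed='both'` semantics are assumed (membership is `lower <= t <= upper`), and it mirrors the lagging offset that only ever moves right plus the `partition_point`-style count of leading members.

```python
def window_behind_t_groups(time, period, offset):
    """Sketch of the lagging-offset search for windows that lie behind t.

    `time` must be sorted; `offset` is negative; closed='both' semantics.
    Returns one (start, length) index pair per element of `time`.
    """
    groups = []
    lagging_offset = 0
    for i, t in enumerate(time):
        lower = t + offset      # window start (behind t, since offset < 0)
        upper = lower + period  # window end
        if upper < time[0]:
            # window ends before the first sample -> empty group
            groups.append((0, 0))
            continue
        # advance the lagging offset past values left of the window;
        # it never moves left and never passes i, because both `time`
        # and the window bounds are non-decreasing
        while lagging_offset < i and time[lagging_offset] < lower:
            lagging_offset += 1
        # count the run of members starting at the lagging offset
        length = 0
        while (
            lagging_offset + length < len(time)
            and lower <= time[lagging_offset + length] <= upper
        ):
            length += 1
        groups.append((lagging_offset, length))
    return groups


# e.g. for the data used in the Rust test above (0..20, period 2i, offset -5i),
# the first groups come out as (0, 0), (0, 0), (0, 0), (0, 1), (0, 2), (0, 3), (1, 3)
print(window_behind_t_groups(list(range(20)), period=2, offset=-5)[:7])
```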

// this one is correct for all lookbehind/lookaheads, but is slower
pub(crate) fn groupby_values_iter_partial_lookbehind(
    period: Duration,
@@ -333,30 +388,46 @@ pub fn groupby_values(

    // we have a (partial) lookbehind window
    if offset.negative {
        // only lookbehind
        if offset.nanoseconds() >= period.nanoseconds() {
            let vals = thread_offsets
                .par_iter()
                .copied()
                .map(|(base_offset, len)| {
                    let upper_bound = base_offset + len;
                    let iter = groupby_values_iter_full_lookbehind(
                        period,
                        offset,
                        &time[..upper_bound],
                        closed_window,
                        tu,
                        base_offset,
                    );
                    iter.map(|(offset, len)| [offset as IdxSize, len])
                        .collect_trusted::<Vec<_>>()
                })
                .collect::<Vec<_>>();
            flatten(&vals, Some(time.len()))
        // lookbehind
        // window is within 2 periods length of t
        // ------t---
        // [------]
        if offset.nanoseconds() < period.nanoseconds() * 2 {
            let vals = thread_offsets
                .par_iter()
                .copied()
                .map(|(base_offset, len)| {
                    let upper_bound = base_offset + len;
                    let iter = groupby_values_iter_full_lookbehind(
                        period,
                        offset,
                        &time[..upper_bound],
                        closed_window,
                        tu,
                        base_offset,
                    );
                    iter.map(|(offset, len)| [offset as IdxSize, len])
                        .collect_trusted::<Vec<_>>()
                })
                .collect::<Vec<_>>();
            flatten(&vals, Some(time.len()))
        }
        // window is completely behind t and t itself is not a member
        // ---------------t---
        // [---]
        else {
            let iter =
                groupby_values_iter_window_behind_t(period, offset, time, closed_window, tu);
            iter.map(|(offset, len)| [offset, len]).collect_trusted()
        }
    }
    // partial lookbehind
    // this one is still single threaded
    // can make it parallel later, it's a bit more complicated because the boundaries are unknown
    // window is within 1 period of t
    // ----t---
    // [---]
    else {
        let iter =
            groupby_values_iter_partial_lookbehind(period, offset, time, closed_window, tu);
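To summarise the dispatch for negative offsets shown above, a hypothetical helper (not a polars API): `period` and `offset_abs` below stand for the magnitudes that `period.nanoseconds()` and `offset.nanoseconds()` appear to compare in this function.

```python
def negative_offset_strategy(period: int, offset_abs: int) -> str:
    """Mirror of the branch condition used in groupby_values when offset is negative."""
    if offset_abs < 2 * period:
        # ------t---
        #  [------]   window overlaps t, or ends within one period behind it
        return "groupby_values_iter_full_lookbehind"
    # ---------------t---
    # [---]           window lies completely behind t; t is not a member
    return "groupby_values_iter_window_behind_t"


# the new tests use period="2i", offset="-5i": 5 >= 2 * 2, so the
# window-behind-t iterator handles that case
assert negative_offset_strategy(period=2, offset_abs=5) == "groupby_values_iter_window_behind_t"
assert negative_offset_strategy(period=2, offset_abs=3) == "groupby_values_iter_full_lookbehind"
```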
33 changes: 32 additions & 1 deletion py-polars/tests/test_groupby.py
@@ -65,4 +65,35 @@ def test_groupby_rolling_negative_offset_3914() -> None:
    )
    assert df.groupby_rolling(index_column="datetime", period="2d", offset="-4d").agg(
        pl.count().alias("count")
    )["count"].to_list() == [1, 1, 1, 1, 1]
    )["count"].to_list() == [0, 0, 1, 2, 2]

    df = pl.DataFrame(
        {
            "ints": range(0, 20),
        }
    )

    assert df.groupby_rolling(index_column="ints", period="2i", offset="-5i",).agg(
        [pl.col("ints").alias("matches")]
    )["matches"].to_list() == [
        [],
        [],
        [],
        [0],
        [0, 1],
        [1, 2],
        [2, 3],
        [3, 4],
        [4, 5],
        [5, 6],
        [6, 7],
        [7, 8],
        [8, 9],
        [9, 10],
        [10, 11],
        [11, 12],
        [12, 13],
        [13, 14],
        [14, 15],
        [15, 16],
    ]