Skip to content

Commit

Permalink
fix(rust, python): groupby rolling with negative offset (#9428)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoGorelli authored Jun 20, 2023
1 parent db13403 commit d3779ae
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 49 deletions.
80 changes: 37 additions & 43 deletions polars/polars-time/src/windows/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ pub fn groupby_windows(
(groups, lower_bound, upper_bound)
}

// this assumes that the starting point is alwa
pub(crate) fn groupby_values_iter_full_lookbehind(
// this assumes that the given time point is the right endpoint of the window
pub(crate) fn groupby_values_iter_lookbehind(
period: Duration,
offset: Duration,
time: &[i64],
Expand All @@ -233,7 +233,7 @@ pub(crate) fn groupby_values_iter_full_lookbehind(
tz: Option<Tz>,
start_offset: usize,
) -> impl Iterator<Item = PolarsResult<(IdxSize, IdxSize)>> + TrustedLen + '_ {
debug_assert!(offset.duration_ns() >= period.duration_ns());
debug_assert!(offset.duration_ns() == period.duration_ns());
debug_assert!(offset.negative);
let add = match tu {
TimeUnit::Nanoseconds => Duration::add_ns,
Expand Down Expand Up @@ -465,8 +465,7 @@ pub(crate) fn groupby_values_iter<'a>(
offset.negative = !period.negative;
if offset.duration_ns() > 0 {
// t is at the right endpoint of the window
let iter =
groupby_values_iter_full_lookbehind(period, offset, time, closed_window, tu, tz, 0);
let iter = groupby_values_iter_lookbehind(period, offset, time, closed_window, tu, tz, 0);
Box::new(iter)
} else if closed_window == ClosedWindow::Right || closed_window == ClosedWindow::None {
// only lookahead
Expand Down Expand Up @@ -514,49 +513,44 @@ pub fn groupby_values(

// we have a (partial) lookbehind window
if offset.negative {
if offset.duration_ns() >= period.duration_ns() {
// lookbehind
// window is within 2 periods length of t
// lookbehind
if offset.duration_ns() == period.duration_ns() {
// t is right at the end of the window
// ------t---
// [------]
if offset.duration_ns() < period.duration_ns() * 2 {
POOL.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
let iter = groupby_values_iter_full_lookbehind(
period,
offset,
&time[..upper_bound],
closed_window,
tu,
tz,
base_offset,
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<Vec<_>>>()
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
}
POOL.install(|| {
let vals = thread_offsets
.par_iter()
.copied()
.map(|(base_offset, len)| {
let upper_bound = base_offset + len;
let iter = groupby_values_iter_lookbehind(
period,
offset,
&time[..upper_bound],
closed_window,
tu,
tz,
base_offset,
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<Vec<_>>>()
})
.collect::<PolarsResult<Vec<_>>>()?;
Ok(flatten_par(&vals))
})
} else if ((offset.duration_ns() >= period.duration_ns())
&& matches!(closed_window, ClosedWindow::Left | ClosedWindow::None))
|| ((offset.duration_ns() > period.duration_ns())
&& matches!(closed_window, ClosedWindow::Right | ClosedWindow::Both))
{
// window is completely behind t and t itself is not a member
// ---------------t---
// [---]
else {
let iter = groupby_values_iter_window_behind_t(
period,
offset,
time,
closed_window,
tu,
tz,
);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<_>>()
}
let iter =
groupby_values_iter_window_behind_t(period, offset, time, closed_window, tu, tz);
iter.map(|result| result.map(|(offset, len)| [offset, len]))
.collect::<PolarsResult<_>>()
}
// partial lookbehind
// this one is still single threaded
Expand Down
7 changes: 3 additions & 4 deletions polars/polars-time/src/windows/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,10 +709,9 @@ fn test_rolling_lookback() {
ClosedWindow::None,
] {
let offset = Duration::parse("-2h");
let g0 =
groupby_values_iter_full_lookbehind(period, offset, &dates, closed_window, tu, None, 0)
.collect::<PolarsResult<Vec<_>>>()
.unwrap();
let g0 = groupby_values_iter_lookbehind(period, offset, &dates, closed_window, tu, None, 0)
.collect::<PolarsResult<Vec<_>>>()
.unwrap();
let g1 =
groupby_values_iter_partial_lookbehind(period, offset, &dates, closed_window, tu, None)
.collect::<PolarsResult<Vec<_>>>()
Expand Down
51 changes: 49 additions & 2 deletions py-polars/polars/testing/parametric/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dataclasses import dataclass
from math import isfinite
from textwrap import dedent
from typing import TYPE_CHECKING, Any, Collection, Sequence
from typing import TYPE_CHECKING, Any, Collection, Sequence, overload

from hypothesis.errors import InvalidArgument, NonInteractiveExampleWarning
from hypothesis.strategies import (
Expand Down Expand Up @@ -41,11 +41,18 @@
)

if TYPE_CHECKING:
import sys

from hypothesis.strategies import DrawFn, SearchStrategy

from polars import LazyFrame
from polars.type_aliases import OneOrMoreDataTypes, PolarsDataType

if sys.version_info >= (3, 8):
from typing import Literal
else:
from typing_extensions import Literal


_time_units = list(DTYPE_TEMPORAL_UNITS)

Expand Down Expand Up @@ -444,11 +451,51 @@ def draw_series(draw: DrawFn) -> Series:
_failed_frame_init_msgs_: set[str] = set()


@overload
def dataframes(
cols: int | column | Sequence[column] | None = None,
*,
lazy: Literal[False] = ...,
min_cols: int | None = 0,
max_cols: int | None = MAX_COLS,
size: int | None = None,
min_size: int | None = 0,
max_size: int | None = MAX_DATA_SIZE,
chunked: bool | None = None,
include_cols: Sequence[column] | column | None = None,
null_probability: float | dict[str, float] = 0.0,
allow_infinities: bool = True,
allowed_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
excluded_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
) -> SearchStrategy[DataFrame]:
...


@overload
def dataframes(
cols: int | column | Sequence[column] | None = None,
*,
lazy: Literal[True],
min_cols: int | None = 0,
max_cols: int | None = MAX_COLS,
size: int | None = None,
min_size: int | None = 0,
max_size: int | None = MAX_DATA_SIZE,
chunked: bool | None = None,
include_cols: Sequence[column] | column | None = None,
null_probability: float | dict[str, float] = 0.0,
allow_infinities: bool = True,
allowed_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
excluded_dtypes: Collection[PolarsDataType] | PolarsDataType | None = None,
) -> SearchStrategy[LazyFrame]:
...


@defines_strategy()
def dataframes(
cols: int | column | Sequence[column] | None = None,
lazy: bool = False,
*,
lazy: bool = False,
min_cols: int | None = 0,
max_cols: int | None = MAX_COLS,
size: int | None = None,
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/testing/parametric/strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def between(draw: DrawFn, type_: type, min_: Any, max_: Any) -> Any:
min_value=timedelta(microseconds=-(2**46)),
max_value=timedelta(microseconds=(2**46) - 1),
)
strategy_closed = sampled_from(["left", "right", "both", "none"])
strategy_time_unit = sampled_from(["ns", "us", "ms"])


@composite
Expand Down
76 changes: 76 additions & 0 deletions py-polars/tests/parametric/test_groupby_rolling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING

import hypothesis.strategies as st
from hypothesis import given, reject

import polars as pl
from polars.testing import assert_frame_equal
from polars.testing.parametric.primitives import column, dataframes
from polars.testing.parametric.strategies import strategy_closed, strategy_time_unit
from polars.utils.convert import _timedelta_to_pl_duration

if TYPE_CHECKING:
from polars.type_aliases import ClosedInterval, TimeUnit


@given(
period=st.timedeltas(min_value=timedelta(microseconds=0)).map(
_timedelta_to_pl_duration
),
offset=st.timedeltas().map(_timedelta_to_pl_duration),
closed=strategy_closed,
data=st.data(),
time_unit=strategy_time_unit,
)
def test_groupby_rolling(
period: str,
offset: str,
closed: ClosedInterval,
data: st.DataObject,
time_unit: TimeUnit,
) -> None:
dataframe = data.draw(
dataframes(
[
column("ts", dtype=pl.Datetime(time_unit)),
column("value", dtype=pl.Int64),
],
)
)
df = dataframe.sort("ts").unique("ts")
try:
result = df.groupby_rolling(
"ts", period=period, offset=offset, closed=closed
).agg(pl.col("value"))
except pl.exceptions.PolarsPanicError as exc:
assert any( # noqa: PT017
msg in str(exc)
for msg in (
"attempt to multiply with overflow",
"attempt to add with overflow",
)
)
reject()

expected_dict: dict[str, list[object]] = {"ts": [], "value": []}
for ts, _ in df.iter_rows():
window = df.filter(
pl.col("ts").is_between(
pl.lit(ts, dtype=pl.Datetime(time_unit)).dt.offset_by(offset),
pl.lit(ts, dtype=pl.Datetime(time_unit))
.dt.offset_by(offset)
.dt.offset_by(period),
closed=closed,
)
)
value = window["value"].to_list()
expected_dict["ts"].append(ts)
expected_dict["value"].append(value)
expected = pl.DataFrame(expected_dict).select(
pl.col("ts").cast(pl.Datetime(time_unit)),
pl.col("value").cast(pl.List(pl.Int64)),
)
assert_frame_equal(result, expected)
90 changes: 90 additions & 0 deletions py-polars/tests/unit/operations/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,96 @@ def test_rolling_kernels_and_groupby_rolling(
assert_frame_equal(out1, out2)


@pytest.mark.parametrize(
("offset", "closed", "expected_values"),
[
pytest.param(
"-1d",
"left",
[[1], [1, 2], [2, 3], [3, 4]],
id="partial lookbehind, left",
),
pytest.param(
"-1d",
"right",
[[1, 2], [2, 3], [3, 4], [4]],
id="partial lookbehind, right",
),
pytest.param(
"-1d",
"both",
[[1, 2], [1, 2, 3], [2, 3, 4], [3, 4]],
id="partial lookbehind, both",
),
pytest.param(
"-1d",
"none",
[[1], [2], [3], [4]],
id="partial lookbehind, none",
),
pytest.param(
"-2d",
"left",
[[], [1], [1, 2], [2, 3]],
id="full lookbehind, left",
),
pytest.param(
"-3d",
"left",
[[], [], [1], [1, 2]],
id="full lookbehind, offset > period, left",
),
pytest.param(
"-3d",
"right",
[[], [1], [1, 2], [2, 3]],
id="full lookbehind, right",
),
pytest.param(
"-3d",
"both",
[[], [1], [1, 2], [1, 2, 3]],
id="full lookbehind, both",
),
pytest.param(
"-2d",
"none",
[[], [1], [2], [3]],
id="full lookbehind, none",
),
pytest.param(
"-3d",
"none",
[[], [], [1], [2]],
id="full lookbehind, offset > period, none",
),
],
)
def test_rolling_negative_offset(
offset: str, closed: ClosedInterval, expected_values: list[list[int]]
) -> None:
df = pl.DataFrame(
{
"ts": pl.date_range(
datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
),
"value": [1, 2, 3, 4],
}
)
result = df.groupby_rolling("ts", period="2d", offset=offset, closed=closed).agg(
pl.col("value")
)
expected = pl.DataFrame(
{
"ts": pl.date_range(
datetime(2021, 1, 1), datetime(2021, 1, 4), "1d", eager=True
),
"value": expected_values,
}
)
assert_frame_equal(result, expected)


def test_rolling_skew() -> None:
s = pl.Series([1, 2, 3, 3, 2, 10, 8])
assert s.rolling_skew(window_size=4, bias=True).to_list() == pytest.approx(
Expand Down

0 comments on commit d3779ae

Please sign in to comment.