Skip to content

Commit

Permalink
Backport PR #43291: BUG: Fixes to FixedForwardWindowIndexer and GroupbyIndexer (#43267) (#44061)
Browse files Browse the repository at this point in the history

Co-authored-by: DSM <[email protected]>
  • Loading branch information
simonjayhawkins and dsm054 authored Oct 16, 2021
1 parent 1438fa0 commit a326408
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 29 deletions.
1 change: 1 addition & 0 deletions doc/source/whatsnew/v1.3.4.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Fixed regressions

Bug fixes
~~~~~~~~~
- Fixed bug in :meth:`pandas.DataFrame.groupby.rolling` and :class:`pandas.api.indexers.FixedForwardWindowIndexer` leading to segfaults and window endpoints being mixed across groups (:issue:`43267`)
- Fixed bug in :meth:`.GroupBy.mean` with datetimelike values including ``NaT`` values returning incorrect results (:issue:`43132`)
- Fixed bug in :meth:`Series.aggregate` not passing the first ``args`` to the user supplied ``func`` in certain cases (:issue:`43357`)
- Fixed memory leaks in :meth:`Series.rolling.quantile` and :meth:`Series.rolling.median` (:issue:`43339`)
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExponentialMovingWindowIndexer,
)
return window_indexer
Expand Down
2 changes: 1 addition & 1 deletion pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
GroupbyIndexer
"""
window_indexer = GroupbyIndexer(
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=ExpandingIndexer,
)
return window_indexer
47 changes: 26 additions & 21 deletions pandas/core/window/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ def get_window_bounds(
)

start = np.arange(num_values, dtype="int64")
end_s = start[: -self.window_size] + self.window_size
end_e = np.full(self.window_size, num_values, dtype="int64")
end = np.concatenate([end_s, end_e])
end = start + self.window_size
if self.window_size:
end[-self.window_size :] = num_values

return start, end

Expand All @@ -279,8 +279,8 @@ class GroupbyIndexer(BaseIndexer):
def __init__(
self,
index_array: np.ndarray | None = None,
window_size: int = 0,
groupby_indicies: dict | None = None,
window_size: int | BaseIndexer = 0,
groupby_indices: dict | None = None,
window_indexer: type[BaseIndexer] = BaseIndexer,
indexer_kwargs: dict | None = None,
**kwargs,
Expand All @@ -292,9 +292,9 @@ def __init__(
np.ndarray of the index of the original object that we are performing
a chained groupby operation over. This index has been pre-sorted relative to
the groups
window_size : int
window_size : int or BaseIndexer
window size during the windowing operation
groupby_indicies : dict or None
groupby_indices : dict or None
dict of {group label: [positional index of rows belonging to the group]}
window_indexer : BaseIndexer
BaseIndexer class determining the start and end bounds of each group
Expand All @@ -303,11 +303,13 @@ def __init__(
**kwargs :
keyword arguments that will be available when get_window_bounds is called
"""
self.groupby_indicies = groupby_indicies or {}
self.groupby_indices = groupby_indices or {}
self.window_indexer = window_indexer
self.indexer_kwargs = indexer_kwargs or {}
self.indexer_kwargs = indexer_kwargs.copy() if indexer_kwargs else {}
super().__init__(
index_array, self.indexer_kwargs.pop("window_size", window_size), **kwargs
index_array=index_array,
window_size=self.indexer_kwargs.pop("window_size", window_size),
**kwargs,
)

@Appender(get_window_bounds_doc)
Expand All @@ -323,8 +325,8 @@ def get_window_bounds(
# 3) Append the window bounds in group order
start_arrays = []
end_arrays = []
window_indicies_start = 0
for key, indices in self.groupby_indicies.items():
window_indices_start = 0
for key, indices in self.groupby_indices.items():
index_array: np.ndarray | None

if self.index_array is not None:
Expand All @@ -341,18 +343,21 @@ def get_window_bounds(
)
start = start.astype(np.int64)
end = end.astype(np.int64)
# Cannot use groupby_indicies as they might not be monotonic with the object
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"
# Cannot use groupby_indices as they might not be monotonic with the object
# we're rolling over
window_indicies = np.arange(
window_indicies_start, window_indicies_start + len(indices)
window_indices = np.arange(
window_indices_start, window_indices_start + len(indices)
)
window_indicies_start += len(indices)
window_indices_start += len(indices)
# Extend as we'll be slicing window like [start, end)
window_indicies = np.append(
window_indicies, [window_indicies[-1] + 1]
).astype(np.int64)
start_arrays.append(window_indicies.take(ensure_platform_int(start)))
end_arrays.append(window_indicies.take(ensure_platform_int(end)))
window_indices = np.append(window_indices, [window_indices[-1] + 1]).astype(
np.int64, copy=False
)
start_arrays.append(window_indices.take(ensure_platform_int(start)))
end_arrays.append(window_indices.take(ensure_platform_int(end)))
start = np.concatenate(start_arrays)
end = np.concatenate(end_arrays)
return start, end
Expand Down
26 changes: 21 additions & 5 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,10 @@ def __iter__(self):
center=self.center,
closed=self.closed,
)
# From get_window_bounds, those two should be equal in length of array
assert len(start) == len(end)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

for s, e in zip(start, end):
result = obj.iloc[slice(s, e)]
Expand Down Expand Up @@ -522,6 +524,10 @@ def calc(x):
center=self.center,
closed=self.closed,
)
assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

return func(x, start, end, min_periods)

with np.errstate(all="ignore"):
Expand Down Expand Up @@ -1402,6 +1408,11 @@ def cov_func(x, y):
center=self.center,
closed=self.closed,
)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -1441,6 +1452,11 @@ def corr_func(x, y):
center=self.center,
closed=self.closed,
)

assert len(start) == len(
end
), "these should be equal in length from get_window_bounds"

with np.errstate(all="ignore"):
mean_x_y = window_aggregations.roll_mean(
x_array * y_array, start, end, min_periods
Expand Down Expand Up @@ -2316,11 +2332,11 @@ def _get_window_indexer(self) -> GroupbyIndexer:
index_array = self._index_array
if isinstance(self.window, BaseIndexer):
rolling_indexer = type(self.window)
indexer_kwargs = self.window.__dict__
indexer_kwargs = self.window.__dict__.copy()
assert isinstance(indexer_kwargs, dict) # for mypy
# We'll be using the index of each group later
indexer_kwargs.pop("index_array", None)
window = 0
window = self.window
elif self._win_freq_i8 is not None:
rolling_indexer = VariableWindowIndexer
window = self._win_freq_i8
Expand All @@ -2330,7 +2346,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
window_indexer = GroupbyIndexer(
index_array=index_array,
window_size=window,
groupby_indicies=self._grouper.indices,
groupby_indices=self._grouper.indices,
window_indexer=rolling_indexer,
indexer_kwargs=indexer_kwargs,
)
Expand Down
2 changes: 1 addition & 1 deletion pandas/tests/groupby/test_missing.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def test_min_count(func, min_count, value):
tm.assert_frame_equal(result, expected)


def test_indicies_with_missing():
def test_indices_with_missing():
# GH 9304
df = DataFrame({"a": [1, 1, np.nan], "b": [2, 3, 4], "c": [5, 6, 7]})
g = df.groupby(["a", "b"])
Expand Down
159 changes: 159 additions & 0 deletions pandas/tests/window/test_base_indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from pandas import (
DataFrame,
MultiIndex,
Series,
concat,
date_range,
)
import pandas._testing as tm
Expand All @@ -13,6 +15,7 @@
)
from pandas.core.window.indexers import (
ExpandingIndexer,
FixedWindowIndexer,
VariableOffsetWindowIndexer,
)

Expand Down Expand Up @@ -293,3 +296,159 @@ def get_window_bounds(self, num_values, min_periods, center, closed):
result = getattr(df.rolling(indexer), func)(*args)
expected = DataFrame({"values": values})
tm.assert_frame_equal(result, expected)


@pytest.mark.parametrize(
    "indexer_class", [FixedWindowIndexer, FixedForwardWindowIndexer, ExpandingIndexer]
)
@pytest.mark.parametrize("window_size", [1, 2, 12])
@pytest.mark.parametrize(
    "df_data",
    [
        {"a": [1, 1], "b": [0, 1]},
        {"a": [1, 2], "b": [0, 1]},
        {"a": [1] * 16, "b": [np.nan, 1, 2, np.nan] + list(range(4, 16))},
    ],
)
def test_indexers_are_reusable_after_groupby_rolling(
    indexer_class, window_size, df_data
):
    """Check that groupby-rolling leaves a user-supplied indexer untouched.

    The same indexer instance is passed to ``groupby(...).rolling(...)``
    three times; after every call its ``window_size`` must still equal the
    value it had right after construction.
    """
    # GH 43267
    frame = DataFrame(df_data)
    shared_indexer = indexer_class(window_size=window_size)
    size_before = shared_indexer.window_size

    # Three passes over the very same indexer object.
    for _ in range(3):
        frame.groupby("a")["b"].rolling(window=shared_indexer, min_periods=1).mean()
        assert shared_indexer.window_size == size_before


@pytest.mark.parametrize(
    "window_size, num_values, expected_start, expected_end",
    [
        (1, 1, [0], [1]),
        (1, 2, [0, 1], [1, 2]),
        (2, 1, [0], [1]),
        (2, 2, [0, 1], [2, 2]),
        (5, 12, range(12), list(range(5, 12)) + [12] * 5),
        (12, 5, range(5), [5] * 5),
        (0, 0, np.array([]), np.array([])),
        (1, 0, np.array([]), np.array([])),
        (0, 1, [0], [0]),
    ],
)
def test_fixed_forward_indexer_bounds(
    window_size, num_values, expected_start, expected_end
):
    """Compare ``FixedForwardWindowIndexer.get_window_bounds`` to known bounds.

    Both returned arrays are checked value-by-value (dtype is ignored) and
    must have the same length.
    """
    # GH 43267
    start, end = FixedForwardWindowIndexer(window_size=window_size).get_window_bounds(
        num_values=num_values
    )

    # Start bounds are verified first, then end bounds.
    for actual, wanted in ((start, expected_start), (end, expected_end)):
        tm.assert_numpy_array_equal(actual, np.array(wanted), check_dtype=False)
    assert len(start) == len(end)


@pytest.mark.parametrize(
    "df, window_size, expected",
    [
        (
            DataFrame({"b": [0, 1, 2], "a": [1, 2, 2]}),
            2,
            Series(
                [0, 1.5, 2.0],
                index=MultiIndex.from_arrays([[1, 2, 2], range(3)], names=["a", None]),
                name="b",
                dtype=np.float64,
            ),
        ),
        (
            DataFrame(
                {
                    "b": [np.nan, 1, 2, np.nan] + list(range(4, 18)),
                    "a": [1] * 7 + [2] * 11,
                    "c": range(18),
                }
            ),
            12,
            Series(
                # First seven values belong to group 1, the remaining eleven
                # to group 2.
                [3.6, 3.6, 4.25, 5.0, 5.0, 5.5, 6.0]
                + [12.0, 12.5, 13.0, 13.5, 14.0, 14.5, 15.0, 15.5, 16.0, 16.5, 17.0],
                index=MultiIndex.from_arrays(
                    [[1] * 7 + [2] * 11, range(18)], names=["a", None]
                ),
                name="b",
                dtype=np.float64,
            ),
        ),
    ],
)
def test_rolling_groupby_with_fixed_forward_specific(df, window_size, expected):
    """Check forward-looking grouped rolling means against hand-written values."""
    # GH 43267
    forward_indexer = FixedForwardWindowIndexer(window_size=window_size)
    grouped_b = df.groupby("a")["b"]
    observed = grouped_b.rolling(window=forward_indexer, min_periods=1).mean()
    tm.assert_series_equal(observed, expected)


@pytest.mark.parametrize(
    "group_keys",
    [
        (1,),
        (1, 2),
        (2, 1),
        (1, 1, 2),
        (1, 2, 1),
        (1, 1, 2, 2),
        (1, 2, 3, 2, 3),
        (1, 1, 2) * 4,
        (1, 2, 3) * 5,
    ],
)
@pytest.mark.parametrize("window_size", [1, 2, 3, 4, 5, 8, 20])
def test_rolling_groupby_with_fixed_forward_many(group_keys, window_size):
    """Compare grouped forward-window sums with a per-group manual computation.

    Parameters
    ----------
    group_keys : tuple of int
        Group label of each row; groups may be interleaved.
    window_size : int
        Size of the forward-looking window.
    """
    # GH 43267
    df = DataFrame(
        {
            "a": np.array(list(group_keys)),
            "b": np.arange(len(group_keys), dtype=np.float64) + 17,
            # "c" mirrors the default positional index so it can stand in for
            # the second index level of the rolling result below.
            "c": np.arange(len(group_keys), dtype=np.int64),
        }
    )

    indexer = FixedForwardWindowIndexer(window_size=window_size)
    result = df.groupby("a")["b"].rolling(window=indexer, min_periods=1).sum()
    result.index.names = ["a", "c"]

    # Select every column the reference needs, including "c" for set_index,
    # so the test does not rely on iteration over a column-subset groupby
    # handing back the full frame.
    # NOTE(review): newer pandas releases appear to restrict iterated groups
    # to the selected columns -- confirm against the targeted version.
    groups = df.groupby("a")[["a", "b", "c"]]
    manual = concat(
        [
            g.assign(
                b=[
                    g["b"].iloc[i : i + window_size].sum(min_count=1)
                    for i in range(len(g))
                ]
            )
            for _, g in groups
        ]
    )
    manual = manual.set_index(["a", "c"])["b"]

    tm.assert_series_equal(result, manual)

0 comments on commit a326408

Please sign in to comment.