From 6ac16a0fd34781523b8ba53532e930999dde2a47 Mon Sep 17 00:00:00 2001
From: Phillip Cloud <417981+cpcloud@users.noreply.github.com>
Date: Wed, 11 Oct 2023 14:14:11 -0400
Subject: [PATCH] chore(pandas): fix implementation to handle new
 zero-argument modeling

---
 .../dask/tests/execution/test_window.py  |  4 +-
 ibis/backends/pandas/execution/window.py | 51 +++++++++++--------
 ibis/backends/tests/test_window.py       | 18 +++----
 3 files changed, 39 insertions(+), 34 deletions(-)

diff --git a/ibis/backends/dask/tests/execution/test_window.py b/ibis/backends/dask/tests/execution/test_window.py
index 0daf70175c165..127af1a87bf3f 100644
--- a/ibis/backends/dask/tests/execution/test_window.py
+++ b/ibis/backends/dask/tests/execution/test_window.py
@@ -200,8 +200,8 @@ def test_batting_avg_change_in_games_per_year(players, players_df):
 
 
 @pytest.mark.xfail(
-    raises=NotImplementedError,
-    reason="Grouped and order windows not supported yet",
+    raises=AssertionError,
+    reason="Dask doesn't support the `rank` method on SeriesGroupBy",
 )
 def test_batting_most_hits(players, players_df):
     expr = players.mutate(
diff --git a/ibis/backends/pandas/execution/window.py b/ibis/backends/pandas/execution/window.py
index 4a816235d1622..39475ecc2bb6c 100644
--- a/ibis/backends/pandas/execution/window.py
+++ b/ibis/backends/pandas/execution/window.py
@@ -254,10 +254,10 @@ def trim_window_result(data: pd.Series | pd.DataFrame, timecontext: TimeContext
     return indexed_subset[name]
 
 
-@execute_node.register(ops.WindowFunction, pd.Series)
+@execute_node.register(ops.WindowFunction, [pd.Series])
 def execute_window_op(
     op,
-    data,
+    *data,
     scope: Scope | None = None,
     timecontext: TimeContext | None = None,
     aggcontext=None,
@@ -485,33 +485,42 @@ def execute_series_group_by_last_value(op, data, aggcontext=None, **kwargs):
     return aggcontext.agg(data, lambda x: _getter(x, -1))
 
 
-@execute_node.register(ops.MinRank, (pd.Series, SeriesGroupBy))
-def execute_series_min_rank(op, data, **kwargs):
-    # TODO(phillipc): Handle ORDER BY
+@execute_node.register(ops.MinRank)
+def execute_series_min_rank(op, aggcontext=None, **kwargs):
+    (key,) = aggcontext.order_by
+    df = aggcontext.parent
+    data = df[key]
     return data.rank(method="min", ascending=True).astype("int64") - 1
 
 
-@execute_node.register(ops.DenseRank, (pd.Series, SeriesGroupBy))
-def execute_series_dense_rank(op, data, **kwargs):
-    # TODO(phillipc): Handle ORDER BY
+@execute_node.register(ops.DenseRank)
+def execute_series_dense_rank(op, aggcontext=None, **kwargs):
+    (key,) = aggcontext.order_by
+    df = aggcontext.parent
+    data = df[key]
     return data.rank(method="dense", ascending=True).astype("int64") - 1
 
 
-@execute_node.register(ops.PercentRank, SeriesGroupBy)
-def execute_series_group_by_percent_rank(op, data, **kwargs):
-    return (
-        data.rank(method="min", ascending=True)
-        .sub(1)
-        .div(data.transform("count").sub(1))
-    )
+@execute_node.register(ops.PercentRank)
+def execute_series_group_by_percent_rank(op, aggcontext=None, **kwargs):
+    (key,) = aggcontext.order_by
+    df = aggcontext.parent
+    data = df[key]
+
+    result = data.rank(method="min", ascending=True) - 1
+    if isinstance(data, SeriesGroupBy):
+        nrows = data.transform("count")
+    else:
+        nrows = len(data)
 
 
-@execute_node.register(ops.PercentRank, pd.Series)
-def execute_series_percent_rank(op, data, **kwargs):
-    # TODO(phillipc): Handle ORDER BY
-    return data.rank(method="min", ascending=True).sub(1).div(len(data) - 1)
+    result /= nrows - 1
+    return result
 
 
-@execute_node.register(ops.CumeDist, (pd.Series, SeriesGroupBy))
-def execute_series_group_by_cume_dist(op, data, **kwargs):
+@execute_node.register(ops.CumeDist)
+def execute_series_group_by_cume_dist(op, aggcontext=None, **kwargs):
+    (key,) = aggcontext.order_by
+    df = aggcontext.parent
+    data = df[key]
     return data.rank(method="min", ascending=True, pct=True)
diff --git a/ibis/backends/tests/test_window.py b/ibis/backends/tests/test_window.py
index d8d7fadc1cf95..3f282f3e0d510 100644
--- a/ibis/backends/tests/test_window.py
+++ b/ibis/backends/tests/test_window.py
@@ -80,13 +80,13 @@ def calc_zscore(s):
         lambda t, win: t.id.rank().over(win),
         lambda t: t.id.rank(method="min").astype("int64") - 1,
         id="rank",
-        marks=[pytest.mark.notimpl(["dask"], raises=NotImplementedError)],
+        marks=[pytest.mark.notimpl(["dask"], raises=com.OperationNotDefinedError)],
     ),
     param(
         lambda t, win: t.id.dense_rank().over(win),
         lambda t: t.id.rank(method="dense").astype("int64") - 1,
         id="dense_rank",
-        marks=[pytest.mark.notimpl(["dask"], raises=NotImplementedError)],
+        marks=[pytest.mark.notimpl(["dask"], raises=com.OperationNotDefinedError)],
     ),
     param(
         lambda t, win: t.id.percent_rank().over(win),
@@ -102,7 +102,7 @@ def calc_zscore(s):
                 reason="clickhouse doesn't implement percent_rank",
                 raises=com.OperationNotDefinedError,
             ),
-            pytest.mark.notimpl(["dask"], raises=NotImplementedError),
+            pytest.mark.notimpl(["dask"], raises=com.OperationNotDefinedError),
         ],
     ),
     param(
@@ -112,7 +112,7 @@ def calc_zscore(s):
         marks=[
             pytest.mark.notimpl(["pyspark"], raises=com.UnsupportedOperationError),
             pytest.mark.notyet(["clickhouse"], raises=com.OperationNotDefinedError),
-            pytest.mark.notimpl(["dask"], raises=NotImplementedError),
+            pytest.mark.notimpl(["dask"], raises=com.OperationNotDefinedError),
         ],
     ),
     param(
@@ -156,11 +156,7 @@ def calc_zscore(s):
         lambda _, win: ibis.row_number().over(win),
         lambda t: t.cumcount(),
         id="row_number",
-        marks=[
-            pytest.mark.notimpl(
-                ["dask", "pandas"], raises=com.OperationNotDefinedError
-            )
-        ],
+        marks=[pytest.mark.notimpl(["dask", "pandas"], raises=NotImplementedError)],
     ),
     param(
         lambda t, win: t.double_col.cumsum().over(win),
@@ -891,9 +887,9 @@ def gb_fn(df):
 
 
 @pytest.mark.notimpl(
-    ["clickhouse", "dask", "datafusion", "polars"],
-    raises=com.OperationNotDefinedError,
+    ["clickhouse", "datafusion", "polars"], raises=com.OperationNotDefinedError
 )
+@pytest.mark.notimpl(["dask"], raises=AttributeError)
 @pytest.mark.notimpl(["pyspark"], raises=AnalysisException)
 @pytest.mark.notyet(
     ["clickhouse"],
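
Note (editor's sketch, not part of the patch): under the new zero-argument
modeling, the rank operations no longer receive a Series argument; each
implementation pulls its input from the aggregation context instead,
unpacking the single ORDER BY key via `(key,) = aggcontext.order_by` and
indexing into `aggcontext.parent`. The pandas semantics the new bodies rely
on can be checked standalone; the data below is made up for illustration,
and `df`/`key` only mirror the names used in the patch.

    import pandas as pd

    df = pd.DataFrame({"key": [10, 20, 20, 30]})
    data = df["key"]

    # MinRank: method="min" matches SQL RANK(); subtracting 1 gives ibis' 0-based rank
    min_rank = data.rank(method="min", ascending=True).astype("int64") - 1  # 0, 1, 1, 3

    # DenseRank: ties share a rank and leave no gaps
    dense_rank = data.rank(method="dense", ascending=True).astype("int64") - 1  # 0, 1, 1, 2

    # PercentRank: (min rank - 1) / (nrows - 1), matching SQL PERCENT_RANK()
    percent_rank = (data.rank(method="min", ascending=True) - 1) / (len(data) - 1)  # 0.0, 1/3, 1/3, 1.0

    # CumeDist as implemented in the patch: min rank divided by the row count (pct=True)
    cume_dist = data.rank(method="min", ascending=True, pct=True)  # 0.25, 0.5, 0.5, 1.0

The ungrouped path is shown here; the grouped path differs only in that
`data` is a SeriesGroupBy, where PercentRank sizes each partition with
`data.transform("count")` as in the patch.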