Skip to content

Commit

Permalink
Add ascending param in the Groupby op (#1525)
Browse files Browse the repository at this point in the history
* add ascending param

* add unit test
  • Loading branch information
rnyak authored Apr 26, 2022
1 parent 2b4f66c commit a94cc8c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
31 changes: 24 additions & 7 deletions nvtabular/ops/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ class Groupby(Operator):
String separator to use for new column names.
"""

def __init__(self, groupby_cols=None, sort_cols=None, aggs="list", name_sep="_"):
def __init__(
self, groupby_cols=None, sort_cols=None, aggs="list", name_sep="_", ascending=True
):
self.groupby_cols = groupby_cols
self.sort_cols = sort_cols or []
if isinstance(self.groupby_cols, str):
self.groupby_cols = [self.groupby_cols]
if isinstance(self.sort_cols, str):
self.sort_cols = [self.sort_cols]
self.ascending = ascending

# Split aggregations into "conventional" aggregations
# and "list-based" aggregations. After this block,
Expand Down Expand Up @@ -102,7 +105,7 @@ def __init__(self, groupby_cols=None, sort_cols=None, aggs="list", name_sep="_")
def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
# Sort if necessary
if self.sort_cols:
df = df.sort_values(self.sort_cols, ignore_index=True)
df = df.sort_values(self.sort_cols, ascending=self.ascending, ignore_index=True)

# List aggregations do not work with empty data.
# Use synthetic metadata to predict output columns.
Expand All @@ -116,7 +119,14 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram
)

# Apply aggregations
new_df = _apply_aggs(_df, self.groupby_cols, _list_aggs, _conv_aggs, name_sep=self.name_sep)
new_df = _apply_aggs(
_df,
self.groupby_cols,
_list_aggs,
_conv_aggs,
name_sep=self.name_sep,
ascending=self.ascending,
)

if empty_df:
return new_df.iloc[:0]
Expand Down Expand Up @@ -196,7 +206,7 @@ def _columns_out_from_aggs(aggs, name_sep="_"):
return _agg_cols


def _apply_aggs(_df, groupby_cols, _list_aggs, _conv_aggs, name_sep="_"):
def _apply_aggs(_df, groupby_cols, _list_aggs, _conv_aggs, name_sep="_", ascending=True):

# Apply conventional aggs
_columns = list(set(groupby_cols) | set(_conv_aggs) | set(_list_aggs))
Expand All @@ -210,7 +220,9 @@ def _apply_aggs(_df, groupby_cols, _list_aggs, _conv_aggs, name_sep="_"):
for col, aggs in _list_aggs.items():
for _agg in aggs:
if is_list_agg(_agg, custom=True):
df[f"{col}{name_sep}{_agg}"] = _first_or_last(df[f"{col}{name_sep}list"], _agg)
df[f"{col}{name_sep}{_agg}"] = _first_or_last(
df[f"{col}{name_sep}list"], _agg, ascending=ascending
)
if "list" not in aggs:
df.drop(columns=[col + f"{name_sep}list"], inplace=True)

Expand Down Expand Up @@ -249,9 +261,14 @@ def is_list_agg(agg, custom=False):
return agg in ("list", list, "first", "last")


def _first_or_last(x, kind):
def _first_or_last(x, kind, ascending=True):
    """Select the first- or last-sorted element of each list in ``x``.

    Parameters
    ----------
    x : Series of lists
        List column produced by the groupby "list" aggregation.
    kind : str
        Either ``"first"`` or ``"last"`` — which end of the sorted
        group the caller wants.
    ascending : bool, default True
        Sort direction that was applied before the lists were built.
        On a descending sort the physical ends of each list are
        swapped, so "last" maps to the head element and vice versa.

    Returns
    -------
    Series with one scalar per list.
    """
    if kind == "first":
        # Ascending sort keeps "first" at the head; descending flips it.
        return _first(x) if ascending else _last(x)
    if kind == "last" and not ascending:
        # Descending sort puts the logical "last" value at the head.
        return _first(x)
    # Remaining case: "last" on an ascending sort -> tail element.
    return _last(x)


def _first(x):
Expand Down
18 changes: 14 additions & 4 deletions tests/unit/ops/test_groupyby.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@


@pytest.mark.parametrize("cpu", _CPU)
@pytest.mark.parametrize("ascending", [True, False])
@pytest.mark.parametrize("keys", [["name"], "id", ["name", "id"]])
def test_groupby_op(keys, cpu):
def test_groupby_op(keys, cpu, ascending):
# Initial timeseries dataset
size = 60
df1 = make_df(
Expand All @@ -57,11 +58,12 @@ def test_groupby_op(keys, cpu):
groupby_cols=keys,
sort_cols=["ts"],
aggs={
"x": ["list", "sum"],
"x": ["list", "sum", "first", "last"],
"y": ["first", "last"],
"ts": ["min"],
},
name_sep="-",
ascending=ascending,
)
processor = nvt.Workflow(groupby_features)
processor.fit(dataset)
Expand All @@ -85,8 +87,10 @@ def test_groupby_op(keys, cpu):
for el in x.values:
_el = pd.Series(el)
sums.append(_el.sum())
assert _el.is_monotonic_increasing

if ascending:
assert _el.is_monotonic_increasing
else:
assert _el.is_monotonic_decreasing
# Check that list sums match sum aggregation
x = new_gdf["x-sum"]
x = x.to_pandas() if hasattr(x, "to_pandas") else x
Expand All @@ -95,6 +99,12 @@ def test_groupby_op(keys, cpu):
# Check basic behavior or "y" column
assert (new_gdf["y-first"] < new_gdf["y-last"]).all()

for i in range(len(new_gdf)):
if ascending:
assert new_gdf["x-first"].iloc[i] == new_gdf["x-list"].iloc[i][0]
else:
assert new_gdf["x-first"].iloc[i] == new_gdf["x-list"].iloc[i][-1]


@pytest.mark.parametrize("cpu", _CPU)
def test_groupby_string_agg(cpu):
Expand Down

0 comments on commit a94cc8c

Please sign in to comment.