Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ascending param in the Groupby op #1525

Merged
merged 2 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions nvtabular/ops/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ class Groupby(Operator):
String separator to use for new column names.
"""

def __init__(self, groupby_cols=None, sort_cols=None, aggs="list", name_sep="_"):
def __init__(
self, groupby_cols=None, sort_cols=None, aggs="list", name_sep="_", ascending=True
):
self.groupby_cols = groupby_cols
self.sort_cols = sort_cols or []
if isinstance(self.groupby_cols, str):
self.groupby_cols = [self.groupby_cols]
if isinstance(self.sort_cols, str):
self.sort_cols = [self.sort_cols]
self.ascending = ascending

# Split aggregations into "conventional" aggregations
# and "list-based" aggregations. After this block,
Expand Down Expand Up @@ -102,7 +105,7 @@ def __init__(self, groupby_cols=None, sort_cols=None, aggs="list", name_sep="_")
def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType:
# Sort if necessary
if self.sort_cols:
df = df.sort_values(self.sort_cols, ignore_index=True)
df = df.sort_values(self.sort_cols, ascending=self.ascending, ignore_index=True)

# List aggregations do not work with empty data.
# Use synthetic metadata to predict output columns.
Expand All @@ -116,7 +119,14 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram
)

# Apply aggregations
new_df = _apply_aggs(_df, self.groupby_cols, _list_aggs, _conv_aggs, name_sep=self.name_sep)
new_df = _apply_aggs(
_df,
self.groupby_cols,
_list_aggs,
_conv_aggs,
name_sep=self.name_sep,
ascending=self.ascending,
)

if empty_df:
return new_df.iloc[:0]
Expand Down Expand Up @@ -196,7 +206,7 @@ def _columns_out_from_aggs(aggs, name_sep="_"):
return _agg_cols


def _apply_aggs(_df, groupby_cols, _list_aggs, _conv_aggs, name_sep="_"):
def _apply_aggs(_df, groupby_cols, _list_aggs, _conv_aggs, name_sep="_", ascending=True):

# Apply conventional aggs
_columns = list(set(groupby_cols) | set(_conv_aggs) | set(_list_aggs))
Expand All @@ -210,7 +220,9 @@ def _apply_aggs(_df, groupby_cols, _list_aggs, _conv_aggs, name_sep="_"):
for col, aggs in _list_aggs.items():
for _agg in aggs:
if is_list_agg(_agg, custom=True):
df[f"{col}{name_sep}{_agg}"] = _first_or_last(df[f"{col}{name_sep}list"], _agg)
df[f"{col}{name_sep}{_agg}"] = _first_or_last(
df[f"{col}{name_sep}list"], _agg, ascending=ascending
)
if "list" not in aggs:
df.drop(columns=[col + f"{name_sep}list"], inplace=True)

Expand Down Expand Up @@ -249,9 +261,14 @@ def is_list_agg(agg, custom=False):
return agg in ("list", list, "first", "last")


def _first_or_last(x, kind):
def _first_or_last(x, kind, ascending=True):
# Redirect to _first or _last
return _first(x) if kind == "first" else _last(x)
if kind == "first" and ascending:
return _first(x)
elif kind == "last" and not ascending:
return _first(x)
else:
return _last(x)


def _first(x):
Expand Down
18 changes: 14 additions & 4 deletions tests/unit/ops/test_groupyby.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@


@pytest.mark.parametrize("cpu", _CPU)
@pytest.mark.parametrize("ascending", [True, False])
@pytest.mark.parametrize("keys", [["name"], "id", ["name", "id"]])
def test_groupby_op(keys, cpu):
def test_groupby_op(keys, cpu, ascending):
# Initial timeseries dataset
size = 60
df1 = make_df(
Expand All @@ -57,11 +58,12 @@ def test_groupby_op(keys, cpu):
groupby_cols=keys,
sort_cols=["ts"],
aggs={
"x": ["list", "sum"],
"x": ["list", "sum", "first", "last"],
"y": ["first", "last"],
"ts": ["min"],
},
name_sep="-",
ascending=ascending,
)
processor = nvt.Workflow(groupby_features)
processor.fit(dataset)
Expand All @@ -85,8 +87,10 @@ def test_groupby_op(keys, cpu):
for el in x.values:
_el = pd.Series(el)
sums.append(_el.sum())
assert _el.is_monotonic_increasing

if ascending:
assert _el.is_monotonic_increasing
else:
assert _el.is_monotonic_decreasing
# Check that list sums match sum aggregation
x = new_gdf["x-sum"]
x = x.to_pandas() if hasattr(x, "to_pandas") else x
Expand All @@ -95,6 +99,12 @@ def test_groupby_op(keys, cpu):
# Check basic behavior of "y" column
assert (new_gdf["y-first"] < new_gdf["y-last"]).all()

for i in range(len(new_gdf)):
if ascending:
assert new_gdf["x-first"].iloc[i] == new_gdf["x-list"].iloc[i][0]
else:
assert new_gdf["x-first"].iloc[i] == new_gdf["x-list"].iloc[i][-1]


@pytest.mark.parametrize("cpu", _CPU)
def test_groupby_string_agg(cpu):
Expand Down