Fix Categorify bug for combo encoding with null values (#1652)
* handle combo categorify with nulls

* formatting

* include test coverage

* split encoding logic into distinct functions to improve readability

Co-authored-by: rnyak <[email protected]>
Co-authored-by: Karl Higley <[email protected]>
3 people authored Oct 3, 2022
1 parent b3e683f commit b2144b4
Showing 2 changed files with 131 additions and 64 deletions.
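
In user-facing terms, the commit makes the following succeed: fitting a "combo"-encoded column group whose values include nulls. A minimal usage sketch (assuming a recent NVTabular release; the data mirrors the new test case added below):

import numpy as np
import pandas as pd
import nvtabular as nvt

# The case this commit fixes: a "combo" column group containing a null.
df = pd.DataFrame(
    {
        "Author": [np.nan, "User_E", "User_B", "User_A"],
        "Engaging User": ["User_C", "User_B", "User_A", "User_D"],
    }
)
cats = [["Author", "Engaging User"]] >> nvt.ops.Categorify(encode_type="combo")
out = nvt.Workflow(cats).fit_transform(nvt.Dataset(df)).to_ddf().compute()
print(out)  # combined output column "Author_Engaging User"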
184 changes: 120 additions & 64 deletions nvtabular/ops/categorify.py
@@ -1046,72 +1046,15 @@ def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOptions):
         # Use ignore_index=True to avoid allocating memory for
         # an index we don't even need
         df = df.sort_values(col_selector.names, na_position="first", ignore_index=True)
-        new_cols = {}
-        nulls_missing = False
-        for col in col_selector.names:
-            name_size = col + "_size"
-            null_size = 0
-            # Set null size if first element in `col` is
-            # null, and the `size` aggregation is known
-            if name_size in df and df[col].iloc[:1].isnull().any():
-                null_size = df[name_size].iloc[0]
-            if options.max_size:
-                max_emb_size = options.max_size
-                if isinstance(options.max_size, dict):
-                    max_emb_size = max_emb_size[col]
-                if options.num_buckets:
-                    if isinstance(options.num_buckets, int):
-                        nlargest = max_emb_size - options.num_buckets - 1
-                    else:
-                        nlargest = max_emb_size - options.num_buckets[col] - 1
-                else:
-                    nlargest = max_emb_size - 1
-
-                if nlargest <= 0:
-                    raise ValueError("`nlargest` cannot be 0 or negative")
-
-                if nlargest < len(df) and name_size in df:
-                    # remove NAs from column, we have na count from above.
-                    df = df.dropna()
-                    # sort based on count (name_size column)
-                    df = df.nlargest(n=nlargest, columns=name_size)
-                    new_cols[col] = _concat(
-                        [nullable_series([None], df, df[col].dtype), df[col]],
-                        ignore_index=True,
-                    )
-                    new_cols[name_size] = _concat(
-                        [nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
-                        ignore_index=True,
-                    )
-                    # recreate newly "count" ordered df
-                    df = type(df)(new_cols)
-            if not dispatch.series_has_nulls(df[col]):
-                if name_size in df:
-                    df = df.sort_values(name_size, ascending=False, ignore_index=True)
-
-                nulls_missing = True
-                new_cols[col] = _concat(
-                    [nullable_series([None], df, df[col].dtype), df[col]],
-                    ignore_index=True,
-                )
-                if name_size in df:
-                    new_cols[name_size] = _concat(
-                        [nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
-                        ignore_index=True,
-                    )
+        name_size_multi = "_".join(col_selector.names + ["size"])
+        if len(col_selector.names) > 1 and name_size_multi in df:
+            # Using "combo" encoding
+            df = _combo_encode(df, name_size_multi, col_selector, options)
+        else:
+            # Using (default) "joint" encoding
+            df = _joint_encode(df, col_selector, options)
-            else:
-                # ensure None aka "unknown" stays at index 0
-                if name_size in df:
-                    df_0 = df.iloc[0:1]
-                    df_1 = df.iloc[1:].sort_values(name_size, ascending=False, ignore_index=True)
-                    df = _concat([df_0, df_1])
-                new_cols[col] = df[col].copy(deep=False)
-
-                if name_size in df:
-                    new_cols[name_size] = df[name_size].copy(deep=False)
-        if nulls_missing:
-            df = type(df)(new_cols)
         df.to_parquet(path, index=False, compression=None)
     else:
         df_null = type(df)({c: [None] for c in col_selector.names})
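
The hunk above reduces `_write_uniques` to a dispatch between two helpers. A minimal standalone sketch of the dispatch rule, not part of the diff (hypothetical column names; the list stands in for the fitted uniques frame):

# "combo" encoding is selected only when the selector holds more than one
# column and the fit statistics carry a single group-level size column.
names = ["Author", "Engaging User"]
name_size_multi = "_".join(names + ["size"])
df_columns = ["Author", "Engaging User", name_size_multi]  # stand-in for df
use_combo = len(names) > 1 and name_size_multi in df_columns
print(name_size_multi, use_combo)  # Author_Engaging User_size True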
@@ -1123,6 +1066,119 @@ def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOptions):
     return path


+@annotate("_combo_encode", color="green", domain="nvt_python")
+def _combo_encode(df, name_size_multi: str, col_selector: ColumnSelector, options: FitOptions):
+    # Combo-encoding utility (used by _write_uniques)
+
+    # Account for max_size and num_buckets
+    if options.max_size:
+        max_emb_size = options.max_size
+        if isinstance(options.max_size, dict):
+            raise NotImplementedError(
+                "Cannot specify max_size as a dictionary for 'combo' encoding."
+            )
+        if options.num_buckets:
+            if isinstance(options.num_buckets, dict):
+                raise NotImplementedError(
+                    "Cannot specify num_buckets as a dictionary for 'combo' encoding."
+                )
+            nlargest = max_emb_size - options.num_buckets - 1
+        else:
+            nlargest = max_emb_size - 1
+
+        if nlargest <= 0:
+            raise ValueError("`nlargest` cannot be 0 or negative")
+
+        if nlargest < len(df):
+            # sort based on count (name_size_multi column)
+            df = df.nlargest(n=nlargest, columns=name_size_multi)
+
+    # Deal with nulls
+    has_nans = df[col_selector.names].iloc[0].transpose().isnull().all()
+    if hasattr(has_nans, "iloc"):
+        has_nans = has_nans[0]
+    if not has_nans:
+        null_data = {col: nullable_series([None], df, df[col].dtype) for col in col_selector.names}
+        null_data[name_size_multi] = [0]
+        null_df = type(df)(null_data)
+        df = _concat([null_df, df], ignore_index=True)
+
+    return df
+
+
+@annotate("_joint_encode", color="green", domain="nvt_python")
+def _joint_encode(df, col_selector: ColumnSelector, options: FitOptions):
+    # Joint-encoding utility (used by _write_uniques)
+
+    new_cols = {}
+    nulls_missing = False
+    for col in col_selector.names:
+        name_size = col + "_size"
+        null_size = 0
+        # Set null size if first element in `col` is
+        # null, and the `size` aggregation is known
+        if name_size in df and df[col].iloc[:1].isnull().any():
+            null_size = df[name_size].iloc[0]
+        if options.max_size:
+            max_emb_size = options.max_size
+            if isinstance(options.max_size, dict):
+                max_emb_size = max_emb_size[col]
+            if options.num_buckets:
+                if isinstance(options.num_buckets, int):
+                    nlargest = max_emb_size - options.num_buckets - 1
+                else:
+                    nlargest = max_emb_size - options.num_buckets[col] - 1
+            else:
+                nlargest = max_emb_size - 1
+
+            if nlargest <= 0:
+                raise ValueError("`nlargest` cannot be 0 or negative")
+
+            if nlargest < len(df) and name_size in df:
+                # remove NAs from column, we have na count from above.
+                df = df.dropna()  # TODO: This seems dangerous - Check this
+                # sort based on count (name_size column)
+                df = df.nlargest(n=nlargest, columns=name_size)
+                new_cols[col] = _concat(
+                    [nullable_series([None], df, df[col].dtype), df[col]],
+                    ignore_index=True,
+                )
+                new_cols[name_size] = _concat(
+                    [nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
+                    ignore_index=True,
+                )
+                # recreate newly "count" ordered df
+                df = type(df)(new_cols)
+        if not dispatch.series_has_nulls(df[col]):
+            if name_size in df:
+                df = df.sort_values(name_size, ascending=False, ignore_index=True)
+
+            nulls_missing = True
+            new_cols[col] = _concat(
+                [nullable_series([None], df, df[col].dtype), df[col]],
+                ignore_index=True,
+            )
+            if name_size in df:
+                new_cols[name_size] = _concat(
+                    [nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
+                    ignore_index=True,
+                )
+
+        else:
+            # ensure None aka "unknown" stays at index 0
+            if name_size in df:
+                df_0 = df.iloc[0:1]
+                df_1 = df.iloc[1:].sort_values(name_size, ascending=False, ignore_index=True)
+                df = _concat([df_0, df_1])
+            new_cols[col] = df[col].copy(deep=False)
+
+            if name_size in df:
+                new_cols[name_size] = df[name_size].copy(deep=False)
+
+    if nulls_missing:
+        return type(df)(new_cols)
+    return df
+
+
 def _finish_labels(paths, cols):
     return {col: paths[i] for i, col in enumerate(cols)}

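The null handling in `_combo_encode` is the heart of the fix: after sorting, the helper checks whether the first uniques row is entirely null and, if not, prepends one so that index 0 always denotes the null/"unknown" combination. A plain-pandas sketch of that step, not part of the diff (hypothetical data; `nullable_series` and `_concat` replaced by their pandas equivalents):

import pandas as pd

uniques = pd.DataFrame(
    {
        "Author": [None, "User_B"],
        "Engaging User": ["User_C", "User_A"],
        "Author_Engaging User_size": [1, 1],
    }
)
# The first row is only partially null, so it is a real category, not "unknown"
has_nans = uniques[["Author", "Engaging User"]].iloc[0].isnull().all()
if not has_nans:
    null_row = pd.DataFrame(
        {"Author": [None], "Engaging User": [None], "Author_Engaging User_size": [0]}
    )
    uniques = pd.concat([null_row, uniques], ignore_index=True)
print(uniques)  # the all-null pair now occupies index 0
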
11 changes: 11 additions & 0 deletions tests/unit/ops/test_categorify.py
@@ -329,6 +329,17 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu):
"expected_e": [3, 2, 1, 4],
"expected_ae": [3, 4, 2, 1],
},
# Include null value
{
"df_data": {
"Author": [np.nan, "User_E", "User_B", "User_A"],
"Engaging User": ["User_C", "User_B", "User_A", "User_D"],
"Post": [1, 2, 3, 4],
},
"expected_a": [0, 3, 2, 1],
"expected_e": [3, 2, 1, 4],
"expected_ae": [1, 4, 3, 2],
},
],
)
def test_categorify_multi_combo(tmpdir, input_with_output, cat_names, cpu):
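
The new expectation "expected_ae": [1, 4, 3, 2] can be worked out by hand. A plain-pandas sketch of the ordering, mirroring the nulls-first sort in `_write_uniques` and the reserved index 0 from `_combo_encode`:

import pandas as pd

pairs = pd.DataFrame(
    {
        "Author": [None, "User_E", "User_B", "User_A"],
        "Engaging User": ["User_C", "User_B", "User_A", "User_D"],
    }
)
uniques = pairs.sort_values(
    ["Author", "Engaging User"], na_position="first", ignore_index=True
)
uniques.index = uniques.index + 1  # index 0 is reserved for the all-null pair
print(uniques)
# (NaN, User_C) -> 1, (User_A, User_D) -> 2, (User_B, User_A) -> 3,
# (User_E, User_B) -> 4; reading the input rows back gives [1, 4, 3, 2].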