
Fix Categorify bug for combo encoding with null values #1652

Merged: 8 commits on Oct 3, 2022
184 changes: 120 additions & 64 deletions nvtabular/ops/categorify.py
@@ -1046,72 +1046,15 @@ def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOptions):
        # Use ignore_index=True to avoid allocating memory for
        # an index we don't even need
        df = df.sort_values(col_selector.names, na_position="first", ignore_index=True)
        new_cols = {}
        nulls_missing = False
        for col in col_selector.names:
            name_size = col + "_size"
            null_size = 0
            # Set null size if the first element in `col` is
            # null and the `size` aggregation is known
            if name_size in df and df[col].iloc[:1].isnull().any():
                null_size = df[name_size].iloc[0]
            if options.max_size:
                max_emb_size = options.max_size
                if isinstance(options.max_size, dict):
                    max_emb_size = max_emb_size[col]
                if options.num_buckets:
                    if isinstance(options.num_buckets, int):
                        nlargest = max_emb_size - options.num_buckets - 1
                    else:
                        nlargest = max_emb_size - options.num_buckets[col] - 1
                else:
                    nlargest = max_emb_size - 1

                if nlargest <= 0:
                    raise ValueError("`nlargest` cannot be 0 or negative")

                if nlargest < len(df) and name_size in df:
                    # remove NAs from the column; the null count was captured above
                    df = df.dropna()
                    # sort based on count (the name_size column)
                    df = df.nlargest(n=nlargest, columns=name_size)
                    new_cols[col] = _concat(
                        [nullable_series([None], df, df[col].dtype), df[col]],
                        ignore_index=True,
                    )
                    new_cols[name_size] = _concat(
                        [nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
                        ignore_index=True,
                    )
                    # recreate the df, now ordered by count
                    df = type(df)(new_cols)
            if not dispatch.series_has_nulls(df[col]):
                if name_size in df:
                    df = df.sort_values(name_size, ascending=False, ignore_index=True)

                nulls_missing = True
                new_cols[col] = _concat(
                    [nullable_series([None], df, df[col].dtype), df[col]],
                    ignore_index=True,
                )
                if name_size in df:
                    new_cols[name_size] = _concat(
                        [nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
                        ignore_index=True,
                    )
        name_size_multi = "_".join(col_selector.names + ["size"])
        if len(col_selector.names) > 1 and name_size_multi in df:
            # Using "combo" encoding
            df = _combo_encode(df, name_size_multi, col_selector, options)
        else:
            # Using (default) "joint" encoding
            df = _joint_encode(df, col_selector, options)

            else:
                # ensure the None ("unknown") entry stays at index 0
                if name_size in df:
                    df_0 = df.iloc[0:1]
                    df_1 = df.iloc[1:].sort_values(name_size, ascending=False, ignore_index=True)
                    df = _concat([df_0, df_1])
                new_cols[col] = df[col].copy(deep=False)

                if name_size in df:
                    new_cols[name_size] = df[name_size].copy(deep=False)
        if nulls_missing:
            df = type(df)(new_cols)
        df.to_parquet(path, index=False, compression=None)
    else:
        df_null = type(df)({c: [None] for c in col_selector.names})
@@ -1123,6 +1066,119 @@ def _write_uniques(dfs, base_path, col_selector: ColumnSelector, options: FitOptions):
    return path


@annotate("_combo_encode", color="green", domain="nvt_python")
def _combo_encode(df, name_size_multi: str, col_selector: ColumnSelector, options: FitOptions):
# Combo-encoding utility (used by _write_uniques)

# Account for max_size and num_buckets
if options.max_size:
max_emb_size = options.max_size
if isinstance(options.max_size, dict):
raise NotImplementedError(
"Cannot specify max_size as a dictionary for 'combo' encoding."
)
if options.num_buckets:
if isinstance(options.num_buckets, dict):
raise NotImplementedError(
"Cannot specify num_buckets as a dictionary for 'combo' encoding."
)
nlargest = max_emb_size - options.num_buckets - 1
else:
nlargest = max_emb_size - 1

if nlargest <= 0:
raise ValueError("`nlargest` cannot be 0 or negative")

if nlargest < len(df):
# sort based on count (name_size_multi column)
df = df.nlargest(n=nlargest, columns=name_size_multi)

# Deal with nulls
has_nans = df[col_selector.names].iloc[0].transpose().isnull().all()
if hasattr(has_nans, "iloc"):
has_nans = has_nans[0]
if not has_nans:
null_data = {col: nullable_series([None], df, df[col].dtype) for col in col_selector.names}
null_data[name_size_multi] = [0]
null_df = type(df)(null_data)
df = _concat([null_df, df], ignore_index=True)

return df
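To make the null handling above concrete, here is a minimal standalone sketch in plain pandas (the demo_combo_encode helper and its column names are hypothetical, not part of this PR): if the first row of the sorted uniques is not entirely null, a fully-null row with a count of 0 is prepended so that index 0 always represents the unknown combo.

import pandas as pd

def demo_combo_encode(df: pd.DataFrame, size_col: str, cols: list) -> pd.DataFrame:
    # Uniques arrive pre-sorted with nulls first (na_position="first")
    if not df[cols].iloc[0].isnull().all():
        # Prepend an all-null "unknown" row with an observed count of 0
        null_row = {c: pd.Series([None], dtype=df[c].dtype) for c in cols}
        null_row[size_col] = [0]
        df = pd.concat([pd.DataFrame(null_row), df], ignore_index=True)
    return df

uniques = pd.DataFrame(
    {
        "Author": [None, "User_A"],
        "Engaging User": ["User_C", "User_D"],
        "Author_Engaging User_size": [1, 1],
    }
)
print(demo_combo_encode(uniques, "Author_Engaging User_size", ["Author", "Engaging User"]))
# Row 0 becomes (None, None, 0); the partially-null pair (None, "User_C")
# shifts to row 1, which is the behavior this bugfix guarantees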


@annotate("_joint_encode", color="green", domain="nvt_python")
def _joint_encode(df, col_selector: ColumnSelector, options: FitOptions):
# Joint-encoding utility (used by _write_uniques)

new_cols = {}
nulls_missing = False
for col in col_selector.names:
name_size = col + "_size"
null_size = 0
# Set null size if first element in `col` is
# null, and the `size` aggregation is known
if name_size in df and df[col].iloc[:1].isnull().any():
null_size = df[name_size].iloc[0]
if options.max_size:
max_emb_size = options.max_size
if isinstance(options.max_size, dict):
max_emb_size = max_emb_size[col]
if options.num_buckets:
if isinstance(options.num_buckets, int):
nlargest = max_emb_size - options.num_buckets - 1
else:
nlargest = max_emb_size - options.num_buckets[col] - 1
else:
nlargest = max_emb_size - 1

if nlargest <= 0:
raise ValueError("`nlargest` cannot be 0 or negative")

if nlargest < len(df) and name_size in df:
# remove NAs from column, we have na count from above.
df = df.dropna() # TODO: This seems dangerous - Check this
# sort based on count (name_size column)
df = df.nlargest(n=nlargest, columns=name_size)
new_cols[col] = _concat(
[nullable_series([None], df, df[col].dtype), df[col]],
ignore_index=True,
)
new_cols[name_size] = _concat(
[nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
ignore_index=True,
)
# recreate newly "count" ordered df
df = type(df)(new_cols)
if not dispatch.series_has_nulls(df[col]):
if name_size in df:
df = df.sort_values(name_size, ascending=False, ignore_index=True)

nulls_missing = True
new_cols[col] = _concat(
[nullable_series([None], df, df[col].dtype), df[col]],
ignore_index=True,
)
if name_size in df:
new_cols[name_size] = _concat(
[nullable_series([null_size], df, df[name_size].dtype), df[name_size]],
ignore_index=True,
)

else:
# ensure None aka "unknown" stays at index 0
if name_size in df:
df_0 = df.iloc[0:1]
df_1 = df.iloc[1:].sort_values(name_size, ascending=False, ignore_index=True)
df = _concat([df_0, df_1])
new_cols[col] = df[col].copy(deep=False)

if name_size in df:
new_cols[name_size] = df[name_size].copy(deep=False)
if nulls_missing:
return type(df)(new_cols)
return df
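By contrast, _joint_encode reserves the null slot per column rather than per combination. A minimal sketch of that invariant, again in plain pandas with a hypothetical helper:

import pandas as pd

def demo_reserve_null(uniques: pd.Series, sizes: pd.Series):
    # If the uniques already contain a null (sorted to the front via
    # na_position="first"), index 0 is already the null slot;
    # otherwise prepend a null with count 0
    if uniques.isnull().any():
        return uniques, sizes
    uniques = pd.concat([pd.Series([None], dtype=uniques.dtype), uniques], ignore_index=True)
    sizes = pd.concat([pd.Series([0], dtype=sizes.dtype), sizes], ignore_index=True)
    return uniques, sizes

u, s = demo_reserve_null(pd.Series(["User_A", "User_B"]), pd.Series([5, 3]))
# u: [None, "User_A", "User_B"]; s: [0, 5, 3]; nulls always encode to 0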


def _finish_labels(paths, cols):
    return {col: paths[i] for i, col in enumerate(cols)}

11 changes: 11 additions & 0 deletions tests/unit/ops/test_categorify.py
@@ -329,6 +329,17 @@ def test_categorify_multi(tmpdir, cat_names, kind, cpu):
"expected_e": [3, 2, 1, 4],
"expected_ae": [3, 4, 2, 1],
},
# Include null value
{
"df_data": {
"Author": [np.nan, "User_E", "User_B", "User_A"],
"Engaging User": ["User_C", "User_B", "User_A", "User_D"],
"Post": [1, 2, 3, 4],
},
"expected_a": [0, 3, 2, 1],
"expected_e": [3, 2, 1, 4],
"expected_ae": [1, 4, 3, 2],
},
],
)
def test_categorify_multi_combo(tmpdir, input_with_output, cat_names, cpu):
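For reference, the new test case can be exercised end to end roughly like this (a sketch assuming the standard NVTabular workflow API; the exact name of the combined output column is an assumption here):

import numpy as np
import pandas as pd
import nvtabular as nvt

df = pd.DataFrame(
    {
        "Author": [np.nan, "User_E", "User_B", "User_A"],
        "Engaging User": ["User_C", "User_B", "User_A", "User_D"],
        "Post": [1, 2, 3, 4],
    }
)

# Encode the (Author, Engaging User) pair as a single "combo" feature
cats = [["Author", "Engaging User"]] >> nvt.ops.Categorify(encode_type="combo")
workflow = nvt.Workflow(cats)
out = workflow.fit_transform(nvt.Dataset(df)).to_ddf().compute()

# Index 0 is reserved for the fully-null "unknown" combo, so the
# partially-null pair (NaN, "User_C") maps to 1, matching expected_ae
print(out)  # combo column expected to be [1, 4, 3, 2]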