
Clean up the way shapes are computed and specified #1760

Merged: 12 commits, Feb 15, 2023

Changes from all commits
33 changes: 17 additions & 16 deletions nvtabular/loader/backend.py
@@ -14,6 +14,7 @@
# limitations under the License.
#

from merlin.dtypes.shape import Shape
from merlin.schema import ColumnSchema, Tags


@@ -22,9 +23,10 @@ def _augment_schema(
cats=None,
conts=None,
labels=None,
sparse_names=None,
sparse_max=None,
sparse_as_dense=False,
padded_cols=None,
Member:

Renaming these arguments could be out of scope for this PR, since it may be a breaking change for code that uses this function. It may be clearer to separate this into a different PR.

Contributor Author:

We couldn't really figure out what this function was supposed to do without the renames, so we went for it. The function name starts with an _, so we've appropriately signaled that external code shouldn't depend on its stability. The two places in NVTabular that use it call it by argument order instead of specifying the names, so I understand the caution, but I think we're okay.

Member:

Looks like it will be a safe change, and the names make things clearer :). We may consider removing this function along with the other loader code to follow up on the promise here in one of the upcoming releases.

Member:

And by that point we'll hopefully have updated the dataloader API to a point where this augment_schema function is no longer required, either here or in the copies in Transformers4Rec and Merlin Models. The existence of this function suggests to me that there's something missing from the dataloader API as it currently exists.

Contributor Author:

I think the something that's missing is "transforms implemented as operators that provide schema tracking" 😺

padded_lengths=None,
pad=False,
batch_size=0,
):
labels = [labels] if isinstance(labels, str) else labels
for label in labels or []:
@@ -34,21 +36,20 @@
for label in conts or []:
schema[label] = schema[label].with_tags(Tags.CONTINUOUS)

# Set the appropriate properties for the sparse_names/sparse_max/sparse_as_dense
for col in sparse_names or []:
for col in padded_cols or []:
cs = schema[col]
properties = cs.properties
if sparse_max and col in sparse_max:
properties["value_count"] = {"max": sparse_max[col]}
if sparse_as_dense:
properties["value_count"]["min"] = properties["value_count"]["max"]
dims = Shape(((1, batch_size), None))
Member:

What is the batch_size required for?

Contributor Author:

Not required for anything specific, just following the principle that the schema should always accurately reflect the data to the greatest extent possible. Here we have shape information since we know the batch size, so we fill it in in case that helps something downstream. I don't know if it actually will, but it seemed like the right thing to do.


if not cs.shape.dims[1].is_unknown:
dims = dims.with_dim(1, cs.shape.dims[1])

if pad:
dims = dims.with_dim_min(1, padded_lengths[col])
if padded_lengths and col in padded_lengths:
dims = dims.with_dim_max(1, padded_lengths[col])

schema[col] = ColumnSchema(
name=cs.name,
tags=cs.tags,
dtype=cs.dtype,
is_list=True,
is_ragged=not sparse_as_dense,
properties=properties,
name=cs.name, tags=cs.tags, dtype=cs.dtype, properties=cs.properties, dims=dims
)

return schema
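A minimal stand-in for the dim bookkeeping that the rewritten `_augment_schema` performs (hypothetical sketch; merlin's real `Shape`/dim classes carry more information than plain tuples):

```python
# Hypothetical sketch: each dim is a (min, max) tuple, None meaning unbounded.
def padded_dims(batch_size, padded_lengths=None, col=None, pad=False):
    batch_dim = (1, batch_size)     # between 1 and batch_size rows per batch
    min_count, max_count = 0, None  # row length is ragged/unknown by default
    if pad:
        # padding to a fixed length pins the minimum to the padded length
        min_count = padded_lengths[col]
    if padded_lengths and col in padded_lengths:
        max_count = padded_lengths[col]
    return (batch_dim, (min_count, max_count))
```

With `pad=True` the row dim collapses to a fixed `(5, 5)`; without padding it stays ragged with only an upper bound.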
1 change: 1 addition & 0 deletions nvtabular/loader/tensorflow.py
@@ -251,6 +251,7 @@ def __init__(
sparse_names,
sparse_max,
sparse_as_dense,
batch_size,
)

super().__init__(
34 changes: 20 additions & 14 deletions nvtabular/ops/groupby.py
@@ -18,6 +18,7 @@
from dask.dataframe.utils import meta_nonempty

from merlin.core.dispatch import DataFrameType, annotate
from merlin.dtypes.shape import DefaultShapes
Member:

Is this going to be a new feature of dtypes in core?

Contributor Author:

Shapes are implemented as a subfield of dtypes in Core, but this isn't really intended to be a feature of dtypes in particular. We might want to hide that implementation detail a bit more thoroughly by adjusting the imports.

As far as the defaults go, we just thought it was easier to read and understand than needing to remember which shapes mean what.

Member:

The DefaultShapes.LIST object seems like it could be useful, but the tests are failing to find this in core: ImportError: cannot import name 'DefaultShapes' from 'merlin.dtypes.shape'

Contributor Author:

Expected since there is (or was) an outstanding Core PR that adds it

Contributor Author:

Looks like it's still open: NVIDIA-Merlin/core#215

Contributor Author:

Merged now

from merlin.schema import Schema
from nvtabular.ops.operator import ColumnSelector, Operator

@@ -186,10 +187,7 @@ def dependencies(self):
def _compute_dtype(self, col_schema, input_schema):
col_schema = super()._compute_dtype(col_schema, input_schema)

dtype = col_schema.dtype
is_list = col_schema.is_list

dtypes = {
agg_dtypes = {
"count": numpy.int32,
"nunique": numpy.int32,
"mean": numpy.float32,
@@ -199,18 +197,26 @@ def _compute_dtype(self, col_schema, input_schema):
"sum": numpy.float32,
}

is_lists = {"list": True}
agg = self._find_agg(col_schema, input_schema)
dtype = agg_dtypes.get(agg, col_schema.dtype)

return col_schema.with_dtype(dtype)

def _compute_shape(self, col_schema, input_schema):
agg_is_lists = {"list": True}

agg = self._find_agg(col_schema, input_schema)
is_list = agg_is_lists.get(agg, col_schema.is_list)
Member:

In the case where we fall back to the second argument of the .get here, we check the col_schema.is_list attribute. If we have a fixed-size list, would we want to preserve that info in the shape later on, instead of turning it into a ragged list, which is presumably the default shape corresponding to DefaultShapes.LIST?

Contributor Author:

I'm not actually sure. Are there GroupBy aggregations that don't change the shape of list columns?

Contributor Author:

From looking at the list of aggregations, I think everything changes the shape, either from list to scalar or from scalar to list.

Contributor Author:

I wonder if the default there should actually be False 🤔

Member:

In practice it seems that it may not matter whether it's col_schema.is_list or False. I tried running Groupby with an agg that is not "list" on a dataframe with list features, and we get errors in both cudf and pandas:

from merlin.io import Dataset
import nvtabular as nvt
import cudf

df = cudf.DataFrame({"a": [1, 1, 2], "b": [[10], [20], [20]]})
workflow = nvt.Workflow(["a", "b"] >> nvt.ops.Groupby(groupby_cols=["a"], aggs=["sum"]))
workflow.fit_transform(Dataset(df)).compute()
# => Raises DataError: All requested aggregations are unsupported.

Some of these aggs, like sum or mean could in theory work on list features if we wanted them to.

Pandas for example, handles sum across lists as concatenation.

import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [[10], [20], [20]]})
df.groupby("a").sum()
# =>
          b
a          
1  [10, 20]
2      [20]

or if numpy arrays, then as an element-wise sum

import numpy as np

df = pd.DataFrame({"a": [1, 1, 2], "b": [np.array([10]), np.array([20]), np.array([20])]})
df.groupby("a").sum()
# =>
      b
a      
1  [30]
2  [20]

var/std/mean/median also work and in this example return scalars. If the element type contained an array of more than one dimension, then mean could start returning a list type too.

Since cudf doesn't appear to support aggregating across list columns, this is probably something we don't need to be concerned about for now.

import cudf
df = cudf.DataFrame({"a": [1, 1, 2], "b": [[10], [20], [20]]})
df.groupby("a").sum()
# => Raises DataError: All requested aggregations are unsupported.

df["b"].sum()
# => Raises TypeError: cannot perform sum with type list

Member:

So I think False could be fine as the default here, and as far as this PR goes, it's no less clear than it was before.

If we need groupby aggregations for list columns as a future feature of NVTabular this will need to be revisited. I suppose even if cudf and pandas don't natively support this we could implement this ourselves by extracting the cupy/numpy arrays from the series in our own agg function to handle list column aggregations.
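A hedged sketch of that custom-agg idea in pandas: extract each group's lists as a numpy array and sum element-wise ourselves (assumes equal-length lists within a group; this is an illustration, not code from the PR):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [[10], [20], [20]]})

# For each group, stack the lists into a 2-D array and sum element-wise.
out = {k: np.sum(np.stack(list(g)), axis=0).tolist()
       for k, g in df.groupby("a")["b"]}
# => {1: [30], 2: [20]}
```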

Member:

Currently it seems that the only agg supported for list columns is list, which will result in adding one additional dimension to the shape of the original list:

from merlin.io import Dataset
import nvtabular as nvt
import cudf

df = cudf.DataFrame({"a": [1, 1, 2], "b": [[10], [20], [20]]})
workflow = nvt.Workflow(["a", "b"] >> nvt.ops.Groupby(groupby_cols=["a"], aggs=["list"]))
workflow.fit_transform(Dataset(df)).compute()
# =>
   a        b_list
0  1  [[10], [20]]
1  2        [[20]]
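For comparison, pandas shows the same extra nesting level with an explicit list aggregation (sketch, not code from the PR):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [[10], [20], [20]]})
# Collect each group's values into a list; each element gains one nesting level.
out = df.groupby("a")["b"].agg(list)
# out.loc[1] == [[10], [20]]
```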

Contributor Author:

That's great to know; I really appreciate your thoroughness in testing this out. This probably warrants a further update to the shapes here, I'll open a separate issue.

Contributor Author:

Tracked here: #1763


for col_name in input_schema.column_names:
combined_aggs = _aggs_for_column(col_name, self.conv_aggs)
combined_aggs += _aggs_for_column(col_name, self.list_aggs)
for agg in combined_aggs:
if col_schema.name.endswith(f"{self.name_sep}{agg}"):
dtype = dtypes.get(agg, dtype)
is_list = is_lists.get(agg, is_list)
break
shape = DefaultShapes.LIST if is_list else DefaultShapes.SCALAR
return col_schema.with_shape(shape)

return col_schema.with_dtype(dtype, is_list=is_list, is_ragged=is_list)
def _find_agg(self, col_schema, input_schema):
input_selector = ColumnSelector(input_schema.column_names)
column_mapping = self.column_mapping(input_selector)
input_column_name = column_mapping[col_schema.name][0]
agg = col_schema.name.replace(input_column_name, "").lstrip(self.name_sep)
return agg
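A self-contained sketch of the suffix-stripping idea behind `_find_agg` (hypothetical helper; the real method derives the input column name from the op's column mapping rather than taking it as a parameter):

```python
def find_agg(output_name, input_name, name_sep="_"):
    # "b_sum" with input "b" -> strip the input-name prefix, then the separator
    return output_name.replace(input_name, "").lstrip(name_sep)

find_agg("b_sum", "b")  # -> "sum"
```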


def _aggs_for_column(col_name, agg_dict):
18 changes: 13 additions & 5 deletions nvtabular/ops/join_groupby.py
@@ -21,6 +21,7 @@

import nvtabular as nvt
from merlin.core.dispatch import DataFrameType, arange, concat_columns, read_parquet_dispatch
from merlin.dtypes.shape import DefaultShapes
from merlin.schema import Schema
from nvtabular.ops import categorify as nvt_cat
from nvtabular.ops.operator import ColumnSelector, Operator
@@ -243,17 +244,24 @@ def column_mapping(self, col_selector):

def _compute_dtype(self, col_schema, input_schema):
new_schema = super()._compute_dtype(col_schema, input_schema)

dtype = new_schema.dtype
is_list = new_schema.is_list

for agg in list(AGG_DTYPES.keys()):
if col_schema.name.endswith(f"{self.name_sep}{agg}"):
if new_schema.name.endswith(f"{self.name_sep}{agg}"):
dtype = AGG_DTYPES.get(agg, dtype)
is_list = False
break

return col_schema.with_dtype(dtype, is_list=is_list, is_ragged=is_list)
return new_schema.with_dtype(dtype)

def _compute_shape(self, col_schema, input_schema):
new_schema = super()._compute_shape(col_schema, input_schema)
shape = new_schema.shape

agg_applied = any(
new_schema.name.endswith(f"{self.name_sep}{agg}") for agg in list(AGG_DTYPES.keys())
)

return new_schema.with_shape(DefaultShapes.SCALAR if agg_applied else shape)

def set_storage_path(self, new_path, copy=False):
self.categories = nvt_cat._copy_storage(self.categories, self.out_path, new_path, copy)
13 changes: 12 additions & 1 deletion nvtabular/ops/list_slice.py
@@ -129,7 +129,7 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram

def _compute_dtype(self, col_schema, input_schema):
Member:

Is this method required to be overriden with this change? or could it be removed?

Contributor Author:

It could be removed, but that breaks some of the tests even though the functionality works without it. (There are actually two places like this.)

col_schema = super()._compute_dtype(col_schema, input_schema)
return col_schema.with_dtype(col_schema.dtype, is_list=True, is_ragged=not self.pad)
return col_schema.with_dtype(col_schema.dtype)

def _compute_properties(self, col_schema, input_schema):
col_schema = super()._compute_properties(col_schema, input_schema)
@@ -140,6 +140,17 @@ def _compute_properties(self, col_schema, input_schema):
properties["value_count"]["min"] = self.max_elements
return col_schema.with_properties(properties)

def _compute_shape(self, col_schema, input_schema):
col_schema = super()._compute_shape(col_schema, input_schema)

min_count, max_count = (0, None)
if self.max_elements != np.iinfo(np.int64).max:
max_count = self.max_elements
if self.pad:
min_count = self.max_elements

return col_schema.with_shape((None, (min_count, max_count)))
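The `(min_count, max_count)` bookkeeping above can be mirrored without merlin (hypothetical sketch of the bounds logic only, not the real operator):

```python
import numpy as np

def slice_value_count(max_elements, pad):
    # No explicit limit (int64 max sentinel) means unbounded above;
    # padding pins the minimum to the maximum.
    min_count, max_count = 0, None
    if max_elements != np.iinfo(np.int64).max:
        max_count = max_elements
    if pad:
        min_count = max_elements
    return (min_count, max_count)
```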

@property
def output_tags(self):
return [Tags.LIST]
14 changes: 13 additions & 1 deletion nvtabular/ops/value_counts.py
@@ -59,8 +59,20 @@ def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFram

def _compute_properties(self, col_schema, input_schema):
new_schema = super()._compute_properties(col_schema, input_schema)
stat_properties = self.stats.get(col_schema.name, {})
stat_properties = self.stats.get(col_schema.name, {"value_count": {"min": 0, "max": None}})
return col_schema.with_properties({**new_schema.properties, **stat_properties})

def _compute_shape(self, col_schema, input_schema):
new_schema = super()._compute_shape(col_schema, input_schema)

value_counts = self.stats.get(col_schema.name, {}).get("value_count", {})

min_count, max_count = (0, None)
if value_counts:
min_count = value_counts.get("min", 0)
max_count = value_counts.get("max", None)

return new_schema.with_shape((None, (min_count, max_count)))
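The same pattern, sketched standalone: fitted value-count stats become `(min, max)` shape bounds, defaulting to fully ragged when no stats were collected (hypothetical helper, not the real operator):

```python
def value_count_bounds(stats, col_name):
    # stats maps column name -> {"value_count": {"min": ..., "max": ...}}
    value_counts = stats.get(col_name, {}).get("value_count", {})
    min_count = value_counts.get("min", 0)
    max_count = value_counts.get("max", None)
    return (min_count, max_count)
```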

def clear(self):
self.stats = {}
25 changes: 14 additions & 11 deletions tests/unit/ops/test_ops_schema.py
@@ -56,35 +56,38 @@ def test_schema_out(tags, properties, selection, op):
output_schema = op.compute_output_schema(input_schema, selector)

# should have dtype float
for col_name in selector.names:
names_group = [name for name in output_schema.column_schemas if col_name in name]
if names_group:
for name in names_group:
result_schema = output_schema.column_schemas[name]
for input_col_name in selector.names:
output_col_names = [name for name in output_schema.column_schemas if input_col_name in name]
if output_col_names:
for output_col_name in output_col_names:
result_schema = output_schema.column_schemas[output_col_name]

expected_dtype = op._compute_dtype(
ColumnSchema(col_name), Schema([input_schema.column_schemas[col_name]])
ColumnSchema(output_col_name),
Schema([input_schema.column_schemas[input_col_name]]),
).dtype

expected_tags = op._compute_tags(
ColumnSchema(col_name), Schema([input_schema.column_schemas[col_name]])
ColumnSchema(output_col_name),
Schema([input_schema.column_schemas[input_col_name]]),
).tags

expected_properties = op._compute_properties(
ColumnSchema(col_name), Schema([input_schema.column_schemas[col_name]])
ColumnSchema(output_col_name),
Schema([input_schema.column_schemas[input_col_name]]),
).properties

assert result_schema.dtype == expected_dtype
if name in selector.names:
if output_col_name in selector.names:
assert result_schema.properties == expected_properties

assert len(result_schema.tags) == len(expected_tags)
else:
assert set(expected_tags).issubset(result_schema.tags)

not_used = [col for col in all_cols if col not in selector.names]
for col_name in not_used:
assert col_name not in output_schema.column_schemas
for input_col_name in not_used:
assert input_col_name not in output_schema.column_schemas


@pytest.mark.parametrize("properties", [{"p1": "1"}])