Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Operator for Implicit Models #134

Merged
merged 20 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
20d16ec
Add first version of PredictImplicit operator
oliverholworthy Jul 11, 2022
c81dc28
Add module and class names to config to be able to re-load model
oliverholworthy Jul 11, 2022
f7c8b52
Set filter_already_liked_items=False so that user_items is not required
oliverholworthy Jul 11, 2022
9c7fa2b
Add ensemble test for PredictImplicit
oliverholworthy Jul 11, 2022
b59cd94
Add workflow for implicit op
oliverholworthy Jul 11, 2022
9f064a3
Remove dependency on merlin models for tests
oliverholworthy Jul 11, 2022
60e1b24
Add "als" to ci/ignore_codespell_words.txt
oliverholworthy Jul 11, 2022
37a63fe
Update docstring for transform method and remove whitespace
oliverholworthy Jul 11, 2022
5ea4551
Add 'n' (number of items to recommend) to the inputs of the op
oliverholworthy Jul 11, 2022
0c05223
Update implicit tests to check multiple user ids and use grpcclient
oliverholworthy Aug 2, 2022
4bd4c30
Add check for implicit version
oliverholworthy Aug 2, 2022
d591a11
Uncomment ensemble tests for als/lmf
oliverholworthy Aug 2, 2022
809b3a7
Move num_to_recommend from request argument to predict op constructor
oliverholworthy Aug 2, 2022
d108a0e
Update from_config signature to be consistent with others
oliverholworthy Aug 2, 2022
996b505
Pass in num_to_recommend with keyword argument to recommend in tests
oliverholworthy Aug 2, 2022
94d46c5
Rename test for config for clarity
oliverholworthy Aug 2, 2022
413f554
Specify low for random num_to_recommend to avoid zero
oliverholworthy Aug 2, 2022
8ba0eb4
Merge branch 'main' into op-implicit
karlhigley Aug 15, 2022
50855cc
Remove workflow for implicit and add package to requirements-test
oliverholworthy Aug 15, 2022
a4990ac
Correct function handling TritonPythonModel model_repository
oliverholworthy Aug 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/ignore_codespell_words.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ fo
ot
lik
usera
als
137 changes: 137 additions & 0 deletions merlin/systems/dag/ops/implicit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import importlib
import json
import pathlib

from merlin.dag import ColumnSelector # noqa
from merlin.schema import ColumnSchema, Schema
from merlin.systems.dag.ops.operator import InferenceDataFrame, PipelineableInferenceOperator

# Optional-dependency guard: when `implicit` (or `packaging`) is not
# installed, fall through and mark the library unavailable so the module
# still imports; operator construction will fail later instead.
try:
    import implicit
    from packaging.version import Version

    # Model save/load (required by export/from_config) was added in
    # implicit 0.6.0. NOTE: RuntimeError is intentionally NOT caught by
    # the `except ImportError` below, so an outdated install fails loudly
    # at import time rather than silently disabling the operator.
    if Version(implicit.__version__) < Version("0.6.0"):
        raise RuntimeError(
            "Implicit version 0.6.0 or higher required. (for model save/load methods)."
        )
except ImportError:
    # `implicit` (or `packaging`) missing: signal unavailability with None.
    implicit = None


class PredictImplicit(PipelineableInferenceOperator):
    """Inference operator that serves predictions from a trained ``implicit`` model."""

    def __init__(self, model, num_to_recommend: int = 10, **kwargs):
        """Wrap a fitted Implicit model as a pipelineable inference operator.

        Parameters
        ----------
        model
            A fitted model instance from the ``implicit`` package.
        num_to_recommend : int, optional
            Number of item ids/scores to return per user (default 10).
        """
        self.model, self.num_to_recommend = model, num_to_recommend
        super().__init__(**kwargs)

def compute_output_schema(
    self,
    input_schema: Schema,
    col_selector: ColumnSelector,
    prev_output_schema: Schema = None,
) -> Schema:
    """Describe the columns this operator emits.

    Regardless of input, the operator always produces two columns:
    ``ids`` (int64 item ids) and ``scores`` (float64 relevance scores).
    """
    output_columns = [
        ColumnSchema("ids", dtype="int64"),
        ColumnSchema("scores", dtype="float64"),
    ]
    return Schema(output_columns)

def compute_input_schema(
    self,
    root_schema: Schema,
    parents_schema: Schema,
    deps_schema: Schema,
    selector: ColumnSelector,
) -> Schema:
    """Describe the input columns this operator consumes.

    Only a single ``user_id`` (int64) column is required; the upstream
    schemas are ignored because the model recommends from user ids alone.
    """
    user_id_column = ColumnSchema("user_id", dtype="int64")
    return Schema([user_id_column])

def export(self, path, input_schema, output_schema, params=None, node_id=None, version=1):
    """Export the operator and its serialized model to ``path``.

    Saves the model weights as ``model.npz`` under
    ``<path>/<node_name>/<version>/`` and records the model's module and
    class names plus ``num_to_recommend`` in the config params so that
    ``from_config`` can later reconstruct the operator.

    Parameters
    ----------
    path
        Root directory of the model repository to export into.
    input_schema : Schema
        Input schema passed through to the parent export.
    output_schema : Schema
        Output schema passed through to the parent export.
    params : dict, optional
        Extra parameters to record in the exported config; not mutated.
    node_id : optional
        Node identifier used to build the export directory name.
    version : int
        Version subdirectory to export into (default 1).
    """
    node_name = f"{node_id}_{self.export_name}" if node_id is not None else self.export_name
    version_path = pathlib.Path(path) / node_name / str(version)
    version_path.mkdir(parents=True, exist_ok=True)
    model_path = version_path / "model.npz"
    self.model.save(str(model_path))
    # Copy so we never mutate the caller's params dict in place.
    params = {**params} if params else {}
    params["model_module_name"] = self.model.__module__
    params["model_class_name"] = self.model.__class__.__name__
    params["num_to_recommend"] = self.num_to_recommend
    return super().export(
        path,
        input_schema,
        output_schema,
        params=params,
        node_id=node_id,
        version=version,
    )

@classmethod
def from_config(cls, config: dict, **kwargs) -> "PredictImplicit":
    """Rebuild the operator from its exported configuration.

    Expected config structure:
    {
        "input_dict": str # JSON dict with input names and schemas
        "params": str # JSON dict with params saved at export
    }

    The ``params`` JSON carries the model's module/class names and
    ``num_to_recommend`` as written by ``export``; the trained weights are
    loaded from ``model.npz`` inside the Triton model repository.
    """
    params = json.loads(config["params"])

    repository = kwargs["model_repository"]
    name = kwargs["model_name"]
    version = kwargs["model_version"]

    # Resolve the concrete implicit model class from its recorded names.
    model_module = importlib.import_module(params["model_module_name"])
    model_cls = getattr(model_module, params["model_class_name"])

    # Restore the trained weights saved by `export`.
    weights_file = pathlib.Path(repository) / name / str(version) / "model.npz"
    model = model_cls.load(str(weights_file))

    return cls(model, num_to_recommend=params["num_to_recommend"])

def transform(self, df: InferenceDataFrame) -> InferenceDataFrame:
    """Produce recommendations for the ``user_id`` column of ``df``.

    Parameters
    ----------
    df : InferenceDataFrame
        Input frame containing a ``user_id`` column.

    Returns
    -------
    InferenceDataFrame
        Frame with ``ids`` and ``scores`` columns from the model.
    """
    user_ids = df["user_id"].ravel()
    # No user/items interaction matrix is supplied: it is only needed when
    # filter_already_liked_items (or recalculate_user) is True, and we
    # explicitly pass filter_already_liked_items=False.
    ids, scores = self.model.recommend(
        user_ids, None, N=self.num_to_recommend, filter_already_liked_items=False
    )
    return InferenceDataFrame({"ids": ids, "scores": scores})
15 changes: 7 additions & 8 deletions merlin/systems/triton/oprunner_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,13 @@ def execute(self, requests):

def _parse_model_repository(model_repository: str) -> str:
"""
Extract the model repository path from the value passed to the TritonPythonModel
initialize method.
Extract the model repository path from the model_repository value
passed to the TritonPythonModel initialize method.
"""
model_repository_path = pathlib.Path(model_repository).parent

# Handle bug in Tritonserver 22.06
# model_repository argument became path to model.py
if str(model_repository).endswith(".py"):
model_repository_path = model_repository_path.parent

return str(model_repository_path)
# instead of path to model directory within the model repository
if model_repository.endswith(".py"):
return str(pathlib.Path(model_repository).parent.parent.parent)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

example:

import pathlib

pathlib.Path("/tmp/my_model_repository/my_model_name/1/model.py").parent.parent.parent
# => PosixPath('/tmp/my_model_repository')

else:
return str(pathlib.Path(model_repository).parent)
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ testbook==0.4.2
# packages necessary to run tests and push PRs
feast==0.19.4
xgboost==1.6.1
implicit==0.6.0

# TODO: do we need more of these?
# https://github.com/NVIDIA-Merlin/Merlin/blob/a1cc48fe23c4dfc627423168436f26ef7e028204/ci/dockerfile.ci#L13-L18
19 changes: 19 additions & 0 deletions tests/unit/systems/implicit/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest

# Skip every test module in this package when the optional `implicit`
# dependency is not installed.
pytest.importorskip("implicit")
127 changes: 127 additions & 0 deletions tests/unit/systems/implicit/test_implicit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from distutils.spawn import find_executable

import implicit
import numpy as np
import pytest
from scipy.sparse import csr_matrix

from merlin.schema import ColumnSchema, Schema
from merlin.systems.dag.ensemble import Ensemble
from merlin.systems.dag.ops.implicit import PredictImplicit
from merlin.systems.triton.utils import run_triton_server

TRITON_SERVER_PATH = find_executable("tritonserver")

triton = pytest.importorskip("merlin.systems.triton")
grpcclient = pytest.importorskip("tritonclient.grpc")


@pytest.mark.parametrize(
    "model_cls",
    [
        implicit.bpr.BayesianPersonalizedRanking,
        implicit.als.AlternatingLeastSquares,
        implicit.lmf.LogisticMatrixFactorization,
    ],
)
def test_reload_from_config(model_cls, tmpdir):
    """Export an operator, reload it via from_config, and verify the
    reloaded model produces identical recommendations."""
    n = 10
    model = model_cls()
    interactions = csr_matrix(np.random.choice([0, 1], size=n * n).reshape(n, n))
    model.fit(interactions)

    op = PredictImplicit(model)
    config = op.export(tmpdir, Schema(), Schema())
    node_config = json.loads(config.parameters[config.name].string_value)

    reloaded_op = PredictImplicit.from_config(
        node_config,
        model_repository=tmpdir,
        model_name=config.name,
        model_version=1,
    )

    num_to_recommend = np.random.randint(1, n)
    expected_ids, expected_scores = model.recommend(
        1, None, N=num_to_recommend, filter_already_liked_items=False
    )
    reloaded_ids, reloaded_scores = reloaded_op.model.recommend(
        1, None, N=num_to_recommend, filter_already_liked_items=False
    )

    np.testing.assert_array_equal(expected_ids, reloaded_ids)
    np.testing.assert_array_equal(expected_scores, reloaded_scores)


@pytest.mark.skipif(not TRITON_SERVER_PATH, reason="triton server not found")
@pytest.mark.parametrize(
    "model_cls",
    [
        implicit.bpr.BayesianPersonalizedRanking,
        implicit.als.AlternatingLeastSquares,
        implicit.lmf.LogisticMatrixFactorization,
    ],
)
def test_ensemble(model_cls, tmpdir):
    """Serve the operator through a Triton ensemble and check the server's
    responses match direct model predictions."""
    n = 100
    model = model_cls()
    interactions = csr_matrix(
        np.random.choice([0, 1], size=n * n, p=[0.9, 0.1]).reshape(n, n)
    )
    model.fit(interactions)

    num_to_recommend = np.random.randint(1, n)
    expected_ids, expected_scores = model.recommend(
        [0, 1], None, N=num_to_recommend, filter_already_liked_items=False
    )

    implicit_op = PredictImplicit(model, num_to_recommend=num_to_recommend)
    input_schema = Schema([ColumnSchema("user_id", dtype="int64")])
    ensemble = Ensemble(input_schema.column_names >> implicit_op, input_schema)
    ensemble.export(tmpdir)

    request_user_ids = np.array([[0], [1]], dtype=np.int64)
    request_input = grpcclient.InferInput(
        "user_id", request_user_ids.shape, triton.np_to_triton_dtype(request_user_ids.dtype)
    )
    request_input.set_data_from_numpy(request_user_ids)
    requested_outputs = [
        grpcclient.InferRequestedOutput("scores"),
        grpcclient.InferRequestedOutput("ids"),
    ]

    with run_triton_server(tmpdir) as client:
        response = client.infer(ensemble.name, [request_input], outputs=requested_outputs)

    np.testing.assert_array_equal(expected_ids, response.as_numpy("ids"))
    np.testing.assert_array_equal(expected_scores, response.as_numpy("scores"))