Skip to content

Commit

Permalink
Feature/vector inference (#52)
Browse files Browse the repository at this point in the history
- Optimized inference for vector models with DOT and COSINE distance
using implicit library inference methods
  • Loading branch information
blondered authored Oct 26, 2023
1 parent e897d44 commit e8731e6
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Updated notebooks in `examples` ([#44](https://github.com/MobileTeleSystems/RecTools/pull/44))
- Moved `lightfm` to extras ([#51](https://github.com/MobileTeleSystems/RecTools/pull/51))
- Renamed `nn` extra to `torch` ([#51](https://github.com/MobileTeleSystems/RecTools/pull/51))
- Optimized inference for vector models with COSINE and DOT distances using `implicit` library topk method ([#52](https://github.com/MobileTeleSystems/RecTools/pull/52))

### Fixed
- Fixed bugs with new version of `pytorch_lightning` ([#43](https://github.com/MobileTeleSystems/RecTools/pull/43))
Expand Down
2 changes: 2 additions & 0 deletions rectools/models/implicit_als.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ def __init__(self, model: AnyAlternatingLeastSquares, verbose: int = 0, fit_feat

self.fit_features_together = fit_features_together
self.use_gpu = isinstance(model, GPUAlternatingLeastSquares)
if not self.use_gpu:
self.n_threads = model.num_threads

def _fit(self, dataset: Dataset) -> None: # type: ignore
self.model = deepcopy(self._model)
Expand Down
211 changes: 188 additions & 23 deletions rectools/models/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
from enum import Enum

import attr
import implicit.cpu
import numpy as np
from implicit.cpu.matrix_factorization_base import _filter_items_from_sparse_matrix as filter_items_from_sparse_matrix
from scipy import sparse
from tqdm.auto import tqdm

from rectools import InternalIds
Expand All @@ -43,6 +46,126 @@ class Factors:
biases: tp.Optional[np.ndarray] = None


class ImplicitRanker:
    """
    Ranker for DOT and COSINE distances built on the `implicit` library top-k routine.

    Parameters
    ----------
    distance : Distance
        Distance metric. Only `Distance.DOT` and `Distance.COSINE` are accepted.
    subjects_factors : np.ndarray
        Array of subject embeddings, shape (n_subjects, n_factors).
    objects_factors : np.ndarray
        Array with embeddings of all objects, shape (n_objects, n_factors).

    Raises
    ------
    ValueError
        If `distance` is neither DOT nor COSINE.
    """

    def __init__(self, distance: Distance, subjects_factors: np.ndarray, objects_factors: np.ndarray) -> None:
        if distance not in (Distance.DOT, Distance.COSINE):
            raise ValueError(f"ImplicitRanker is not suitable for {distance} distance")
        self.distance = distance
        # `astype` copies, so the caller's arrays are never modified by the ranker.
        self.subjects_factors = subjects_factors.astype(np.float32)
        self.objects_factors = objects_factors.astype(np.float32)

        # Norms are only computed (and only read) for COSINE distance.
        self.subjects_norms: np.ndarray
        self.objects_norms: np.ndarray
        if distance == Distance.COSINE:
            self.subjects_norms = np.linalg.norm(self.subjects_factors, axis=1)
            self.objects_norms = np.linalg.norm(self.objects_factors, axis=1)
            # Zero norms are replaced by a tiny value: subject norms are used as divisors
            # in `_process_implicit_scores`, object norms are handed to `topk` as `item_norms`.
            self.objects_norms[self.objects_norms == 0] = 1e-10
            self.subjects_norms[self.subjects_norms == 0] = 1e-10

    def _get_neginf_score(self) -> float:
        """Return the sentinel score that marks filtered-out items (most negative finite float32)."""
        return -np.finfo(np.float32).max

    def _get_mask_for_correct_scores(self, scores: np.ndarray) -> tp.List[bool]:
        """
        Build a mask that drops the trailing run of `neginf` scores.

        Implicit library assigns `neginf` score to items that are meant to be filtered
        (e.g. blacklist items or already seen items). Only the contiguous run at the END
        of `scores` is masked; a `neginf` value that is followed by a regular score is kept.

        Parameters
        ----------
        scores : np.ndarray
            1-d array of scores for a single subject.

        Returns
        -------
        list of bool
            `True` for positions to keep, `False` for the trailing filtered positions.
        """
        n_scores = len(scores)
        # Reversed view: True where the score is a regular (not filtered) one.
        # `~(x <= min)` rather than `x > min` so that NaN still counts as a regular score,
        # exactly as an element-wise `<=` check would treat it.
        regular_from_end = ~(np.asarray(scores) <= self._get_neginf_score())[::-1]
        if regular_from_end.any():
            # Index of the first regular score from the end == length of the trailing masked run.
            num_masked = int(np.argmax(regular_from_end))
        else:
            num_masked = n_scores
        return [True] * (n_scores - num_masked) + [False] * num_masked

    def _process_implicit_scores(
        self, subject_ids: np.ndarray, ids: np.ndarray, scores: np.ndarray
    ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
        """
        Flatten per-subject top-k output, dropping filtered items and finishing COSINE scores.

        Parameters
        ----------
        subject_ids : np.ndarray
            Subject ids, one per row of `ids` / `scores`.
        ids : np.ndarray
            Recommended object ids, one row per subject.
        scores : np.ndarray
            Scores matching `ids`, one row per subject.

        Returns
        -------
        tuple
            Flat subject ids (repeated once per kept recommendation), flat object ids, flat scores.
        """
        all_target_ids = []
        all_reco_ids: tp.List[np.ndarray] = []
        all_scores: tp.List[np.ndarray] = []

        for subject_id, object_ids, object_scores in zip(subject_ids, ids, scores):
            correct_mask = self._get_mask_for_correct_scores(object_scores)
            # Boolean-mask indexing returns copies, so the inputs stay untouched below.
            relevant_scores = object_scores[correct_mask]
            relevant_ids = object_ids[correct_mask]

            if self.distance == Distance.COSINE:
                # Object norms were already applied by `topk`; apply the subject norm here.
                relevant_scores = relevant_scores / self.subjects_norms[subject_id]

            all_target_ids.extend([subject_id] * len(relevant_ids))
            all_reco_ids.append(relevant_ids)
            all_scores.append(relevant_scores)

        if not all_reco_ids:
            # No subjects at all: `np.concatenate` raises ValueError on an empty sequence.
            empty_ids = np.empty(0, dtype=np.asarray(ids).dtype)
            empty_scores = np.empty(0, dtype=np.asarray(scores).dtype)
            return all_target_ids, empty_ids, empty_scores

        return all_target_ids, np.concatenate(all_reco_ids), np.concatenate(all_scores)

    def rank(
        self,
        subject_ids: np.ndarray,
        k: int,
        ui_csr_for_filter: tp.Optional[sparse.csr_matrix],  # only relevant for u2i recos
        sorted_item_ids_to_recommend: tp.Optional[np.ndarray],  # whitelist
        num_threads: int = 0,
    ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
        """
        Perform inference using implicit library matrix factorization topk cpu method.

        Parameters
        ----------
        subject_ids : np.ndarray
            Ids of subjects to rank objects for; used to index `subjects_factors`.
        k : int
            Desired number of objects per subject. Capped by the number of candidate objects.
        ui_csr_for_filter : sparse.csr_matrix, optional
            Subjects x objects matrix; passed to `topk` as `filter_query_items`.
            Rows are expected to correspond to `subject_ids`.
        sorted_item_ids_to_recommend : np.ndarray, optional
            Whitelist of object ids. If given, only these objects are ranked.
        num_threads : int, default 0
            Passed through to `topk` as `num_threads`.

        Returns
        -------
        tuple
            Flat subject ids, flat recommended object ids and their scores.
        """
        if sorted_item_ids_to_recommend is not None:
            object_factors_whitelist = self.objects_factors[sorted_item_ids_to_recommend]

            if ui_csr_for_filter is not None:
                # filter ui_csr_for_filter matrix to contain only whitelist objects
                filter_query_items = filter_items_from_sparse_matrix(sorted_item_ids_to_recommend, ui_csr_for_filter)
            else:
                filter_query_items = None

        else:
            # keep all objects and full ui_csr_for_filter
            object_factors_whitelist = self.objects_factors
            filter_query_items = ui_csr_for_filter

        subject_factors = self.subjects_factors[subject_ids]

        object_norms = None  # for DOT distance
        if self.distance == Distance.COSINE:
            object_norms = self.objects_norms
            if sorted_item_ids_to_recommend is not None:
                object_norms = object_norms[sorted_item_ids_to_recommend]

        # Never ask for more objects than there are candidates.
        real_k = min(k, object_factors_whitelist.shape[0])

        ids, scores = implicit.cpu.topk.topk(  # pylint: disable=c-extension-no-member
            items=object_factors_whitelist,
            query=subject_factors,
            k=real_k,
            item_norms=object_norms,  # query norms for COSINE distance are applied afterwards
            filter_query_items=filter_query_items,  # queries x objects csr matrix for getting neginf scores
            filter_items=None,  # rectools doesn't support blacklist for now
            num_threads=num_threads,
        )

        if sorted_item_ids_to_recommend is not None:
            # `topk` returned positions inside the whitelist; map them back to object ids.
            ids = sorted_item_ids_to_recommend[ids]

        # filter neginf from implicit scores and apply norms for correct COSINE distance
        return self._process_implicit_scores(subject_ids, ids, scores)


class ScoreCalculator:
"""
Calculate proximity scores between one subject (e.g. user) and all objects (e.g. items)
Expand Down Expand Up @@ -120,6 +243,7 @@ class VectorModel(ModelBase):

u2i_dist: Distance = NotImplemented
i2i_dist: Distance = NotImplemented
n_threads: int = 0 # TODO: decide how to pass it correctly for all models

def _recommend_u2i(
self,
Expand All @@ -131,9 +255,24 @@ def _recommend_u2i(
) -> tp.Tuple[InternalIds, InternalIds, Scores]:
if filter_viewed:
user_items = dataset.get_user_item_matrix(include_weights=False)
else:
user_items = None

user_vectors, item_vectors = self._get_u2i_vectors(dataset)

scores_calculator = self._get_u2i_calculator(dataset)
if self.u2i_dist in (Distance.COSINE, Distance.DOT):

ranker = ImplicitRanker(self.u2i_dist, user_vectors, item_vectors)
ui_csr_for_filter = user_items[user_ids] if filter_viewed else None
return ranker.rank(
subject_ids=user_ids,
k=k,
ui_csr_for_filter=ui_csr_for_filter,
sorted_item_ids_to_recommend=sorted_item_ids_to_recommend,
num_threads=self.n_threads,
)

scores_calculator = ScoreCalculator(self.u2i_dist, user_vectors, item_vectors)
all_target_ids = []
all_reco_ids: tp.List[np.ndarray] = []
all_scores: tp.List[np.ndarray] = []
Expand All @@ -159,8 +298,20 @@ def _recommend_i2i(
k: int,
sorted_item_ids_to_recommend: tp.Optional[np.ndarray],
) -> tp.Tuple[InternalIds, InternalIds, Scores]:
scores_calculator = self._get_i2i_calculator(dataset)
item_vectors_1, item_vectors_2 = self._get_i2i_vectors(dataset)

if self.i2i_dist in (Distance.COSINE, Distance.DOT):
ranker = ImplicitRanker(self.i2i_dist, item_vectors_1, item_vectors_2)

return ranker.rank(
subject_ids=target_ids,
k=k,
ui_csr_for_filter=None,
sorted_item_ids_to_recommend=sorted_item_ids_to_recommend,
num_threads=self.n_threads,
)

scores_calculator = ScoreCalculator(self.i2i_dist, item_vectors_1, item_vectors_2)
all_target_ids = []
all_reco_ids: tp.List[np.ndarray] = []
all_scores: tp.List[np.ndarray] = []
Expand All @@ -179,7 +330,30 @@ def _recommend_i2i(

return all_target_ids, np.concatenate(all_reco_ids), np.concatenate(all_scores)

def _get_u2i_calculator(self, dataset: Dataset) -> ScoreCalculator:
def _process_biases_to_vectors(
    self,
    distance: Distance,
    subject_embeddings: np.ndarray,
    subject_biases: np.ndarray,
    object_embeddings: np.ndarray,
    object_biases: np.ndarray,
) -> tp.Tuple[np.ndarray, np.ndarray]:
    """
    Prepend bias columns to subject and object embeddings.

    For DOT distance the subject rows become ``[bias, 1, embedding]`` and the object rows
    become ``[1, bias, embedding]``, so the dot product of a subject row and an object row
    equals the embeddings dot product plus both biases.
    For COSINE and EUCLIDEAN distances the bias is simply prepended as one extra column.

    Parameters
    ----------
    distance : Distance
        Distance the resulting vectors will be used with.
    subject_embeddings, object_embeddings : np.ndarray
        Embedding matrices, one row per subject / object.
    subject_biases, object_biases : np.ndarray
        Bias arrays; each is turned into a single column via ``[:, np.newaxis]``.

    Returns
    -------
    tuple of np.ndarray
        Subject vectors and object vectors with bias columns added.

    Raises
    ------
    ValueError
        If `distance` is not DOT, COSINE or EUCLIDEAN.
    """
    # TODO: make it possible to control if add biases or not (even if they are present)
    if distance not in (Distance.DOT, Distance.COSINE, Distance.EUCLIDEAN):
        raise ValueError(f"Unexpected distance `{distance}`")

    needs_ones_column = distance == Distance.DOT

    subject_columns = [subject_biases[:, np.newaxis], subject_embeddings]
    if needs_ones_column:
        # ones go AFTER the subject bias ...
        subject_columns.insert(1, np.ones((subject_biases.size, 1)))
    subject_vectors = np.hstack(subject_columns)

    object_columns = [object_biases[:, np.newaxis], object_embeddings]
    if needs_ones_column:
        # ... and BEFORE the object bias, so each bias is multiplied by 1 in the dot product
        object_columns.insert(0, np.ones((object_biases.size, 1)))
    object_vectors = np.hstack(object_columns)

    return subject_vectors, object_vectors

def _get_u2i_vectors(self, dataset: Dataset) -> tp.Tuple[np.ndarray, np.ndarray]:
user_factors = self._get_users_factors(dataset)
item_factors = self._get_items_factors(dataset)

Expand All @@ -188,34 +362,25 @@ def _get_u2i_calculator(self, dataset: Dataset) -> ScoreCalculator:
user_biases = user_factors.biases
item_biases = item_factors.biases

# TODO: make it possible to control if add biases or not (even if they present)
if user_biases is not None and item_biases is not None:
if self.u2i_dist == Distance.DOT:
user_vectors = np.hstack((user_biases[:, np.newaxis], np.ones((user_biases.size, 1)), user_vectors))
item_vectors = np.hstack((np.ones((item_biases.size, 1)), item_biases[:, np.newaxis], item_vectors))
elif self.u2i_dist in (Distance.COSINE, Distance.EUCLIDEAN):
user_vectors = np.hstack((user_biases[:, np.newaxis], user_vectors))
item_vectors = np.hstack((item_biases[:, np.newaxis], item_vectors))
else:
raise ValueError(f"Unexpected distance `{self.u2i_dist}`")
user_vectors, item_vectors = self._process_biases_to_vectors(
self.u2i_dist, user_vectors, user_biases, item_vectors, item_biases
)

return ScoreCalculator(self.u2i_dist, user_vectors, item_vectors)
return user_vectors, item_vectors

def _get_i2i_calculator(self, dataset: Dataset) -> ScoreCalculator:
def _get_i2i_vectors(self, dataset: Dataset) -> tp.Tuple[np.ndarray, np.ndarray]:
item_factors = self._get_items_factors(dataset)
item_vectors = item_factors.embeddings
item_biases = item_factors.biases
item_vectors_1 = item_vectors_2 = item_vectors

if item_biases is not None: # TODO: make it possible to control if add biases or not (even if they present)
if self.i2i_dist == Distance.DOT:
item_vectors_1 = np.hstack((np.ones((item_biases.size, 1)), item_biases[:, np.newaxis], item_vectors))
item_vectors_2 = np.hstack((item_biases[:, np.newaxis], np.ones((item_biases.size, 1)), item_vectors))
elif self.i2i_dist in (Distance.COSINE, Distance.EUCLIDEAN):
item_vectors_1 = item_vectors_2 = np.hstack((item_biases[:, np.newaxis], item_vectors))
else:
raise ValueError(f"Unexpected distance `{self.u2i_dist}`")
return ScoreCalculator(self.i2i_dist, item_vectors_1, item_vectors_2)
if item_biases is not None:
item_vectors_1, item_vectors_2 = self._process_biases_to_vectors(
self.i2i_dist, item_vectors, item_biases, item_vectors, item_biases
)

return item_vectors_1, item_vectors_2

def _get_users_factors(self, dataset: Dataset) -> Factors:
raise NotImplementedError()
Expand Down
59 changes: 54 additions & 5 deletions tests/models/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

import typing as tp

import implicit.cpu
import numpy as np
import pandas as pd
import pytest

from rectools import Columns
from rectools.dataset import Dataset
from rectools.models.vector import Distance, Factors, ScoreCalculator, VectorModel
from rectools.models.vector import Distance, Factors, ImplicitRanker, ScoreCalculator, VectorModel

T = tp.TypeVar("T")

Expand Down Expand Up @@ -60,6 +61,54 @@ def test_scores_calculator_with_incorrect_distance() -> None:
calculator.calc(1)


class TestImplicitRanker:  # pylint: disable=protected-access
    """Tests for `ImplicitRanker`: neginf sentinel, distance validation and score masking."""

    @pytest.fixture
    def subject_factors(self) -> np.ndarray:
        """Two subject embeddings; the second one is all zeros."""
        rows = [[-4, 0, 3], [0, 0, 0]]
        return np.array(rows)

    @pytest.fixture
    def object_factors(self) -> np.ndarray:
        """Three object embeddings; the second one is all zeros."""
        rows = [
            [-4, 0, 3],
            [0, 0, 0],
            [1, 1, 1],
        ]
        return np.array(rows)

    def test_neginf_score(self, subject_factors: np.ndarray, object_factors: np.ndarray) -> None:
        """The ranker's sentinel must equal the score `topk` gives to a filtered item."""
        ranker = ImplicitRanker(Distance.DOT, subjects_factors=subject_factors, objects_factors=object_factors)
        single_vector = np.array([[1, 2]], dtype=np.float32)
        # The only item is filtered out, so its returned score is the library's neginf value.
        topk_result = implicit.cpu.topk.topk(  # pylint: disable=c-extension-no-member
            items=single_vector,
            query=single_vector,
            k=1,
            filter_items=np.array([0]),
        )
        library_neginf = topk_result[1][0][0]
        assert library_neginf == ranker._get_neginf_score()

    def test_with_incorrect_distance(self, subject_factors: np.ndarray, object_factors: np.ndarray) -> None:
        """Only DOT and COSINE are supported; anything else must be rejected on construction."""
        with pytest.raises(ValueError):
            ImplicitRanker(Distance.EUCLIDEAN, subjects_factors=subject_factors, objects_factors=object_factors)

    def test_mask_for_correct_scores(self, subject_factors: np.ndarray, object_factors: np.ndarray) -> None:
        """Only a trailing run of exact neginf scores is masked out."""
        ranker = ImplicitRanker(Distance.DOT, subjects_factors=subject_factors, objects_factors=object_factors)
        neginf = ranker._get_neginf_score()
        base_scores = np.array([7, 6, 0, 0], dtype=np.float32)

        cases = [
            # no neginf at all -> everything is kept
            (base_scores, [True] * 4),
            # trailing neginf values -> masked
            (np.append(base_scores, [neginf] * 2), [True] * 4 + [False] * 2),
            # values slightly above neginf -> kept
            (np.append(base_scores, [neginf * 0.99] * 2), [True] * 6),
            # neginf in the leading position -> kept, only the tail is inspected
            (np.insert(base_scores, 0, neginf), [True] * 5),
        ]
        for case_scores, expected_mask in cases:
            actual = ranker._get_mask_for_correct_scores(case_scores)
            assert actual == expected_mask


class TestVectorModel: # pylint: disable=protected-access, attribute-defined-outside-init
def setup(self) -> None:
stub_interactions = pd.DataFrame([], columns=Columns.Interactions)
Expand Down Expand Up @@ -125,7 +174,7 @@ def test_without_biases(
else: # i2i
_, reco, scores = model._recommend_i2i(np.array([0, 1]), self.stub_dataset, 5, None)
assert list(reco) == sum(expected_reco, [])
assert list(scores) == sum(expected_scores, [])
np.testing.assert_almost_equal(scores, np.array(expected_scores).ravel(), decimal=5)

@pytest.mark.parametrize(
"distance,expected_reco,expected_scores",
Expand All @@ -151,14 +200,14 @@ def test_with_biases(
else: # i2i
_, reco, scores = model._recommend_i2i(np.array([0, 1]), self.stub_dataset, 5, None)
assert list(reco) == sum(expected_reco, [])
assert list(scores) == sum(expected_scores, [])
np.testing.assert_almost_equal(scores, np.array(expected_scores).ravel(), decimal=5)

@pytest.mark.parametrize("method", ("u2i", "i2i"))
def test_with_incorrect_distance(self, method: str) -> None:
with pytest.raises(ValueError):
if method == "u2i":
m = self.make_model(self.user_biased_factors, self.item_biased_factors, u2i_distance=7) # type: ignore
m._get_u2i_calculator(self.stub_dataset)
m._get_u2i_vectors(self.stub_dataset)
else:
m = self.make_model(self.user_biased_factors, self.item_biased_factors, i2i_distance=7) # type: ignore
m._get_i2i_calculator(self.stub_dataset)
m._get_i2i_vectors(self.stub_dataset)

0 comments on commit e8731e6

Please sign in to comment.