diff --git a/CHANGELOG.md b/CHANGELOG.md
index e9239091..809c4ea6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Updated notebooks in `examples` ([#44](https://github.com/MobileTeleSystems/RecTools/pull/44))
 - Moved `lightfm` to extras ([#51](https://github.com/MobileTeleSystems/RecTools/pull/51))
 - Renamed `nn` extra to `torch` ([#51](https://github.com/MobileTeleSystems/RecTools/pull/51))
+- Optimized inference for vector models with COSINE and DOT distances using `implicit` library topk method ([#52](https://github.com/MobileTeleSystems/RecTools/pull/52))
 
 ### Fixed
 - Fixed bugs with new version of `pytorch_lightning` ([#43](https://github.com/MobileTeleSystems/RecTools/pull/43))
diff --git a/rectools/models/implicit_als.py b/rectools/models/implicit_als.py
index e41a960d..8908510d 100644
--- a/rectools/models/implicit_als.py
+++ b/rectools/models/implicit_als.py
@@ -62,6 +62,8 @@ def __init__(self, model: AnyAlternatingLeastSquares, verbose: int = 0, fit_feat
         self.fit_features_together = fit_features_together
 
         self.use_gpu = isinstance(model, GPUAlternatingLeastSquares)
+        if not self.use_gpu:
+            self.n_threads = model.num_threads
 
     def _fit(self, dataset: Dataset) -> None:  # type: ignore
         self.model = deepcopy(self._model)
diff --git a/rectools/models/vector.py b/rectools/models/vector.py
index de819c7f..831f3250 100644
--- a/rectools/models/vector.py
+++ b/rectools/models/vector.py
@@ -18,7 +18,10 @@
 from enum import Enum
 
 import attr
+import implicit.cpu
 import numpy as np
+from implicit.cpu.matrix_factorization_base import _filter_items_from_sparse_matrix as filter_items_from_sparse_matrix
+from scipy import sparse
 from tqdm.auto import tqdm
 
 from rectools import InternalIds
@@ -43,6 +46,126 @@ class Factors:
     biases: tp.Optional[np.ndarray] = None
 
+
+class ImplicitRanker:
+    """
+    Ranker for DOT and COSINE similarity distance which uses implicit library matrix factorization topk method
+
+    Parameters
+    ----------
+    distance : Distance
+        Distance metric.
+    subjects_factors : np.ndarray
+        Array of subject embeddings, shape (n_subjects, n_factors).
+    objects_factors : np.ndarray
+        Array with embeddings of all objects, shape (n_objects, n_factors).
+    """
+
+    def __init__(self, distance: Distance, subjects_factors: np.ndarray, objects_factors: np.ndarray) -> None:
+        if distance not in (Distance.DOT, Distance.COSINE):
+            raise ValueError(f"ImplicitRanker is not suitable for {distance} distance")
+        self.distance = distance
+        self.subjects_factors = subjects_factors.astype(np.float32)
+        self.objects_factors = objects_factors.astype(np.float32)
+
+        self.subjects_norms: np.ndarray
+        self.objects_norms: np.ndarray
+        if distance == Distance.COSINE:
+            self.subjects_norms = np.linalg.norm(self.subjects_factors, axis=1)
+            self.objects_norms = np.linalg.norm(self.objects_factors, axis=1)
+            self.objects_norms[self.objects_norms == 0] = 1e-10
+            self.subjects_norms[self.subjects_norms == 0] = 1e-10
+
+    def _get_neginf_score(self) -> float:
+        return -np.finfo(np.float32).max
+
+    def _get_mask_for_correct_scores(self, scores: np.ndarray) -> tp.List[bool]:
+        """Filter scores from implicit library that are not relevant. Implicit library assigns `neginf` score
+        to items that are meant to be filtered (e.g. blacklist items or already seen items)
+        """
+        num_masked = 0
+        min_score = self._get_neginf_score()
+        for el in np.flip(scores):
+            if el <= min_score:
+                num_masked += 1
+            else:
+                break
+        return [True] * (len(scores) - num_masked) + [False] * num_masked
+
+    def _process_implicit_scores(
+        self, subject_ids: np.ndarray, ids: np.ndarray, scores: np.ndarray
+    ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
+
+        all_target_ids = []
+        all_reco_ids: tp.List[np.ndarray] = []
+        all_scores: tp.List[np.ndarray] = []
+
+        for subject_id, object_ids, object_scores in zip(subject_ids, ids, scores):
+            correct_mask = self._get_mask_for_correct_scores(object_scores)
+            relevant_scores = object_scores[correct_mask]
+            relevant_ids = object_ids[correct_mask]
+
+            if self.distance == Distance.COSINE:
+                subject_norm = self.subjects_norms[subject_id]
+                relevant_scores /= subject_norm
+
+            all_target_ids.extend([subject_id for _ in range(len(relevant_ids))])
+            all_reco_ids.append(relevant_ids)
+            all_scores.append(relevant_scores)
+
+        return all_target_ids, np.concatenate(all_reco_ids), np.concatenate(all_scores)
+
+    def rank(
+        self,
+        subject_ids: np.ndarray,
+        k: int,
+        ui_csr_for_filter: tp.Optional[sparse.csr_matrix],  # only relevant for u2i recos
+        sorted_item_ids_to_recommend: tp.Optional[np.ndarray],  # whitelist
+        num_threads: int = 0,
+    ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
+        """Proceed inference using implicit library matrix factorization topk cpu method"""
+        if sorted_item_ids_to_recommend is not None:
+            object_factors_whitelist = self.objects_factors[sorted_item_ids_to_recommend]
+
+            if ui_csr_for_filter is not None:
+                # filter ui_csr_for_filter matrix to contain only whitelist objects
+                filter_query_items = filter_items_from_sparse_matrix(sorted_item_ids_to_recommend, ui_csr_for_filter)
+            else:
+                filter_query_items = None
+
+        else:
+            # keep all objects and full ui_csr_for_filter
+            object_factors_whitelist = self.objects_factors
+            filter_query_items = ui_csr_for_filter
+
+        subject_factors = self.subjects_factors[subject_ids]
+
+        object_norms = None  # for DOT distance
+        if self.distance == Distance.COSINE:
+            object_norms = self.objects_norms
+            if sorted_item_ids_to_recommend is not None:
+                object_norms = object_norms[sorted_item_ids_to_recommend]
+
+        real_k = min(k, object_factors_whitelist.shape[0])
+
+        ids, scores = implicit.cpu.topk.topk(  # pylint: disable=c-extension-no-member
+            items=object_factors_whitelist,
+            query=subject_factors,
+            k=real_k,
+            item_norms=object_norms,  # query norms for COSINE distance are applied afterwards
+            filter_query_items=filter_query_items,  # queries x objects csr matrix for getting neginf scores
+            filter_items=None,  # rectools doesn't support blacklist for now
+            num_threads=num_threads,
+        )
+
+        if sorted_item_ids_to_recommend is not None:
+            ids = sorted_item_ids_to_recommend[ids]
+
+        # filter neginf from implicit scores and apply norms for correct COSINE distance
+        all_target_ids, all_reco_ids, all_scores = self._process_implicit_scores(subject_ids, ids, scores)
+
+        return all_target_ids, all_reco_ids, all_scores
+
+
 class ScoreCalculator:
     """
     Calculate proximity scores between one subject (e.g. user) and all objects (e.g. items)
@@ -120,6 +243,7 @@ class VectorModel(ModelBase):
 
     u2i_dist: Distance = NotImplemented
     i2i_dist: Distance = NotImplemented
+    n_threads: int = 0  # TODO: decide how to pass it correctly for all models
 
     def _recommend_u2i(
         self,
@@ -131,9 +255,24 @@
     ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
         if filter_viewed:
             user_items = dataset.get_user_item_matrix(include_weights=False)
+        else:
+            user_items = None
+
+        user_vectors, item_vectors = self._get_u2i_vectors(dataset)
 
-        scores_calculator = self._get_u2i_calculator(dataset)
+        if self.u2i_dist in (Distance.COSINE, Distance.DOT):
+            ranker = ImplicitRanker(self.u2i_dist, user_vectors, item_vectors)
+            ui_csr_for_filter = user_items[user_ids] if filter_viewed else None
+            return ranker.rank(
+                subject_ids=user_ids,
+                k=k,
+                ui_csr_for_filter=ui_csr_for_filter,
+                sorted_item_ids_to_recommend=sorted_item_ids_to_recommend,
+                num_threads=self.n_threads,
+            )
+
+        scores_calculator = ScoreCalculator(self.u2i_dist, user_vectors, item_vectors)
 
         all_target_ids = []
         all_reco_ids: tp.List[np.ndarray] = []
         all_scores: tp.List[np.ndarray] = []
@@ -159,8 +298,20 @@ def _recommend_i2i(
         k: int,
         sorted_item_ids_to_recommend: tp.Optional[np.ndarray],
     ) -> tp.Tuple[InternalIds, InternalIds, Scores]:
-        scores_calculator = self._get_i2i_calculator(dataset)
+        item_vectors_1, item_vectors_2 = self._get_i2i_vectors(dataset)
+        if self.i2i_dist in (Distance.COSINE, Distance.DOT):
+            ranker = ImplicitRanker(self.i2i_dist, item_vectors_1, item_vectors_2)
+
+            return ranker.rank(
+                subject_ids=target_ids,
+                k=k,
+                ui_csr_for_filter=None,
+                sorted_item_ids_to_recommend=sorted_item_ids_to_recommend,
+                num_threads=self.n_threads,
+            )
+
+        scores_calculator = ScoreCalculator(self.i2i_dist, item_vectors_1, item_vectors_2)
 
         all_target_ids = []
         all_reco_ids: tp.List[np.ndarray] = []
         all_scores: tp.List[np.ndarray] = []
@@ -179,7 +330,30 @@
 
         return all_target_ids, np.concatenate(all_reco_ids), np.concatenate(all_scores)
 
-    def _get_u2i_calculator(self, dataset: Dataset) -> ScoreCalculator:
+    def _process_biases_to_vectors(
+        self,
+        distance: Distance,
+        subject_embeddings: np.ndarray,
+        subject_biases: np.ndarray,
+        object_embeddings: np.ndarray,
+        object_biases: np.ndarray,
+    ) -> tp.Tuple[np.ndarray, np.ndarray]:
+        # TODO: make it possible to control if add biases or not (even if they are present)
+        if distance == Distance.DOT:
+            subject_vectors = np.hstack(
+                (subject_biases[:, np.newaxis], np.ones((subject_biases.size, 1)), subject_embeddings)
+            )
+            object_vectors = np.hstack(
+                (np.ones((object_biases.size, 1)), object_biases[:, np.newaxis], object_embeddings)
+            )
+        elif distance in (Distance.COSINE, Distance.EUCLIDEAN):
+            subject_vectors = np.hstack((subject_biases[:, np.newaxis], subject_embeddings))
+            object_vectors = np.hstack((object_biases[:, np.newaxis], object_embeddings))
+        else:
+            raise ValueError(f"Unexpected distance `{distance}`")
+        return subject_vectors, object_vectors
+
+    def _get_u2i_vectors(self, dataset: Dataset) -> tp.Tuple[np.ndarray, np.ndarray]:
         user_factors = self._get_users_factors(dataset)
         item_factors = self._get_items_factors(dataset)
 
@@ -188,34 +362,25 @@ def _get_u2i_calculator(self, dataset: Dataset) -> ScoreCalculator:
         user_biases = user_factors.biases
         item_biases = item_factors.biases
 
-        # TODO: make it possible to control if add biases or not (even if they present)
         if user_biases is not None and item_biases is not None:
-            if self.u2i_dist == Distance.DOT:
-                user_vectors = np.hstack((user_biases[:, np.newaxis], np.ones((user_biases.size, 1)), user_vectors))
-                item_vectors = np.hstack((np.ones((item_biases.size, 1)), item_biases[:, np.newaxis], item_vectors))
-            elif self.u2i_dist in (Distance.COSINE, Distance.EUCLIDEAN):
-                user_vectors = np.hstack((user_biases[:, np.newaxis], user_vectors))
-                item_vectors = np.hstack((item_biases[:, np.newaxis], item_vectors))
-            else:
-                raise ValueError(f"Unexpected distance `{self.u2i_dist}`")
+            user_vectors, item_vectors = self._process_biases_to_vectors(
+                self.u2i_dist, user_vectors, user_biases, item_vectors, item_biases
+            )
 
-        return ScoreCalculator(self.u2i_dist, user_vectors, item_vectors)
+        return user_vectors, item_vectors
 
-    def _get_i2i_calculator(self, dataset: Dataset) -> ScoreCalculator:
+    def _get_i2i_vectors(self, dataset: Dataset) -> tp.Tuple[np.ndarray, np.ndarray]:
         item_factors = self._get_items_factors(dataset)
         item_vectors = item_factors.embeddings
         item_biases = item_factors.biases
         item_vectors_1 = item_vectors_2 = item_vectors
-        if item_biases is not None:  # TODO: make it possible to control if add biases or not (even if they present)
-            if self.i2i_dist == Distance.DOT:
-                item_vectors_1 = np.hstack((np.ones((item_biases.size, 1)), item_biases[:, np.newaxis], item_vectors))
-                item_vectors_2 = np.hstack((item_biases[:, np.newaxis], np.ones((item_biases.size, 1)), item_vectors))
-            elif self.i2i_dist in (Distance.COSINE, Distance.EUCLIDEAN):
-                item_vectors_1 = item_vectors_2 = np.hstack((item_biases[:, np.newaxis], item_vectors))
-            else:
-                raise ValueError(f"Unexpected distance `{self.u2i_dist}`")
-        return ScoreCalculator(self.i2i_dist, item_vectors_1, item_vectors_2)
+        if item_biases is not None:
+            item_vectors_1, item_vectors_2 = self._process_biases_to_vectors(
+                self.i2i_dist, item_vectors, item_biases, item_vectors, item_biases
+            )
+
+        return item_vectors_1, item_vectors_2
 
     def _get_users_factors(self, dataset: Dataset) -> Factors:
         raise NotImplementedError()
 
diff --git a/tests/models/test_vector.py b/tests/models/test_vector.py
index 7277088a..5d2d8b3e 100644
--- a/tests/models/test_vector.py
+++ b/tests/models/test_vector.py
@@ -14,13 +14,14 @@
 
 import typing as tp
 
+import implicit.cpu
 import numpy as np
 import pandas as pd
 import pytest
 
 from rectools import Columns
 from rectools.dataset import Dataset
-from rectools.models.vector import Distance, Factors, ScoreCalculator, VectorModel
+from rectools.models.vector import Distance, Factors, ImplicitRanker, ScoreCalculator, VectorModel
 
 T = tp.TypeVar("T")
 
@@ -60,6 +61,54 @@ def test_scores_calculator_with_incorrect_distance() -> None:
         calculator.calc(1)
 
+
+class TestImplicitRanker:  # pylint: disable=protected-access
+    @pytest.fixture
+    def subject_factors(self) -> np.ndarray:
+        return np.array([[-4, 0, 3], [0, 0, 0]])
+
+    @pytest.fixture
+    def object_factors(self) -> np.ndarray:
+        return np.array(
+            [
+                [-4, 0, 3],
+                [0, 0, 0],
+                [1, 1, 1],
+            ]
+        )
+
+    def test_neginf_score(self, subject_factors: np.ndarray, object_factors: np.ndarray) -> None:
+        implicit_ranker = ImplicitRanker(Distance.DOT, subjects_factors=subject_factors, objects_factors=object_factors)
+        dummy_factors = np.array([[1, 2]], dtype=np.float32)
+        neginf = implicit.cpu.topk.topk(  # pylint: disable=c-extension-no-member
+            items=dummy_factors,
+            query=dummy_factors,
+            k=1,
+            filter_items=np.array([0]),
+        )[1][0][0]
+        assert neginf == implicit_ranker._get_neginf_score()
+
+    def test_with_incorrect_distance(self, subject_factors: np.ndarray, object_factors: np.ndarray) -> None:
+        with pytest.raises(ValueError):
+            ImplicitRanker(Distance.EUCLIDEAN, subjects_factors=subject_factors, objects_factors=object_factors)
+
+    def test_mask_for_correct_scores(self, subject_factors: np.ndarray, object_factors: np.ndarray) -> None:
+        implicit_ranker = ImplicitRanker(Distance.DOT, subjects_factors=subject_factors, objects_factors=object_factors)
+        neginf = implicit_ranker._get_neginf_score()
+        scores = np.array([7, 6, 0, 0], dtype=np.float32)
+
+        actual = implicit_ranker._get_mask_for_correct_scores(scores)
+        assert actual == [True] * 4
+
+        actual = implicit_ranker._get_mask_for_correct_scores(np.append(scores, [neginf] * 2))
+        assert actual == [True] * 4 + [False] * 2
+
+        actual = implicit_ranker._get_mask_for_correct_scores(np.append(scores, [neginf * 0.99] * 2))
+        assert actual == [True] * 6
+
+        actual = implicit_ranker._get_mask_for_correct_scores(np.insert(scores, 0, neginf))
+        assert actual == [True] * 5
+
+
 class TestVectorModel:  # pylint: disable=protected-access, attribute-defined-outside-init
     def setup(self) -> None:
         stub_interactions = pd.DataFrame([], columns=Columns.Interactions)
@@ -125,7 +174,7 @@ def test_without_biases(
         else:  # i2i
             _, reco, scores = model._recommend_i2i(np.array([0, 1]), self.stub_dataset, 5, None)
         assert list(reco) == sum(expected_reco, [])
-        assert list(scores) == sum(expected_scores, [])
+        np.testing.assert_almost_equal(scores, np.array(expected_scores).ravel(), decimal=5)
 
     @pytest.mark.parametrize(
         "distance,expected_reco,expected_scores",
@@ -151,14 +200,14 @@ def test_with_biases(
         else:  # i2i
             _, reco, scores = model._recommend_i2i(np.array([0, 1]), self.stub_dataset, 5, None)
         assert list(reco) == sum(expected_reco, [])
-        assert list(scores) == sum(expected_scores, [])
+        np.testing.assert_almost_equal(scores, np.array(expected_scores).ravel(), decimal=5)
 
     @pytest.mark.parametrize("method", ("u2i", "i2i"))
     def test_with_incorrect_distance(self, method: str) -> None:
         with pytest.raises(ValueError):
             if method == "u2i":
                 m = self.make_model(self.user_biased_factors, self.item_biased_factors, u2i_distance=7)  # type: ignore
-                m._get_u2i_calculator(self.stub_dataset)
+                m._get_u2i_vectors(self.stub_dataset)
             else:
                 m = self.make_model(self.user_biased_factors, self.item_biased_factors, i2i_distance=7)  # type: ignore
-                m._get_i2i_calculator(self.stub_dataset)
+                m._get_i2i_vectors(self.stub_dataset)
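Not part of the patch above: a minimal usage sketch of the new `ImplicitRanker`, assuming this branch of `rectools` and the `implicit` library are installed. The toy factors and the interaction matrix are invented for illustration; the constructor and `rank()` arguments follow the signatures introduced in the diff.

```python
import numpy as np
from scipy import sparse

from rectools.models.vector import Distance, ImplicitRanker

# Toy embeddings: 2 users and 4 items with 3 latent factors each (made-up values).
user_factors = np.array([[0.1, 0.3, -0.2], [0.5, 0.0, 0.4]])
item_factors = np.array([[0.2, 0.1, 0.0], [0.0, 0.4, 0.1], [0.3, -0.1, 0.2], [0.1, 0.1, 0.1]])

# User-item interactions (1 = already seen); rows correspond to the subject_ids passed below.
ui_csr = sparse.csr_matrix(np.array([[1, 0, 0, 0], [0, 0, 1, 0]]))

ranker = ImplicitRanker(Distance.COSINE, subjects_factors=user_factors, objects_factors=item_factors)

# Rank items for both users, filtering seen items, with no whitelist of item ids.
target_ids, reco_ids, scores = ranker.rank(
    subject_ids=np.array([0, 1]),
    k=2,
    ui_csr_for_filter=ui_csr,           # pass None for i2i recommendations
    sorted_item_ids_to_recommend=None,  # or a sorted np.ndarray whitelist
    num_threads=1,
)
print(list(target_ids), list(reco_ids), list(scores))
```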
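Also not part of the patch: the bias handling in `_process_biases_to_vectors` relies on a standard augmentation trick. For DOT distance the subject vector is extended to `[b_u, 1, e_u]` and the object vector to `[1, b_i, e_i]`, so a plain dot product of the extended vectors equals `b_u + b_i + <e_u, e_i>`. A small numpy check of that identity with made-up numbers:

```python
import numpy as np

# Toy embedding and bias for one user and one item (illustrative values).
user_emb, user_bias = np.array([0.2, -0.1, 0.4]), 0.3
item_emb, item_bias = np.array([0.5, 0.1, -0.2]), -0.7

# Extended vectors as built by _process_biases_to_vectors for Distance.DOT.
user_vec = np.hstack(([user_bias, 1.0], user_emb))
item_vec = np.hstack(([1.0, item_bias], item_emb))

# Dot product of extended vectors == bias sum plus embedding dot product.
assert np.isclose(user_vec @ item_vec, user_bias + item_bias + user_emb @ item_emb)
```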