-
Notifications
You must be signed in to change notification settings - Fork 451
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Store preferable infohashes for queries
- Loading branch information
Showing
15 changed files
with
664 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
201 changes: 201 additions & 0 deletions
201
src/tribler/core/components/database/db/layers/tests/test_user_activity_layer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
from typing import Generator | ||
|
||
from pony.orm import db_session | ||
from pytest import fixture | ||
|
||
from tribler.core.components.database.db.layers.user_activity_layer import UserActivityLayer | ||
from tribler.core.components.user_activity.types import InfoHash | ||
from tribler.core.utilities.pony_utils import TrackedDatabase | ||
|
||
|
||
@fixture(name="layer") | ||
def fixture_activity_layer() -> Generator[UserActivityLayer, None, None]: | ||
database = TrackedDatabase() | ||
database.bind(provider="sqlite", filename=":memory:") | ||
ual = UserActivityLayer(database) | ||
database.generate_mapping(create_tables=True) | ||
yield ual | ||
database.disconnect() | ||
|
||
|
||
def float_equals(a: float, b: float) -> bool: | ||
return round(a, 5) == round(b, 5) | ||
|
||
|
||
def test_store_no_losers(layer: UserActivityLayer) -> None: | ||
""" | ||
Test that queries can be stored and retrieved. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), set()) | ||
|
||
with db_session(): | ||
queries = layer.Query.select()[:] | ||
|
||
assert len(queries) == 1 | ||
assert queries[0].query == "test query" | ||
assert len(queries[0].infohashes) == 1 | ||
assert list(queries[0].infohashes)[0].infohash == b"\x00" * 20 | ||
assert float_equals(list(queries[0].infohashes)[0].preference, 1.0) | ||
|
||
|
||
def test_store_with_loser(layer: UserActivityLayer) -> None: | ||
""" | ||
Test that queries with a loser can be stored and retrieved. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) | ||
|
||
with db_session(): | ||
queries = layer.Query.select()[:] | ||
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] | ||
loser, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] | ||
|
||
assert len(queries) == 1 | ||
assert queries[0].query == "test query" | ||
assert float_equals(winner.preference, 1.0) | ||
assert float_equals(loser.preference, 0.0) | ||
|
||
|
||
def test_store_with_losers(layer: UserActivityLayer) -> None: | ||
""" | ||
Test that queries with multiple losers can be stored and retrieved. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), | ||
InfoHash(b"\x02" * 20), | ||
InfoHash(b"\x03" * 20)}) | ||
|
||
with db_session(): | ||
queries = layer.Query.select()[:] | ||
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] | ||
loser_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] | ||
loser_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] | ||
loser_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] | ||
|
||
assert len(queries) == 1 | ||
assert queries[0].query == "test query" | ||
assert float_equals(winner.preference, 1.0) | ||
assert float_equals(loser_1.preference, 0.0) | ||
assert float_equals(loser_2.preference, 0.0) | ||
assert float_equals(loser_3.preference, 0.0) | ||
|
||
|
||
def test_store_weighted_decay(layer: UserActivityLayer) -> None: | ||
""" | ||
Test result decay after updating. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), | ||
InfoHash(b"\x02" * 20), | ||
InfoHash(b"\x03" * 20)}) | ||
layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20), | ||
InfoHash(b"\x02" * 20), | ||
InfoHash(b"\x03" * 20)}) | ||
|
||
with db_session(): | ||
queries = layer.Query.select()[:] | ||
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] | ||
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] | ||
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] | ||
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] | ||
|
||
assert len(queries) == 1 | ||
assert queries[0].query == "test query" | ||
assert float_equals(entry_1.preference, 0.2) | ||
assert float_equals(entry_2.preference, 0.8) | ||
assert float_equals(entry_3.preference, 0.0) | ||
assert float_equals(entry_4.preference, 0.0) | ||
|
||
|
||
def test_store_delete_old(layer: UserActivityLayer) -> None: | ||
""" | ||
Test result decay after updating. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), | ||
InfoHash(b"\x02" * 20), | ||
InfoHash(b"\x03" * 20)}) | ||
layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20), | ||
InfoHash(b"\x01" * 20), | ||
InfoHash(b"\x02" * 20)}) | ||
|
||
with db_session(): | ||
queries = layer.Query.select()[:] | ||
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] | ||
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] | ||
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] | ||
should_be_dropped = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] | ||
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:] | ||
|
||
assert len(queries) == 1 | ||
assert queries[0].query == "test query" | ||
assert float_equals(entry_1.preference, 0.2) | ||
assert float_equals(entry_2.preference, 0.0) | ||
assert float_equals(entry_3.preference, 0.0) | ||
assert should_be_dropped == [] | ||
assert float_equals(entry_4.preference, 0.8) | ||
|
||
|
||
def test_store_delete_old_over_e(layer: UserActivityLayer) -> None: | ||
""" | ||
Test if entries are not deleted if their preference is still over the threshold e. | ||
""" | ||
layer.e = 0.0 | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), | ||
InfoHash(b"\x02" * 20), | ||
InfoHash(b"\x03" * 20)}) | ||
layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20), | ||
InfoHash(b"\x01" * 20), | ||
InfoHash(b"\x02" * 20)}) | ||
|
||
with db_session(): | ||
queries = layer.Query.select()[:] | ||
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] | ||
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] | ||
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] | ||
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] | ||
entry_5, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:] | ||
|
||
assert len(queries) == 1 | ||
assert queries[0].query == "test query" | ||
assert float_equals(entry_1.preference, 0.2) | ||
assert float_equals(entry_2.preference, 0.0) | ||
assert float_equals(entry_3.preference, 0.0) | ||
assert float_equals(entry_4.preference, 0.0) | ||
assert float_equals(entry_5.preference, 0.8) | ||
|
||
|
||
def test_get_preferable(layer: UserActivityLayer) -> None: | ||
""" | ||
Test if a preferable infohash is correctly retrieved. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) | ||
|
||
assert layer.get_preferable(b"\x00" * 20) == b"\x00" * 20 | ||
|
||
|
||
def test_get_preferable_already_best(layer: UserActivityLayer) -> None: | ||
""" | ||
Test if a infohash returns itself when it is preferable. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) | ||
|
||
assert layer.get_preferable(b"\x01" * 20) == b"\x00" * 20 | ||
|
||
|
||
def test_get_preferable_unknown(layer: UserActivityLayer) -> None: | ||
""" | ||
Test if a infohash returns itself when it has no known preferable infohashes. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) | ||
|
||
assert layer.get_preferable(b"\x02" * 20) == b"\x02" * 20 | ||
|
||
|
||
def test_get_random(layer: UserActivityLayer) -> None: | ||
""" | ||
Test if the preferred infohash always gets returned from a random checked selection. | ||
""" | ||
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), InfoHash(b"\x02" * 20)}) | ||
layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20), InfoHash(b"\x02" * 20)}) | ||
|
||
random_selection = layer.get_random(limit=1) | ||
|
||
assert len(random_selection) == 1 | ||
assert list(random_selection)[0] == b"\x01" * 20 |
128 changes: 128 additions & 0 deletions
128
src/tribler/core/components/database/db/layers/user_activity_layer.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
from __future__ import annotations | ||
|
||
import random | ||
import typing | ||
from dataclasses import dataclass | ||
|
||
from pony import orm | ||
from pony.orm import db_session | ||
|
||
from tribler.core.components.user_activity.types import InfoHash | ||
from tribler.core.utilities.pony_utils import TrackedDatabase | ||
|
||
if typing.TYPE_CHECKING: | ||
@dataclass | ||
class InfohashPreference: | ||
infohash: bytes | ||
preference: float | ||
parent_query: Query | ||
|
||
@dataclass | ||
class Query: | ||
query: str | ||
infohashes: typing.Set[InfohashPreference] | ||
|
||
|
||
class UserActivityLayer: | ||
|
||
def __init__(self, database: TrackedDatabase, update_weight: float = 0.8, e: float = 0.01) -> None: | ||
""" | ||
Create a new User Activity scheme for a particular database. | ||
:param database: The database to bind to. | ||
:param update_weight: The weight of new updates. | ||
:param e: A small value to decide near-zero preference. | ||
""" | ||
self.database = database | ||
|
||
self.e = e | ||
self.update_weight_new = update_weight | ||
self.update_weight_old = 1 - self.update_weight_new | ||
|
||
class Query(database.Entity): | ||
query = orm.PrimaryKey(str) | ||
infohashes = orm.Set("InfohashPreference") | ||
|
||
class InfohashPreference(database.Entity): | ||
infohash = orm.Required(bytes) | ||
preference = orm.Required(float) | ||
parent_query = orm.Required(Query) | ||
orm.PrimaryKey(infohash, parent_query) | ||
|
||
self.Query = Query | ||
self.InfohashPreference = InfohashPreference | ||
|
||
def store(self, query: str, infohash: InfoHash, losing_infohashes: typing.Set[InfoHash]) -> None: | ||
""" | ||
Store a query, its selected infohash, and the infohashes that were not downloaded. | ||
:param query: The text that the user searched for. | ||
:param infohash: The infohash that the user downloaded. | ||
:param losing_infohashes: The infohashes that the user saw but ignored. | ||
""" | ||
# Convert "win" or "loss" to "1.0" or "0.0". | ||
weights = {ih: 0.0 for ih in losing_infohashes} | ||
weights[infohash] = 1.0 | ||
|
||
# Update or create a new database entry | ||
with db_session: | ||
existing = self.Query.get(query=query) | ||
if existing is not None: | ||
for old_infohash_preference in existing.infohashes: | ||
if old_infohash_preference.infohash in weights: | ||
new_weight = (old_infohash_preference.preference * self.update_weight_old | ||
+ weights.pop(old_infohash_preference.infohash, 0.0) * self.update_weight_new) | ||
old_infohash_preference.preference = new_weight | ||
else: | ||
# This infohash did not pop up, candidate for deletion | ||
new_weight = old_infohash_preference.preference * self.update_weight_old | ||
if new_weight < self.e: | ||
old_infohash_preference.delete() | ||
else: | ||
old_infohash_preference.preference = new_weight | ||
if infohash in weights: | ||
weights[infohash] = self.update_weight_new | ||
else: | ||
existing = self.Query(query=query, infohashes=set()) | ||
|
||
for new_infohash, weight in weights.items(): | ||
existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight, | ||
parent_query=existing)) | ||
|
||
@db_session | ||
def _select_superior(self, infohash_preference: InfohashPreference) -> InfoHash: | ||
""" | ||
For a given InfohashPreference, get the preferable infohash from the parent query. | ||
""" | ||
all_hashes_for_query = list(infohash_preference.parent_query.infohashes) | ||
all_hashes_for_query.sort(key=lambda x: x.preference, reverse=True) | ||
return typing.cast(InfoHash, all_hashes_for_query[0].infohash) | ||
|
||
def get_preferable(self, infohash: InfoHash) -> InfoHash: | ||
""" | ||
Given an infohash, see if we know of more preferable infohashes. | ||
:param infohash: The infohash to find better alternatives for. | ||
""" | ||
with db_session: | ||
existing = self.InfohashPreference.select(infohash=infohash)[:] | ||
|
||
if not existing: | ||
return infohash | ||
|
||
return self._select_superior(random.SystemRandom().choice(existing)) | ||
|
||
def get_random(self, limit: int = 1) -> set[InfoHash]: | ||
""" | ||
Retrieve a random infohashes and then return the preferred infohash for each infohash. | ||
This method selects up to the limit of random infohashes and then outputs the set of preferable infohashes. | ||
This means that you may start with ``limit`` number of infohashes and worst-case, if they all share the same, | ||
preferable infohash, end up with only one infohash as the output. | ||
:param limit: The number of infohashes to randomly get the preferred infohash for (the output set may be less). | ||
:returns: A set of infohashes of size 0 up to ``limit``. | ||
""" | ||
with db_session: | ||
random_selection = self.InfohashPreference.select_random(limit=limit) | ||
return {self._select_superior(ih) for ih in random_selection} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from tribler.core.config.tribler_config_section import TriblerConfigSection | ||
|
||
|
||
class UserActivitySettings(TriblerConfigSection): | ||
enabled: bool = True | ||
max_query_history: int = 500 | ||
health_check_interval: float = 5.0 |
Empty file.
Oops, something went wrong.