Skip to content

Commit

Permalink
Store preferable infohashes for queries
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Jan 4, 2024
1 parent f17ed57 commit 2ac6e36
Show file tree
Hide file tree
Showing 15 changed files with 663 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,12 @@ def notify_gui(request, processing_results):
if r.obj_state == ObjState.NEW_OBJECT
]
if self.composition.notifier:
self.composition.notifier[notifications.remote_query_results](
{"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)})
self.composition.notifier[notifications.remote_query_results]({
"query": kwargs.get("txt_filter"),
"results": results,
"uuid": str(request_uuid),
"peer": hexlify(request.peer.mid)
})

peers_to_query = self.get_random_peers(self.composition.max_query_peers)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
from typing import Generator

from pony.orm import db_session
from pytest import fixture

from tribler.core.components.database.db.layers.user_activity_layer import UserActivityLayer
from tribler.core.components.user_activity.types import InfoHash
from tribler.core.utilities.pony_utils import TrackedDatabase


@fixture(name="layer")
def fixture_activity_layer() -> Generator[UserActivityLayer, None, None]:
database = TrackedDatabase()
database.bind(provider="sqlite", filename=":memory:")
ual = UserActivityLayer(database)
database.generate_mapping(create_tables=True)
yield ual
database.disconnect()


def float_equals(a: float, b: float) -> bool:
return round(a, 5) == round(b, 5)


def test_store_no_losers(layer: UserActivityLayer) -> None:
"""
Test that queries can be stored and retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), set())

with db_session():
queries = layer.Query.select()[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert len(queries[0].infohashes) == 1
assert list(queries[0].infohashes)[0].infohash == b"\x00" * 20
assert float_equals(list(queries[0].infohashes)[0].preference, 1.0)


def test_store_with_loser(layer: UserActivityLayer) -> None:
"""
Test that queries with a loser can be stored and retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

with db_session():
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser.preference, 0.0)


def test_store_with_losers(layer: UserActivityLayer) -> None:
"""
Test that queries with multiple losers can be stored and retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})

with db_session():
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
loser_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
loser_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser_1.preference, 0.0)
assert float_equals(loser_2.preference, 0.0)
assert float_equals(loser_3.preference, 0.0)


def test_store_weighted_decay(layer: UserActivityLayer) -> None:
"""
Test result decay after updating.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})

with db_session():
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.8)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)


def test_store_delete_old(layer: UserActivityLayer) -> None:
"""
Test result decay after updating.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20),
InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20)})

with db_session():
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
should_be_dropped = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert should_be_dropped == []
assert float_equals(entry_4.preference, 0.8)


def test_store_delete_old_over_e(layer: UserActivityLayer) -> None:
"""
Test if entries are not deleted if their preference is still over the threshold e.
"""
layer.e = 0.0
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20),
InfoHash(b"\x03" * 20)})
layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20),
InfoHash(b"\x01" * 20),
InfoHash(b"\x02" * 20)})

with db_session():
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_5, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
assert float_equals(entry_5.preference, 0.8)


def test_get_preferable(layer: UserActivityLayer) -> None:
"""
Test if a preferable infohash is correctly retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x00" * 20) == b"\x00" * 20


def test_get_preferable_already_best(layer: UserActivityLayer) -> None:
"""
Test if a infohash returns itself when it is preferable.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x01" * 20) == b"\x00" * 20


def test_get_preferable_unknown(layer: UserActivityLayer) -> None:
"""
Test if a infohash returns itself when it has no known preferable infohashes.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x02" * 20) == b"\x02" * 20


def test_get_random(layer: UserActivityLayer) -> None:
"""
Test if the preferred infohash always gets returned from a random checked selection.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), InfoHash(b"\x02" * 20)})
layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20), InfoHash(b"\x02" * 20)})

random_selection = layer.get_preferable_to_random(limit=1)

assert len(random_selection) == 1
assert list(random_selection)[0] == b"\x01" * 20
128 changes: 128 additions & 0 deletions src/tribler/core/components/database/db/layers/user_activity_layer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
from __future__ import annotations

import random
import typing
from dataclasses import dataclass

from pony import orm
from pony.orm import db_session

from tribler.core.components.user_activity.types import InfoHash
from tribler.core.utilities.pony_utils import TrackedDatabase

if typing.TYPE_CHECKING:
@dataclass
class InfohashPreference:
infohash: bytes
preference: float
parent_query: Query

Check warning on line 18 in src/tribler/core/components/database/db/layers/user_activity_layer.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/database/db/layers/user_activity_layer.py#L14-L18

Added lines #L14 - L18 were not covered by tests

@dataclass
class Query:
query: str
infohashes: typing.Set[InfohashPreference]

Check warning on line 23 in src/tribler/core/components/database/db/layers/user_activity_layer.py

View check run for this annotation

Codecov / codecov/patch

src/tribler/core/components/database/db/layers/user_activity_layer.py#L20-L23

Added lines #L20 - L23 were not covered by tests


class UserActivityLayer:

def __init__(self, database: TrackedDatabase, update_weight: float = 0.8, e: float = 0.01) -> None:
"""
Create a new User Activity scheme for a particular database.
:param database: The database to bind to.
:param update_weight: The weight of new updates.
:param e: A small value to decide near-zero preference.
"""
self.database = database

self.e = e
self.update_weight_new = update_weight
self.update_weight_old = 1 - self.update_weight_new

class Query(database.Entity):
query = orm.PrimaryKey(str)
infohashes = orm.Set("InfohashPreference")

class InfohashPreference(database.Entity):
infohash = orm.Required(bytes)
preference = orm.Required(float)
parent_query = orm.Required(Query)
orm.PrimaryKey(infohash, parent_query)

self.Query = Query
self.InfohashPreference = InfohashPreference

def store(self, query: str, infohash: InfoHash, losing_infohashes: typing.Set[InfoHash]) -> None:
"""
Store a query, its selected infohash, and the infohashes that were not downloaded.
:param query: The text that the user searched for.
:param infohash: The infohash that the user downloaded.
:param losing_infohashes: The infohashes that the user saw but ignored.
"""
# Convert "win" or "loss" to "1.0" or "0.0".
weights = {ih: 0.0 for ih in losing_infohashes}
weights[infohash] = 1.0

# Update or create a new database entry
with db_session:
existing = self.Query.get(query=query)
if existing is not None:
for old_infohash_preference in existing.infohashes:
if old_infohash_preference.infohash in weights:
new_weight = (old_infohash_preference.preference * self.update_weight_old
+ weights.pop(old_infohash_preference.infohash, 0.0) * self.update_weight_new)
old_infohash_preference.preference = new_weight
else:
# This infohash did not pop up, candidate for deletion
new_weight = old_infohash_preference.preference * self.update_weight_old
if new_weight < self.e:
old_infohash_preference.delete()
else:
old_infohash_preference.preference = new_weight
if infohash in weights:
weights[infohash] = self.update_weight_new
else:
existing = self.Query(query=query, infohashes=set())

for new_infohash, weight in weights.items():
existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight,
parent_query=existing))

@db_session
def _select_superior(self, infohash_preference: InfohashPreference) -> InfoHash:
"""
For a given InfohashPreference, get the preferable infohash from the parent query.
"""
all_hashes_for_query = list(infohash_preference.parent_query.infohashes)
all_hashes_for_query.sort(key=lambda x: x.preference, reverse=True)
return typing.cast(InfoHash, all_hashes_for_query[0].infohash)

def get_preferable(self, infohash: InfoHash) -> InfoHash:
"""
Given an infohash, see if we know of more preferable infohashes.
:param infohash: The infohash to find better alternatives for.
"""
with db_session:
existing = self.InfohashPreference.select(infohash=infohash)[:]

if not existing:
return infohash

return self._select_superior(random.SystemRandom().choice(existing))

def get_preferable_to_random(self, limit: int = 1) -> set[InfoHash]:
"""
Retrieve (a) random infohash(es) and then return the preferred infohash for each infohash.
This method selects up to the limit of random infohashes and then outputs the set of preferable infohashes.
This means that you may start with ``limit`` number of infohashes and worst-case, if they all share the same,
preferable infohash, end up with only one infohash as the output.
:param limit: The number of infohashes to randomly get the preferred infohash for (the output set may be less).
:returns: A set of infohashes of size 0 up to ``limit``.
"""
with db_session:
random_selection = self.InfohashPreference.select_random(limit=limit)
return {self._select_superior(ih) for ih in random_selection}
3 changes: 3 additions & 0 deletions src/tribler/core/components/database/db/tribler_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from tribler.core.components.database.db.layers.health_data_access_layer import HealthDataAccessLayer
from tribler.core.components.database.db.layers.knowledge_data_access_layer import KnowledgeDataAccessLayer
from tribler.core.components.database.db.layers.user_activity_layer import UserActivityLayer
from tribler.core.utilities.pony_utils import TrackedDatabase, db_session, get_or_create

MEMORY = ':memory:'
Expand All @@ -31,6 +32,8 @@ def __init__(self, filename: Optional[str] = None, *, create_tables: bool = True
self.TorrentHealth = self.health.TorrentHealth
self.Tracker = self.health.Tracker

self.user_activity_layer = UserActivityLayer(self.instance)

filename = filename or MEMORY
db_does_not_exist = filename == MEMORY or not os.path.isfile(filename)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from ipv8.REST.base_endpoint import HTTP_BAD_REQUEST
from ipv8.REST.schema import schema

from tribler.core import notifications
from tribler.core.components.database.category_filter.family_filter import default_xxx_filter
from tribler.core.components.database.db.layers.knowledge_data_access_layer import ResourceType
from tribler.core.components.database.db.serialization import REGULAR_TORRENT, SNIPPET
Expand Down Expand Up @@ -328,6 +330,10 @@ def search_db():
f'Main query executed in {t2 - t1:.6} seconds;\n'
f'Result constructed in {t3 - t2:.6} seconds.')

self.download_manager.notifier[notifications.local_query_results]({
"query": request.query.get("txt_filter"),
"results": list(pony_query)
})
return search_results, total, max_rowid

try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ def on_torrent_finished_alert(self, alert: lt.torrent_finished_alert):
self.update_lt_status(self.handle.status())
self.checkpoint()
downloaded = self.get_state().get_total_transferred(DOWNLOAD)
if downloaded > 0 and self.stream is not None and self.notifier is not None:
if downloaded > 0 and self.notifier is not None:
name = self.tdef.get_name_as_unicode()
infohash = self.tdef.get_infohash().hex()
self.notifier[notifications.torrent_finished](infohash=infohash, name=name, hidden=self.hidden)
Expand Down
Empty file.
7 changes: 7 additions & 0 deletions src/tribler/core/components/user_activity/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from tribler.core.config.tribler_config_section import TriblerConfigSection


class UserActivitySettings(TriblerConfigSection):
enabled: bool = False
max_query_history: int = 500
health_check_interval: float = 5.0
Empty file.
Loading

0 comments on commit 2ac6e36

Please sign in to comment.