WIP: Store preferable infohashes for queries #7786
```python
with db_session():
    queries = layer.Query.select()[:]

    assert len(queries) == 1
    assert queries[0].query == "test query"
    assert len(queries[0].infohashes) == 1
    assert list(queries[0].infohashes)[0].infohash == b"\x00" * 20
    assert float_equals(list(queries[0].infohashes)[0].preference, 1.0)
```
NIT: Same logic, but a bit more correct and easier to read. The suggested version uses `Query.get()` instead of `Query.select()` because there should only be a single entity in the database, which you assert later. It also confines the `with` block to only the lines that actually need the `db_session`, and it avoids multiple identical conversions to a list followed by retrieving the first element.
Suggested change:

```python
with db_session():
    test_query = layer.Query.get()
    infohashes = list(test_query.infohashes)

assert test_query.query == "test query"
assert len(infohashes) == 1

infohash = infohashes.pop()
assert infohash.infohash == b"\x00" * 20
assert float_equals(infohash.preference, 1.0)
```
```python
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser.preference, 0.0)
```
NIT: This version avoids unnecessary `select` querying and list copying:
Suggested change:

```python
test_query = layer.Query.get()
winner = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
loser = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)

assert float_equals(winner.preference, 1.0)
assert float_equals(loser.preference, 0.0)
assert test_query.query == "test query"
```
```python
"""
Test that queries with a loser can be stored and retrieved.
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})
```
From the text of the test, it is not clear why one infohash is called a "winner" and another a "loser". I'm not questioning the naming here (which I will do in the class `UserActivityLayer`). Here, I suggest helping the reader by showing that the "loser" is just an infohash passed as the third function parameter, which you then retrieve in the `with` statement.
Suggested change:

```python
layer.store("test query", infohash=InfoHash(b"\x00" * 20), losing_infohashes={InfoHash(b"\x01" * 20)})
```
```python
queries = layer.Query.select()[:]
winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
loser_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
loser_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
loser_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(winner.preference, 1.0)
assert float_equals(loser_1.preference, 0.0)
assert float_equals(loser_2.preference, 0.0)
assert float_equals(loser_3.preference, 0.0)
```
NIT: A more compact yet correct version of the same logic. It uses `select` where a list is intended and `get` where there should be just a single entity.
Suggested change:

```python
test_query = layer.Query.get()
winner = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
losers = list(layer.InfohashPreference.select(lambda x: x.infohash != b"\x00" * 20))

assert test_query.query == "test query"
assert float_equals(winner.preference, 1.0)
assert len(losers) == 3
assert all(float_equals(ls.preference, 0.0) for ls in losers)
```
```python
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
                                                   InfoHash(b"\x02" * 20),
                                                   InfoHash(b"\x03" * 20)})
```
NIT: It is easier to read the test function if you specify the argument names:
Suggested change:

```python
layer.store("test query", infohash=InfoHash(b"\x00" * 20), losing_infohashes={InfoHash(b"\x01" * 20),
                                                                              InfoHash(b"\x02" * 20),
                                                                              InfoHash(b"\x03" * 20)})
```
```python
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.8)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
```
NIT: This version avoids unnecessary `select` querying and list copying:
Suggested change:

```python
test_query = layer.Query.get()
entry_1 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
entry_2 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)
entry_3 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x02" * 20)
entry_4 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x03" * 20)

assert test_query.query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.8)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
```
```python
def test_store_delete_old(layer: UserActivityLayer) -> None:
    """
    Test result decay after updating.
```
The test description is likely missing a specification of how this test differs from the previous one.
```python
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
should_be_dropped = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert should_be_dropped == []
assert float_equals(entry_4.preference, 0.8)
```
NIT: This version avoids unnecessary `select` querying and list copying:
Suggested change:

```python
test_query = layer.Query.get()
entry_1 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
entry_2 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)
entry_3 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x02" * 20)
entry_4 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x04" * 20)
should_be_dropped = layer.InfohashPreference.get(lambda x: x.infohash == b"\x03" * 20)

assert test_query.query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.8)
assert not should_be_dropped
```
```python
queries = layer.Query.select()[:]
entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
entry_5, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

assert len(queries) == 1
assert queries[0].query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
assert float_equals(entry_5.preference, 0.8)
```
NIT: This version avoids unnecessary `select` querying and list copying:
Suggested change:

```python
test_query = layer.Query.get()
entry_1 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x00" * 20)
entry_2 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x01" * 20)
entry_3 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x02" * 20)
entry_4 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x03" * 20)
entry_5 = layer.InfohashPreference.get(lambda x: x.infohash == b"\x04" * 20)

assert test_query.query == "test query"
assert float_equals(entry_1.preference, 0.2)
assert float_equals(entry_2.preference, 0.0)
assert float_equals(entry_3.preference, 0.0)
assert float_equals(entry_4.preference, 0.0)
assert float_equals(entry_5.preference, 0.8)
```
```python
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x00" * 20) == b"\x00" * 20
```
There is a type mismatch: "Expected type 'InfoHash', got 'bytes' instead"
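A minimal sketch of the type-correct call, assuming `InfoHash` is a `typing.NewType` over `bytes` (the stand-in `get_preferable` below is hypothetical; the point is only that wrapping the raw bytes in `InfoHash(...)` satisfies the type checker at no runtime cost):

```python
from typing import NewType

# Assumption: InfoHash is a NewType over bytes, as commonly used in Tribler's type hints.
InfoHash = NewType("InfoHash", bytes)


def get_preferable(infohash: InfoHash) -> InfoHash:
    # Hypothetical stand-in for layer.get_preferable, used only to illustrate the typing.
    return infohash


# NewType is erased at runtime, so wrapping changes nothing about the comparison.
assert get_preferable(InfoHash(b"\x00" * 20)) == b"\x00" * 20
```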
```python
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x01" * 20) == b"\x00" * 20
```
There is a type mismatch: "Expected type 'InfoHash', got 'bytes' instead"
```python
"""
layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

assert layer.get_preferable(b"\x02" * 20) == b"\x02" * 20
```
There is a type mismatch: "Expected type 'InfoHash', got 'bytes' instead"
```python
random_selection = layer.get_random(limit=1)

assert len(random_selection) == 1
assert list(random_selection)[0] == b"\x01" * 20
```
This is an unnecessary conversion to a list followed by retrieval of the first item:

Suggested change:

```python
assert random_selection.pop() == b"\x01" * 20
```
```python
def float_equals(a: float, b: float) -> bool:
    return round(a, 5) == round(b, 5)
```
In this file, you are using the same construction over and over again:

```python
def test_example(layer: UserActivityLayer) -> None:
    layer.store("test query", InfoHash(b"\x00" * 20), set())

    with db_session():
        queries = layer.Query.select()[:]

        assert queries[0].query == "test query"
        assert len(queries[0].infohashes) == 1
        ...
```

This is basically a single test case that is repeated in `test_store_no_losers`, `test_store_with_loser`, `test_store_weighted_decay`, `test_store_delete_old`, and `test_store_delete_old_over_e`.

I would suggest extracting it into a separate test. Then there will be no need to repeat it in the other tests, and they will more accurately describe the specific case they are testing, without any excess. Like:

```python
def test_store_query(layer: UserActivityLayer) -> None:
    layer.store("test query", InfoHash(b''), set())

    with db_session():
        test_query = layer.Query.get()

    assert test_query.query == "test query"
```
---
The PR appears to be a nice feature that looks promising for Tribler.
It is an interesting concept that appears quite similar to ClickLog. If that's the case, then it would be beneficial to add links to the ClickLog documentation as a reference in the newly added classes and components.
I've suggested a few code improvements and raised some points for discussion.
Also, I'm adding @kozlovsky as a reviewer since there is a new database structure implementation involved.
```python
if typing.TYPE_CHECKING:
    @dataclass
    class InfohashPreference:
        infohash: bytes
        preference: float
        parent_query: Query

    @dataclass
    class Query:
        query: str
        infohashes: typing.Set[InfohashPreference]
```
These structures are used solely in the `_select_superior` method, and there is no direct transformation into this datatype in the calling code; they merely replicate the existing structures described in `UserActivityLayer`. This approach of duplicating definitions necessitates updating the structures twice (once for the original and again for the duplicate), which increases the risk of errors during future updates. The developer responsible for such a change should:

- Be aware that there are two definitions that require changes.
- Make the change twice, which is more error-prone than making it once.

Additionally, for a developer who is not the owner of the code, this duplication might not be apparent (as we generally aim for code to be less, rather than more, duplicative), and it may not be obvious that it is necessary to locate the other data structure.

My suggestion is to avoid duplication by refactoring the existing code. There are several ways to achieve the same class behavior without duplication.
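One hedged sketch of such a refactoring, using a structural `typing.Protocol` as the single type definition; the `select_superior` function, its logic, and the `FakePreference` class are illustrative assumptions, not the PR's actual `_select_superior`:

```python
from dataclasses import dataclass
from typing import Iterable, Protocol


class InfohashPreferenceLike(Protocol):
    # Single source of truth for the shape; Pony entities satisfy it
    # structurally, so no duplicate dataclass definitions are needed.
    infohash: bytes
    preference: float


def select_superior(candidates: Iterable[InfohashPreferenceLike]) -> bytes:
    # Illustrative logic: pick the infohash with the highest preference.
    return max(candidates, key=lambda c: c.preference).infohash


# Any object with the right attributes type-checks against the Protocol:
@dataclass
class FakePreference:
    infohash: bytes
    preference: float


assert select_superior([FakePreference(b"\x00" * 20, 0.2),
                        FakePreference(b"\x01" * 20, 0.8)]) == b"\x01" * 20
```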
```python
self.Query = Query
self.InfohashPreference = InfohashPreference

def store(self, query: str, infohash: InfoHash, losing_infohashes: typing.Set[InfoHash]) -> None:
```
I have a point for discussion regarding this function interface: the naming. You're using a "win-lose" representation, which I find misleading, as it suggests a game-like process of identifying winners and losers. However, according to your function description, it's not about winning and losing, but rather about determining which infohashes were used (downloaded) and which weren't. I suggest reconsidering the naming and choosing a more appropriate representation.
```python
database_component = await self.require_component(DatabaseComponent)  # local_query_results notification
torrent_checker_component = await self.require_component(TorrentCheckerComponent)

self.database_manager: UserActivityLayer = database_component.db.user_activity_layer
```
The name `self.database_manager` is misleading, as it is not a manager but a layer:

Suggested change:

```python
self.user_activity_layer: UserActivityLayer = database_component.db.user_activity_layer
```
```python
# Update or create a new database entry
with db_session:
    existing = self.Query.get(query=query)
    if existing is not None:
```
"Flat is better than nested," according to the Zen of Python. To decrease nesting in your code you can simply use the `get_or_create` function from `pony_utils`:

```python
with db_session:
    existing = get_or_create(self.Query, query=query)
    for old_infohash_preference in existing.infohashes:
        ...
    if existing.infohashes and infohash in weights:
        weights[infohash] = self.update_weight_new
```

The next nesting level can be removed with this trick:

```python
with db_session:
    existing = get_or_create(self.Query, query=query)
    known_infohashes = (i for i in existing.infohashes if i.infohash in weights)
    unknown_infohashes = (i for i in existing.infohashes if i.infohash not in weights)

    for old_infohash_preference in unknown_infohashes:
        ...

    for old_infohash_preference in known_infohashes:
        ...
```

Also, "readability counts" and "sparse is better than dense." Therefore, two-line formulas could be rewritten as follows:

```python
for infohash_preference in known_infohashes:
    weight = weights.pop(infohash_preference.infohash)
    new_weight = infohash_preference.preference * self.update_weight_old + weight * self.update_weight_new
    infohash_preference.preference = new_weight
```

Therefore, we can significantly simplify the code while retaining the same logic. I'll add the assembled example with all the improvements as a code suggestion.
```python
with db_session:
    existing = self.Query.get(query=query)
    if existing is not None:
        for old_infohash_preference in existing.infohashes:
            if old_infohash_preference.infohash in weights:
                new_weight = (old_infohash_preference.preference * self.update_weight_old
                              + weights.pop(old_infohash_preference.infohash, 0.0) * self.update_weight_new)
                old_infohash_preference.preference = new_weight
            else:
                # This infohash did not pop up, candidate for deletion
                new_weight = old_infohash_preference.preference * self.update_weight_old
                if new_weight < self.e:
                    old_infohash_preference.delete()
                else:
                    old_infohash_preference.preference = new_weight
        if infohash in weights:
            weights[infohash] = self.update_weight_new
    else:
        existing = self.Query(query=query, infohashes=set())

    for new_infohash, weight in weights.items():
        existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight,
                                                        parent_query=existing))
```
Simplified code block:

Suggested change:

```python
with db_session:
    existing = get_or_create(self.Query, query=query)
    related_infohashes = (i for i in existing.infohashes if i.infohash in weights)
    unrelated_infohashes = (i for i in existing.infohashes if i.infohash not in weights)

    for infohash_preference in unrelated_infohashes:
        # This infohash did not pop up, candidate for deletion
        new_weight = infohash_preference.preference * self.update_weight_old
        if new_weight < self.e:
            infohash_preference.delete()
        else:
            infohash_preference.preference = new_weight

    for infohash_preference in related_infohashes:
        weight = weights.pop(infohash_preference.infohash)
        new_weight = infohash_preference.preference * self.update_weight_old + weight * self.update_weight_new
        infohash_preference.preference = new_weight

    if existing.infohashes and infohash in weights:
        weights[infohash] = self.update_weight_new

    for new_infohash, weight in weights.items():
        existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight,
                                                        parent_query=existing))
```
```python
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter


class UserActivityComponent(Component):
```
NIT: From a design perspective, it would be beneficial to keep the `UserActivityComponent` more declarative at a high level and minimize specific implementation details by extracting them into a separate class, for example `PreferableChecker` (I don't like the name, it is just an example). The component code will then look neater, and it will be independent of `PreferableChecker` implementation changes.

This approach would also make it easier to write and run tests separately for the `UserActivityComponent` (to test its composition) and the `PreferableChecker` (to test its logic).

```python
class UserActivityComponent(Component):
    preferable_checker = None

    async def run(self) -> None:
        await super().run()

        # Wait for dependencies
        await self.require_component(ContentDiscoveryComponent)  # remote_query_results notification
        await self.require_component(LibtorrentComponent)  # torrent_finished notification
        database_component = await self.require_component(DatabaseComponent)  # local_query_results notification
        torrent_checker_component = await self.require_component(TorrentCheckerComponent)

        self.preferable_checker = PreferableChecker(
            max_query_history=self.session.config.user_activity.max_query_history,
            torrent_checker=torrent_checker_component.torrent_checker,
            user_activity_layer=database_component.db.user_activity_layer
        )

    async def shutdown(self) -> None:
        await super().shutdown()
        if self.preferable_checker:
            self.preferable_checker.shutdown()
```
---
Agreed. I had this in my original design as well (#7632 (comment)). However, the implementation was so small and trivial that I moved the code here. Note that this code is not any less testable because of this (100% coverage).
```python
class Query(database.Entity):
    query = orm.PrimaryKey(str)
    infohashes = orm.Set("InfohashPreference")


class InfohashPreference(database.Entity):
    infohash = orm.Required(bytes)
    preference = orm.Required(float)
    parent_query = orm.Required(Query)
    orm.PrimaryKey(infohash, parent_query)
```
The database does not appear to be normalized. As @kozlovsky is coming back from vacation at the same time as you, I summon him to review the database structure.
---
To me, the database structure looks normalized. It is possible to link `InfohashPreference` with the `Resource` entity, but it is actually not necessary and complicates the database schema a bit.

But I'd like to use integer fields instead of floats, like:

```python
class Query(database.Entity):
    query = orm.PrimaryKey(str)
    searched_counter = orm.Required(int, default=1)
    infohashes = orm.Set("InfohashPreference")


class InfohashPreference(database.Entity):
    infohash = orm.Required(bytes)
    parent_query = orm.Required(Query)
    chosen_counter = orm.Required(int, default=0)
    ignored_counter = orm.Required(int, default=0)
    orm.PrimaryKey(infohash, parent_query)
```

This way, changing the formula on how `preference` is calculated becomes possible.
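As a hypothetical illustration of that flexibility: with integer counters stored, a float preference can be derived at read time, so the formula can later change without a schema migration. The ratio formula below is an assumption for illustration, not part of the PR:

```python
def derived_preference(chosen_counter: int, ignored_counter: int) -> float:
    # Assumed formula: the fraction of times this infohash was chosen
    # for the query, out of all times it appeared in results.
    total = chosen_counter + ignored_counter
    return chosen_counter / total if total else 0.0


assert derived_preference(0, 0) == 0.0
assert derived_preference(4, 1) == 0.8
```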
---
After some additional thought, I agree with @drew2a that it may be better to link the `InfohashPreference` entity with the `Resource` entity in the same way as the `Tracker` entity of the `HealthDataAccessLayer` is linked with the `Resource` entity via the `Tracker.torrents` / `Resource.trackers` relationships.

It has the following benefits:

- The primary key of `InfohashPreference` becomes shorter, as it will now use the resource id instead of the infohash bytes.
- With the current approach of `TriblerDatabase`, the torrent metadata is just a kind of `Resource`, and having `Resource` directly linked with `InfohashPreference` can simplify some future queries.

The drawback is that when we want to look up an `InfohashPreference` knowing the specific info hash, we first need to find the `Resource` and then use it instead of the info hash value.

With this change, the code will be like:

```python
class UserActivityLayer:

    def __init__(self, knowledge_layer: KnowledgeDataAccessLayer) -> None:
        self.database = knowledge_layer.instance
        self.Resource = knowledge_layer.Resource

        class Query(self.database.Entity):
            query = orm.PrimaryKey(str)
            searched_counter = orm.Required(int, default=1)
            infohashes = orm.Set("InfohashPreference")

        class InfohashPreference(self.database.Entity):
            torrent = orm.Required(self.Resource)
            query = orm.Required(Query)
            chosen_counter = orm.Required(int, default=0)
            ignored_counter = orm.Required(int, default=0)
            orm.PrimaryKey(torrent, query)

        self.Query = Query
        self.UserPreference = InfohashPreference
```

(In this example, I renamed `parent_query` to `query`, as the `parent_` prefix looks unnecessary.)

And in the `Resource` entity of the `KnowledgeDataAccessLayer` we will have:

```python
infohash_preferences = orm.Set(lambda: db.InfohashPreference, reverse="torrent")
```
---
I don't see how you could implement decay of previous searches and results for the same (infohash, query) pair with this database format. Is it still possible? Because that is a requirement.
---
Indeed, it is not enough to implement the proper decay; I missed that requirement. But it leads me to some additional thoughts.
The current scheme implemented in this PR is single-user. I don't think decay is important when the statistics are accumulated only for a single user. But if we aggregate anonymized query-result-preference statistics from thousands of users, the decay indeed makes sense.
But then we have a new question about spreading and accumulating these statistics. It probably should be signed by a second key when gossiping to prevent spam. However, we cannot sign the dynamically decaying `preference` value of the `float` type. We can sign some discrete information: that at the moment T, an anonymous user U performed the query Q and clicked on the infohash H.
So, if the goal is to aggregate decaying anonymous user-clicks-at-query-results statistics across multiple users, the discrete signed piece of information should probably be (T, U, Q, H). Then, the decay can be implemented by taking the timestamps into account: the weight of the user's "vote" can be inversely proportional to the vote's age.
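As an illustration of that idea (not part of this PR), a time-decayed vote weight could be computed on the fly from the signed timestamps. This is a minimal sketch; the half-life value and function names are my own assumptions, not anything from the Tribler codebase:

```python
from datetime import datetime, timedelta

HALF_LIFE = timedelta(days=30)  # assumed decay horizon, to be tuned experimentally


def vote_weight(clicked_at: datetime, now: datetime) -> float:
    """Weight of a single signed (T, U, Q, H) vote, halving every HALF_LIFE."""
    age = max(now - clicked_at, timedelta(0))
    return 0.5 ** (age / HALF_LIFE)  # timedelta / timedelta yields a float


def aggregate_preference(timestamps: list[datetime], now: datetime) -> float:
    """Aggregate decayed weights of all votes for one (query, infohash) pair."""
    return sum(vote_weight(t, now) for t in timestamps)
```

Because only the discrete timestamps are signed and stored, the decay curve itself can be changed later without invalidating any signatures.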
In that case, the entity attributes might be something like:
```python
class InfohashPreference(self.database.Entity):
    user = orm.Required(User)  # some new entity
    query = orm.Required(Query)
    torrent = orm.Required(self.Resource)
    last_clicked_at = orm.Required(datetime)
    signature = orm.Optional(bytes)  # for remotely received gossips
    # for the next field see https://github.com/Tribler/tribler/pull/7786#discussion_r1439578570
    successfully_downloaded = orm.Required(bool, default=False)  # to ignore local unfinished downloads
    orm.PrimaryKey(user, query, torrent)
```
What do you think?
That is close to what I had in mind for the long term, in a different PR. I would prefer we discuss the grand design in the linked issue, not on this PR.
Just to touch on it, in short, the plan for now is to use emergent behavior, as follows:
- Torrents that are "preferred" have their health checked more frequently locally (this PR).
- Torrents that have their health checked recently (locally) are more frequently gossiped in the popularity community (already exists).
- Torrents that are gossiped more by others have a higher chance of appearing in search results remotely (already exists).
- Effect: search results that are gossiped to more users are more likely to be downloaded. For actually popular content, that is downloaded, this forms a feedback loop: back to step 2.
In summary, this PR creates a soft bias and, therefore, an emergent effect that boosts the popularity of content that is searched for and downloaded.
Establishing shadow identities and more aggressively gossiping - while preventing spam - is something I'll leave for a follow-up PR. Ideally, we don't need to gossip preference directly and we can somehow merge gossiped ML models. However, this should only be implemented after careful experimentation. For now, this PR gives each user a local ranking to start the ML experimentation.
Thanks for the explanation; now I understand your approach better. Initially, I was misguided by the picture in #7632 with a "store signed" label, as my understanding was that it is only possible to sign discrete facts, not float values. If gossiping about individual provable facts is not the intention, then storing calculated preferences is probably fine.
Still, you can consider using torrent = orm.Required(self.Resource)
instead of infohash = orm.Required(bytes)
in the InfohashPreference
entity to reduce the data size.
Sure, thank you for the suggestion. Once the initial prototype has been merged, we can look at refactoring and optimizations and I will definitely keep your suggestion in mind. That said, once this has been merged, this PR is also no longer my sole responsibility and others can also contribute their excellent suggestions to the communal code. We can grow the code over time.
I do realize, now, that I left `enabled = True` as the default setting. I'll make sure to keep this disabled by default so we still have the freedom to change things like the database format in future PRs.
```python
self.infohash_to_queries[infohash].append(query)
self.queries[query] = results | self.queries.get(query, set())

if len(self.queries) > self.max_query_history:
```
Perhaps for this purpose (to store a limited amount of data), you could use a dedicated data structure. This would make it possible to better cover it with tests and to use it in other parts of Tribler's code. As a beneficial side effect, it would be easier to understand the logic of the dedicated data structure and of the current method.
We can extend this draft:
```python
class LimitedSizeDict(OrderedDict):
    def __init__(self, *args, size_limit=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.size_limit = size_limit

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._check_size_limit()

    def _check_size_limit(self):
        if self.size_limit is None:
            return
        while len(self) > self.size_limit:
            self.popitem(last=False)
```
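A quick self-contained check of how such a draft would behave (the class is repeated here so the snippet runs on its own); the oldest entries are evicted first:

```python
from collections import OrderedDict


class LimitedSizeDict(OrderedDict):
    """Dict that evicts its oldest entries once size_limit is exceeded."""

    def __init__(self, *args, size_limit=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.size_limit = size_limit

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self._check_size_limit()

    def _check_size_limit(self):
        if self.size_limit is None:
            return
        while len(self) > self.size_limit:
            self.popitem(last=False)  # pop in insertion order: oldest first


queries = LimitedSizeDict(size_limit=2)
queries["first query"] = {b"\x00" * 20}
queries["second query"] = {b"\x01" * 20}
queries["third query"] = {b"\x02" * 20}  # evicts "first query"
```

One caveat with the draft as written: passing initial items to the constructor would trigger `__setitem__` before `self.size_limit` is assigned, so the assignment would need to move before `super().__init__()` to support that.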
Sure, we can implement such a feature once we have a need for it. However, that is not the feature this PR is implementing and it is therefore best left to another PR.
```python
b_infohash = InfoHash(unhexlify(infohash))
queries = self.infohash_to_queries[b_infohash]
for query in queries:
    losing_infohashes = self.queries[query] - {b_infohash}
    fut = get_running_loop().run_in_executor(None,
                                             self.database_manager.store,
                                             query, b_infohash, losing_infohashes)
    self.task_manager.register_task("Store query", functools.partial(UserActivityComponent._fut_to_task, fut))
```
Two points:
- This method seems overly complicated for its simple purpose: to call the `self.database_manager.store` function.
- I'm uncertain if the logic will work correctly in cases where the user downloads two or more torrents from the query results.
This is solving a real issue: it is related to #4320 and #7784.
IIRC, the observation that we should not block the main `asyncio` thread with db operations should be credited to @egbertbouman, but I cannot find his original comment for the life of me. Once I find it, I'll properly link it.
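The general pattern being defended here, running a blocking database write in a thread-pool executor so the event loop stays responsive, can be sketched in isolation. The `store` function below is a stand-in I invented for illustration, not Tribler's actual implementation:

```python
import asyncio
import time


def store(query: str, infohash: bytes) -> str:
    """Stand-in for a blocking database write."""
    time.sleep(0.05)  # simulate slow disk I/O
    return f"stored {query!r}"


async def on_torrent_finished(query: str, infohash: bytes) -> str:
    loop = asyncio.get_running_loop()
    # Offload the blocking call to the default thread pool so the
    # asyncio event loop is not stalled while the write happens.
    return await loop.run_in_executor(None, store, query, infohash)


result = asyncio.run(on_torrent_finished("test query", b"\x00" * 20))
```

While the executor thread sleeps in `store`, other coroutines on the loop keep running, which is the point of the pattern.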
I did manage to shrink this a little bit (see conversation with @kozlovsky).
```python
for infohash in random_infohashes:
    self.task_manager.register_anonymous_task("Check preferable torrent",
                                              self.torrent_checker.check_torrent_health, infohash)
```
Here's a simplified version. It avoids a `for` loop over a set of just a single item and avoids an unnecessary intermediate call to `register_anonymous_task`. If you really want queuing for `check_torrent_health`, it would be better to implement it on the `check_torrent_health` side.
```python
if not random_infohashes:
    return
infohash = random_infohashes.pop()
await self.torrent_checker.check_torrent_health(infohash)
```
As a side note, if you never use `get_random` with a limit different from 1 (excluding tests), then it is probably a sign that the function should not return a set but a single item. See: YAGNI on Wikipedia.
I like the idea of this PR! It is crucial for Tribler to understand which torrent users prefer for a specific query because it is tough to properly rank torrents for a particular query without anonymized user feedback.
The system that analyzes user preferences and shares the anonymized aggregated results looks to me like a cornerstone of a future Tribler's search system.
Initially, I thought we needed to gather that information on the UI side, but this PR shows it is possible to do it entirely on the Core side, which brings simplicity.
That said, it may be hard to develop a good algorithm on the first try, and I believe the specific details of how calculations are performed will change in the future. Therefore, storing integer values instead of floats and calculating the resulting rank on the fly may be better, as it allows changing the formula later.
I mean, it is possible to store, for each query, the number of times it was searched, and, for each info hash that appeared in the query results, the number of times it was chosen or ignored. This information should be enough to calculate the rank while allowing the ranking formula to change in the future without losing data.
So what does this PR literally do? Right now, it just adds repeated health checks for torrents that the user downloaded. This does not look very useful on its own and could be implemented much more simply: we could have a table with a list of locally downloaded info hashes and randomly check the health of its items. With the current PopularityCommunity protocol, it will not help much, and if this is the end goal of the current PR, then a significant part of it is unnecessary. But I believe that in the future we should spread not only info hash health info but also gossip anonymized user preferences for query results (in the form "for query Q, the info hash H with title T was chosen by someone"), and this PR lays down a foundation for that.
```python
class Query(database.Entity):
    query = orm.PrimaryKey(str)
    infohashes = orm.Set("InfohashPreference")


class InfohashPreference(database.Entity):
    infohash = orm.Required(bytes)
    preference = orm.Required(float)
    parent_query = orm.Required(Query)
    orm.PrimaryKey(infohash, parent_query)
```
To me, the database structure looks normalized. It is possible to link `InfohashPreference` with the `Resource` entity, but it is actually not necessary and complicates the database schema a bit.
But I'd like to use integer fields instead of floats, like:
```python
class Query(database.Entity):
    query = orm.PrimaryKey(str)
    searched_counter = orm.Required(int, default=1)
    infohashes = orm.Set("InfohashPreference")


class InfohashPreference(database.Entity):
    infohash = orm.Required(bytes)
    parent_query = orm.Required(Query)
    chosen_counter = orm.Required(int, default=0)
    ignored_counter = orm.Required(int, default=0)
    orm.PrimaryKey(infohash, parent_query)
```
This way, changing the formula for how `preference` is calculated becomes possible.
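For example, a rank could be derived from the counters at query time. This is just one of many formulas the counters would permit; the function name and the smoothing constant are my own assumptions:

```python
def preference(chosen_counter: int, ignored_counter: int, smoothing: float = 1.0) -> float:
    """Fraction of times an infohash was chosen for a query, with a
    Laplace-style prior so unseen entries are not ranked at the extremes."""
    return (chosen_counter + smoothing) / (chosen_counter + ignored_counter + 2 * smoothing)
```

Because the counters, not the derived float, would be stored, the formula can be swapped out later without losing any data.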
```python
super().__init__(reporter)

self.infohash_to_queries: dict[InfoHash, list[str]] = defaultdict(list)
self.queries: OrderedDict[str, typing.Set[InfoHash]] = OrderedDict()
```
With the current implementation, the `self.queries` dict is stored in memory. That means (if I understand correctly) that if a user performs a search, starts a download, closes Tribler, and starts it again, the finished torrent will not be matched with the corresponding search query. It seems more correct if, upon torrent completion, we get queries from the database rather than storing them in memory in a separate dictionary. Another approach is to pre-load the dictionary at Tribler startup, but that probably overcomplicates the code compared to just storing objects in the database.
Correct. By design, I only consider downloads in the current session.
I'm also not sure it makes sense to change this. For example, a search from a previous Tribler session could be 3 months old. There would have to be some timeout/validity system, etc. We can add that at some point, but I don't think it should be in this first prototype.
As I just wrote here, I think that we should add a timestamp field to `InfohashPreference`. Then, we can update this field in the database if the user clicks again on the same infohash.
In that case, the pure-db implementation looks relatively straightforward. When the user clicks on a search result, we add an `InfohashPreference` object with the current timestamp to the database and start the download. When the download is finished, we mark the corresponding `InfohashPreference` as successful. We gossip only successfully downloaded `InfohashPreference` objects. Still, the timestamp is recorded at the moment the user clicks on the torrent, not when the download completes. We update the timestamp if the user clicks on the same torrent again in a new query with the same query string.
Sure, I do not doubt the feasibility of adding this. My main concern is this:
I don't think this should be in this first prototype.
We can add features - like you suggested - later. This is not the goal of this initial prototype.
Ok, I agree that we can switch to the pure-db implementation later if necessary. Let's hope Tribler restarts will not significantly skew the preference statistics about big torrents.
7186d73
to
8049b92
Compare
This PR is a great example of how we want to work. I'm split between:
PR_discussion_21-pages.pdf. These 663 extra lines of code produced 21 pages of discussion on GitHub. Frankly, I'm getting a bit desperate that Tribler is not going anywhere. Trying to make it too perfect is, paradoxically, not good. After 18 years and 9 months, we still don't have anything that users actually want (caveat: without doing almost any professional marketing). Most code does not last more than a few years anyway. So let's not make it too perfect and polished. Just to clarify, it is obviously critical that the code does not have nasty bugs. How much time should we spend on reviews? Say 20% of the original PR engineering effort? So let's focus the effort of code review on identifying actual faults. Or is "perfect is the enemy of good" incorrect here? |
8049b92
to
2ac6e36
Compare
It does not make sense to work on this PR until #7799 is resolved. If it is unclear where prototype code should go, it should not go anywhere. I'll close this PR for now because it falsely suggests that it is a work in progress. If the team does decide that prototype code should live in Tribler, we can reopen this PR. |
Approach to resolve this: first get an operational Gumby experiment of this feature. |
Related to #7632
This PR adds integrations with local and remote search results and stores the preferable torrent (according to what the user clicked and downloaded).
Currently, this integration is only local and the propagation of preferable torrents depends on the emergent effect of them being checked more often in the torrent checker, causing them to be propagated in the content discovery community more. In a future PR, we can implement more direct interaction with other peers to share recommendations.
The uncovered lines of this PR are in a `TYPE_CHECKING` block and, therefore, impossible to cover.
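For context on why such lines cannot be covered, a minimal sketch (the imported name and function are purely illustrative, not from this PR): `TYPE_CHECKING` is `False` at runtime, so the guarded branch is evaluated only by static type checkers and never executes under a coverage run.

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only static type checkers evaluate this branch; at runtime it is
    # skipped, so coverage tools report these lines as uncovered.
    from collections import OrderedDict


def history_size(queries: "OrderedDict[str, set]") -> int:
    """The annotation references the type-checking-only import as a string."""
    return len(queries)
```

At runtime the annotation stays an unevaluated string, so the function works even though `OrderedDict` was never actually imported into the module namespace.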