Skip to content

Commit

Permalink
GigaChannel Community gossip caching
Browse files Browse the repository at this point in the history
This makes GigaChannel Community cache the results of the SQL query
that selects the channel contents to gossip around. The resulting
blob is saved for reuse in the Community object. The cached blob is
sent to 30 peers, after which a new query is performed and the cache
is renewed. This significantly reduces background DB activity.
  • Loading branch information
ichorid committed Jul 3, 2019
1 parent 01c88f2 commit 35bd4e0
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 16 deletions.
44 changes: 39 additions & 5 deletions Tribler/Test/Community/gigachannel/test_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

from twisted.internet.defer import inlineCallbacks

from Tribler.Core.Modules.MetadataStore.OrmBindings.channel_node import COMMITTED, NEW
from Tribler.Core.Modules.MetadataStore.OrmBindings.channel_node import NEW
from Tribler.Core.Modules.MetadataStore.serialization import REGULAR_TORRENT
from Tribler.Core.Modules.MetadataStore.store import MetadataStore
from Tribler.Core.Utilities.random_utils import random_infohash
from Tribler.Test.Core.base_test import MockObject
Expand Down Expand Up @@ -72,9 +73,11 @@ def test_send_random_multiple_torrents(self):
"""
Test whether sending a single channel with a multiple torrents to another peer works correctly
"""
# We set gossip renewal period to 2 for this test, so the 3rd packet will renew the cache
self.nodes[0].overlay.gossip_renewal_period = 2
with db_session:
channel = self.nodes[0].overlay.metadata_store.ChannelMetadata.create_channel("test", "bla")
for _ in xrange(20):
for _ in xrange(10):
self.add_random_torrent(self.nodes[0].overlay.metadata_store.TorrentMetadata)
channel.commit_channel_torrent()

Expand All @@ -83,9 +86,40 @@ def test_send_random_multiple_torrents(self):
yield self.deliver_messages(timeout=0.5)

with db_session:
self.assertEqual(len(self.nodes[1].overlay.metadata_store.ChannelMetadata.select()), 1)
channel = self.nodes[1].overlay.metadata_store.ChannelMetadata.select()[:][0]
self.assertLess(channel.contents_len, 20)
channel = self.nodes[1].overlay.metadata_store.ChannelMetadata.get()
torrents1 = self.nodes[1].overlay.metadata_store.TorrentMetadata.select()[:]
self.assertLess(channel.contents_len, 10)
self.assertLess(0, channel.contents_len)

# We must delete the old and create all-new torrent entries for the next test.
# Otherwise, it becomes non-deterministic.
with db_session:
channel = self.nodes[0].overlay.metadata_store.ChannelMetadata.get()
self.nodes[0].overlay.metadata_store.TorrentMetadata.select(
lambda g: g.metadata_type == REGULAR_TORRENT).delete()
self.nodes[1].overlay.metadata_store.TorrentMetadata.select().delete()

for _ in xrange(10):
self.add_random_torrent(self.nodes[0].overlay.metadata_store.TorrentMetadata)
channel.commit_channel_torrent()

self.assertEqual(1, self.nodes[0].overlay.gossip_sequence_count)

# Initiate the gossip again. This time, it should be sent from the blob cache
# so the torrents on the receiving end should not change this time.
self.nodes[0].overlay.send_random_to(Peer(self.nodes[1].my_peer.public_key, self.nodes[1].endpoint.wan_address))

yield self.deliver_messages(timeout=0.5)
with db_session:
torrents2 = self.nodes[1].overlay.metadata_store.TorrentMetadata.select()[:]
self.assertEqual(len(torrents1), len(torrents2))

self.nodes[0].overlay.send_random_to(Peer(self.nodes[1].my_peer.public_key, self.nodes[1].endpoint.wan_address))

yield self.deliver_messages(timeout=0.5)
with db_session:
torrents3 = self.nodes[1].overlay.metadata_store.TorrentMetadata.select()[:]
self.assertLess(len(torrents2), len(torrents3))

@inlineCallbacks
def test_send_and_get_channel_update_back(self):
Expand Down
28 changes: 17 additions & 11 deletions Tribler/community/gigachannel/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def __init__(self, my_peer, endpoint, network, metadata_store, notifier=None):
self.request_cache = RequestCache()
self.notifier = notifier

# Gossip blob cache: the serialized result of the random-channel query is
# kept in gossip_blob and resent until gossip_renewal_period sends have
# elapsed, after which the query is re-run (see send_random_to).
self.gossip_sequence_count = 0  # number of gossip messages sent so far
self.gossip_blob = None  # cached serialized gossip payload; None until first build
self.gossip_renewal_period = 30  # re-run the gossip SQL query every 30 sends

@inlineCallbacks
def unload(self):
self.request_cache.clear()
Expand All @@ -67,17 +71,19 @@ def send_random_to(self, peer):
:type peer: Peer
:returns: None
"""
# --- NOTE(review): diff artifact — removed (pre-change) implementation below;
# it re-ran the SQL query on every single call. ---
# Choose some random entries and try to pack them into maximum_payload_size bytes
md_list = []
with db_session:
# TODO: when the health table will be there, send popular torrents instead
channel_l = list(
self.metadata_store.ChannelMetadata.get_random_channels(1, only_subscribed=True, only_downloaded=True))
if not channel_l:
return
md_list.extend(channel_l + list(channel_l[0].get_random_torrents(max_entries - 1)))
blob = entries_to_chunk(md_list, maximum_payload_size)[0] if md_list else None
self.endpoint.send(peer.address, self.ezr_pack(self.NEWS_PUSH_MESSAGE, RawBlobPayload(blob)))
# --- added implementation: rebuild the cached blob only when it is missing or
# the send counter has reached a multiple of gossip_renewal_period. ---
if self.gossip_blob is None or (self.gossip_sequence_count % self.gossip_renewal_period) == 0:
# Choose some random entries and try to pack them into maximum_payload_size bytes
with db_session:
# TODO: when the health table will be there, send popular torrents instead
channel_l = list(self.metadata_store.ChannelMetadata.get_random_channels(1, only_subscribed=True,
only_downloaded=True))
if not channel_l:
return
md_list = channel_l + list(channel_l[0].get_random_torrents(max_entries - 1))
# NOTE(review): if md_list were empty, gossip_blob would be set to None and
# still passed to RawBlobPayload below — presumably md_list is always
# non-empty here since channel_l is non-empty; confirm.
self.gossip_blob = entries_to_chunk(md_list, maximum_payload_size)[0] if md_list else None

self.gossip_sequence_count += 1
self.endpoint.send(peer.address, self.ezr_pack(self.NEWS_PUSH_MESSAGE, RawBlobPayload(self.gossip_blob)))

@lazy_wrapper(RawBlobPayload)
def on_blob(self, peer, blob):
Expand Down

0 comments on commit 35bd4e0

Please sign in to comment.