Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READY: GigaChannel Community gossip caching #4648

Merged
merged 1 commit into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions Tribler/Test/Community/gigachannel/test_community.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

from twisted.internet.defer import inlineCallbacks

from Tribler.Core.Modules.MetadataStore.OrmBindings.channel_node import COMMITTED, NEW
from Tribler.Core.Modules.MetadataStore.OrmBindings.channel_node import NEW
from Tribler.Core.Modules.MetadataStore.serialization import REGULAR_TORRENT
from Tribler.Core.Modules.MetadataStore.store import MetadataStore
from Tribler.Core.Utilities.random_utils import random_infohash
from Tribler.Test.Core.base_test import MockObject
Expand Down Expand Up @@ -72,9 +73,11 @@ def test_send_random_multiple_torrents(self):
"""
Test whether sending a single channel with a multiple torrents to another peer works correctly
"""
# We set gossip renewal period to 2 for this test, so the 3rd packet will renew the cache
self.nodes[0].overlay.gossip_renewal_period = 2
with db_session:
channel = self.nodes[0].overlay.metadata_store.ChannelMetadata.create_channel("test", "bla")
for _ in xrange(20):
for _ in xrange(10):
self.add_random_torrent(self.nodes[0].overlay.metadata_store.TorrentMetadata)
channel.commit_channel_torrent()

Expand All @@ -83,9 +86,40 @@ def test_send_random_multiple_torrents(self):
yield self.deliver_messages(timeout=0.5)

with db_session:
self.assertEqual(len(self.nodes[1].overlay.metadata_store.ChannelMetadata.select()), 1)
channel = self.nodes[1].overlay.metadata_store.ChannelMetadata.select()[:][0]
self.assertLess(channel.contents_len, 20)
channel = self.nodes[1].overlay.metadata_store.ChannelMetadata.get()
torrents1 = self.nodes[1].overlay.metadata_store.TorrentMetadata.select()[:]
self.assertLess(channel.contents_len, 10)
self.assertLess(0, channel.contents_len)

# We must delete the old and create all-new torrent entries for the next test.
# Otherwise, it becomes non-deterministic.
with db_session:
channel = self.nodes[0].overlay.metadata_store.ChannelMetadata.get()
self.nodes[0].overlay.metadata_store.TorrentMetadata.select(
lambda g: g.metadata_type == REGULAR_TORRENT).delete()
self.nodes[1].overlay.metadata_store.TorrentMetadata.select().delete()

for _ in xrange(10):
self.add_random_torrent(self.nodes[0].overlay.metadata_store.TorrentMetadata)
channel.commit_channel_torrent()

self.assertEqual(1, self.nodes[0].overlay.gossip_sequence_count)

# Initiate the gossip again. This time, it should be sent from the blob cache
# so the torrents on the receiving end should not change this time.
self.nodes[0].overlay.send_random_to(Peer(self.nodes[1].my_peer.public_key, self.nodes[1].endpoint.wan_address))

yield self.deliver_messages(timeout=0.5)
with db_session:
torrents2 = self.nodes[1].overlay.metadata_store.TorrentMetadata.select()[:]
self.assertEqual(len(torrents1), len(torrents2))

self.nodes[0].overlay.send_random_to(Peer(self.nodes[1].my_peer.public_key, self.nodes[1].endpoint.wan_address))

yield self.deliver_messages(timeout=0.5)
with db_session:
torrents3 = self.nodes[1].overlay.metadata_store.TorrentMetadata.select()[:]
self.assertLess(len(torrents2), len(torrents3))

@inlineCallbacks
def test_send_and_get_channel_update_back(self):
Expand Down
28 changes: 17 additions & 11 deletions Tribler/community/gigachannel/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ def __init__(self, my_peer, endpoint, network, metadata_store, notifier=None):
self.request_cache = RequestCache()
self.notifier = notifier

self.gossip_sequence_count = 0
self.gossip_blob = None
self.gossip_renewal_period = 30

@inlineCallbacks
def unload(self):
self.request_cache.clear()
Expand All @@ -67,17 +71,19 @@ def send_random_to(self, peer):
:type peer: Peer
:returns: None
"""
# Choose some random entries and try to pack them into maximum_payload_size bytes
md_list = []
with db_session:
# TODO: when the health table will be there, send popular torrents instead
channel_l = list(
self.metadata_store.ChannelMetadata.get_random_channels(1, only_subscribed=True, only_downloaded=True))
if not channel_l:
return
md_list.extend(channel_l + list(channel_l[0].get_random_torrents(max_entries - 1)))
blob = entries_to_chunk(md_list, maximum_payload_size)[0] if md_list else None
self.endpoint.send(peer.address, self.ezr_pack(self.NEWS_PUSH_MESSAGE, RawBlobPayload(blob)))
if self.gossip_blob is None or (self.gossip_sequence_count % self.gossip_renewal_period) == 0:
# Choose some random entries and try to pack them into maximum_payload_size bytes
with db_session:
# TODO: when the health table will be there, send popular torrents instead
channel_l = list(self.metadata_store.ChannelMetadata.get_random_channels(1, only_subscribed=True,
only_downloaded=True))
if not channel_l:
return
md_list = channel_l + list(channel_l[0].get_random_torrents(max_entries - 1))
self.gossip_blob = entries_to_chunk(md_list, maximum_payload_size)[0] if md_list else None

self.gossip_sequence_count += 1
self.endpoint.send(peer.address, self.ezr_pack(self.NEWS_PUSH_MESSAGE, RawBlobPayload(self.gossip_blob)))

@lazy_wrapper(RawBlobPayload)
def on_blob(self, peer, blob):
Expand Down