From 5ab0ac593a2e9f8aea4bb12eca34be9ae38cfe87 Mon Sep 17 00:00:00 2001 From: qstokkink Date: Sun, 4 Mar 2018 10:13:23 +0100 Subject: [PATCH] Added AllChannel 2.0 overlay class --- .../Core/Libtorrent/LibtorrentDownloadImpl.py | 3 + Tribler/community/allchannel2/community.py | 233 ++++++++++++++++++ Tribler/community/allchannel2/payload.py | 16 ++ 3 files changed, 252 insertions(+) create mode 100644 Tribler/community/allchannel2/community.py create mode 100644 Tribler/community/allchannel2/payload.py diff --git a/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py b/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py index 3ba9b5474db..c70a788e116 100644 --- a/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py +++ b/Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py @@ -160,6 +160,7 @@ def __init__(self, session, tdef): self.deferreds_resume = [] self.deferreds_handle = [] self.deferred_removed = Deferred() + self.deferred_finished = Deferred() self.handle_check_lc = self.register_task("handle_check", LoopingCall(self.check_handle)) @@ -687,6 +688,8 @@ def reset_priorities(): if self.endbuffsize: self.set_byte_priority([(self.get_vod_fileindex(), 0, -1)], 1) self.endbuffsize = 0 + if not self.deferred_finished.called: + self.deferred_finished.callback(self) def update_lt_stats(self): """ Update libtorrent stats and check if the download should be stopped.""" diff --git a/Tribler/community/allchannel2/community.py b/Tribler/community/allchannel2/community.py new file mode 100644 index 00000000000..ffe5a9a5b94 --- /dev/null +++ b/Tribler/community/allchannel2/community.py @@ -0,0 +1,233 @@ +import os + +from Tribler.community.allchannel2.payload import ChannelPayload +from Tribler.community.allchannel2.structures import Channel +from Tribler.Core.DownloadConfig import DefaultDownloadStartupConfig +from Tribler.pyipv8.ipv8.deprecated.community import Community +from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload +from Tribler.pyipv8.ipv8.peer import Peer + + +def infohash_to_magnet(infohash): + """ + Tranform an info hash to a magnet link. + + :param infohash: the infohash to convert + :return: the magnet link belonging to this info hash + """ + return "magnet:?xt=urn:btih:" + infohash + + +class AllChannel2Community(Community): + """ + AllChannel 2.0 + + My channel administration usage: + 1. __init__() + 2. load_channels(): performs heavy disk I/O + 3. add_magnetlink()/remove_magnetlink(): change the contents of my channel + 4. fetch and seed my_channel_torrent (automatically committed) + """ + master_peer = Peer(("3081b00201010434005a4c1d05418be2acbb54abbcfee354c80d3c3a5c6c4ab176041b119de6af" + + "6c4f4113983cad1c3502d871c0db48de948425c0b5a00706052b81040024a16c036a000400c7d2" + + "044d0beb3d2bbbb78a596f15f4934b21d405cc8e0df16b2a1373165c3f6577d10ff0397e713a35" + + "ddb9ec8d177013cc9b9401f4464ca28137dedbd6967996c936a60281eab5298229af26dd47f3a0" + + "a9fb398dbeb8ba284861ad820d6366d931ea194f308d9f").decode('hex')) + + def __init__(self, my_peer, endpoint, network, working_directory="./channels", tribler_session=None): + """ + Initialize the AllChannel2 Community. + + :param my_peer: the Peer representing my peer + :param endpoint: the Endpoint object to use + :param network: the Network object to use + :param working_directory: the folder where all of the channels are stored + """ + super(AllChannel2Community, self).__init__(my_peer, endpoint, network) + self.working_directory = working_directory + self.channels = {} + self.my_channel_name = self.my_peer.mid + self.tribler_session = tribler_session + + # Internals, do not touch! + self._my_channel_info_hash = None + self._my_channel_torrent = None + self.decode_map.update({chr(1): self.on_channel}) + + def load_channels(self): + """ + Load all known Channels from the working directory. + + :returns: None + """ + channel_directories = [folder for folder in os.listdir(self.working_directory) if os.path.isdir(folder)] + for folder in channel_directories: + self.load_channel(folder) + if not self.my_channel_name in self.channels: + os.makedirs(os.path.abspath(os.path.join(self.working_directory, self.my_channel_name))) + + def load_channel(self, channel): + """ + Load a single channel from the folder structure. + + :param channel: the channel name + :returns: None + """ + real_path = os.path.abspath(os.path.join(self.working_directory, channel)) + if os.path.isdir(real_path): + channel_instance = Channel(channel, self.working_directory, allow_edit=(channel==self.my_channel_name)) + channel_instance.load() + self.channels[channel] = channel_instance + + def _commit_my_channel(self): + """ + Commit the channel based on my_peer. + + :returns: None + """ + my_channel = self.channels.get(self.my_channel_name, + Channel(self.my_channel_name, self.working_directory, True)) + my_channel.commit() + self._my_channel_torrent, self._my_channel_info_hash = my_channel.make_torrent() + self.channels[self.my_channel_name] = my_channel + + def _dirty_cache(self): + """ + Turn my channel dirty. + This invalidates the info hash and torrent file, which will have to be recreated. + + :returns: None + """ + self._my_channel_info_hash = None + self._my_channel_torrent = None + + @property + def my_channel_info_hash(self): + """ + The info hash representing my channel. + + :return: (20 byte str) info hash of my channel + """ + if not self._my_channel_info_hash: + self._commit_my_channel() + return self._my_channel_info_hash + + @property + def my_channel_magnet_link(self): + """ + The magnet link representing my channel. + + :return: the (stripped) magnet link of my channel + """ + if not self._my_channel_info_hash: + self._commit_my_channel() + return infohash_to_magnet(self._my_channel_info_hash) + + @property + def my_channel_torrent(self): + """ + The torrent file representing my channel. + + :return: the filename of the torrent for my channel + """ + if not self._my_channel_torrent: + self._commit_my_channel() + return self._my_channel_torrent + + def add_magnetlink(self, magnetlink): + """ + Add a magnet link to my channel. + + :param magnetlink: the (20 byte str) magnet link to add + :returns: None + """ + if not self.my_channel_name in self.channels: + self._commit_my_channel() + self.channels[self.my_channel_name].add_magnetlink(magnetlink) + self._dirty_cache() + + def remove_magnetlink(self, magnetlink): + """ + Remove a magnet link from my channel. + + :param magnetlink: the (20 byte str) magnet link to add + :returns: None + """ + if not self.my_channel_name in self.channels: + self._commit_my_channel() + self.channels[self.my_channel_name].remove_magnetlink(magnetlink) + self._dirty_cache() + + def get_channels(self): + """ + Get all known channels. + + :return: the names of the channels we know about (including our own) + """ + return self.channels.keys() + + def get_magnetlinks(self, channel): + """ + Get all the magnet links from a specific channel name. + + :param channel: the channel name + :return: the magnet links belonging to that channel + """ + channel_instance = self.channels.get(channel, None) + return channel_instance.get_magnetlinks() if channel_instance else [] + + def create_channel_message(self): + """ + Create a channel message for my channel. + + :return: the channel message + """ + global_time = self.claim_global_time() + payload = ChannelPayload(self.my_channel_info_hash).to_pack_list() + auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() + dist = GlobalTimeDistributionPayload(global_time).to_pack_list() + + return self._ez_pack(self._prefix, 1, [auth, dist, payload]) + + def download_finished(self, download): + """ + Callback for when a Channel download finished. + Load in the Channel data. + + :param download: the LibtorrentDownloadImpl instance + :returns: None + """ + real_path = download.dlconfig.get_dest_dir() + rel_path = os.path.relpath(self.working_directory, real_path) + self.load_channel(rel_path) + + def on_channel(self, source_address, data): + """ + Callback for when a ChannelPayload message is received. + """ + auth, _, payload = self._ez_unpack_auth(ChannelPayload, data) + channel = Peer(auth.public_key_bin).mid + # If we don't know about this channel, respond with our own + if channel not in self.channels: + packet = self.create_channel_message() + self.endpoint.send(source_address, packet) + # And start downloading it, if we are hooked up to a Tribler session + if self.tribler_session: + download_config = DefaultDownloadStartupConfig() + dest_dir = os.path.abspath(os.path.join(self.working_directory, channel)) + download_config.set_dest_dir(dest_dir) + add_deferred = self.tribler_session.start_download_from_uri(infohash_to_magnet(payload.info_hash), + download_config) + add_deferred.addCallback(lambda download: + download.deferred_finished).addCallback(self.download_finished) + + def on_introduction_response(self, source_address, data): + """ + Callback for when an introduction response is received. + + We extend the functionality by sharing our channel with the other side. + """ + super(AllChannel2Community, self).on_introduction_response(source_address, data) + + packet = self.create_channel_message() + self.endpoint.send(source_address, packet) diff --git a/Tribler/community/allchannel2/payload.py b/Tribler/community/allchannel2/payload.py new file mode 100644 index 00000000000..05dd8155ba1 --- /dev/null +++ b/Tribler/community/allchannel2/payload.py @@ -0,0 +1,16 @@ +from Tribler.pyipv8.ipv8.deprecated.payload import Payload + + +class ChannelPayload(Payload): + + format_list = ['20s'] + + def __init__(self, info_hash): + self.info_hash = info_hash + + def to_pack_list(self): + return [('20s', self.info_hash)] + + @classmethod + def from_unpack_list(cls, info_hash): + return cls(info_hash)