Skip to content

Commit

Permalink
Added AllChannel 2.0 overlay class
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Mar 4, 2018
1 parent 4b6d889 commit 5ab0ac5
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Tribler/Core/Libtorrent/LibtorrentDownloadImpl.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def __init__(self, session, tdef):
self.deferreds_resume = []
self.deferreds_handle = []
self.deferred_removed = Deferred()
self.deferred_finished = Deferred()

self.handle_check_lc = self.register_task("handle_check", LoopingCall(self.check_handle))

Expand Down Expand Up @@ -687,6 +688,8 @@ def reset_priorities():
if self.endbuffsize:
self.set_byte_priority([(self.get_vod_fileindex(), 0, -1)], 1)
self.endbuffsize = 0
if not self.deferred_finished.called:
self.deferred_finished.callback(self)

def update_lt_stats(self):
""" Update libtorrent stats and check if the download should be stopped."""
Expand Down
233 changes: 233 additions & 0 deletions Tribler/community/allchannel2/community.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
import os

from Tribler.community.allchannel2.payload import ChannelPayload
from Tribler.community.allchannel2.structures import Channel
from Tribler.Core.DownloadConfig import DefaultDownloadStartupConfig
from Tribler.pyipv8.ipv8.deprecated.community import Community
from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload
from Tribler.pyipv8.ipv8.peer import Peer


def infohash_to_magnet(infohash):
"""
Tranform an info hash to a magnet link.
:param infohash: the infohash to convert
:return: the magnet link belonging to this info hash
"""
return "magnet:?xt=urn:btih:" + infohash


class AllChannel2Community(Community):
"""
AllChannel 2.0
My channel administration usage:
1. __init__()
2. load_channels(): performs heavy disk I/O
3. add_magnetlink()/remove_magnetlink(): change the contents of my channel
4. fetch and seed my_channel_torrent (automatically committed)
"""
master_peer = Peer(("3081b00201010434005a4c1d05418be2acbb54abbcfee354c80d3c3a5c6c4ab176041b119de6af" +
"6c4f4113983cad1c3502d871c0db48de948425c0b5a00706052b81040024a16c036a000400c7d2" +
"044d0beb3d2bbbb78a596f15f4934b21d405cc8e0df16b2a1373165c3f6577d10ff0397e713a35" +
"ddb9ec8d177013cc9b9401f4464ca28137dedbd6967996c936a60281eab5298229af26dd47f3a0" +
"a9fb398dbeb8ba284861ad820d6366d931ea194f308d9f").decode('hex'))

def __init__(self, my_peer, endpoint, network, working_directory="./channels", tribler_session=None):
"""
Initialize the AllChannel2 Community.
:param my_peer: the Peer representing my peer
:param endpoint: the Endpoint object to use
:param network: the Network object to use
:param working_directory: the folder where all of the channels are stored
"""
super(AllChannel2Community, self).__init__(my_peer, endpoint, network)
self.working_directory = working_directory
self.channels = {}
self.my_channel_name = self.my_peer.mid
self.tribler_session = tribler_session

# Internals, do not touch!
self._my_channel_info_hash = None
self._my_channel_torrent = None
self.decode_map.update({chr(1): self.on_channel})

def load_channels(self):
"""
Load all known Channels from the working directory.
:returns: None
"""
channel_directories = [folder for folder in os.listdir(self.working_directory) if os.path.isdir(folder)]
for folder in channel_directories:
self.load_channel(folder)
if not self.my_channel_name in self.channels:
os.makedirs(os.path.abspath(os.path.join(self.working_directory, self.my_channel_name)))

def load_channel(self, channel):
"""
Load a single channel from the folder structure.
:param channel: the channel name
:returns: None
"""
real_path = os.path.abspath(os.path.join(self.working_directory, channel))
if os.path.isdir(real_path):
channel_instance = Channel(channel, self.working_directory, allow_edit=(channel==self.my_channel_name))
channel_instance.load()
self.channels[channel] = channel_instance

def _commit_my_channel(self):
"""
Commit the channel based on my_peer.
:returns: None
"""
my_channel = self.channels.get(self.my_channel_name,
Channel(self.my_channel_name, self.working_directory, True))
my_channel.commit()
self._my_channel_torrent, self._my_channel_info_hash = my_channel.make_torrent()
self.channels[self.my_channel_name] = my_channel

def _dirty_cache(self):
"""
Turn my channel dirty.
This invalidates the info hash and torrent file, which will have to be recreated.
:returns: None
"""
self._my_channel_info_hash = None
self._my_channel_torrent = None

@property
def my_channel_info_hash(self):
"""
The info hash representing my channel.
:return: (20 byte str) info hash of my channel
"""
if not self._my_channel_info_hash:
self._commit_my_channel()
return self._my_channel_info_hash

@property
def my_channel_magnet_link(self):
"""
The magnet link representing my channel.
:return: the (stripped) magnet link of my channel
"""
if not self._my_channel_info_hash:
self._commit_my_channel()
return infohash_to_magnet(self._my_channel_info_hash)

@property
def my_channel_torrent(self):
"""
The torrent file representing my channel.
:return: the filename of the torrent for my channel
"""
if not self._my_channel_torrent:
self._commit_my_channel()
return self._my_channel_torrent

def add_magnetlink(self, magnetlink):
"""
Add a magnet link to my channel.
:param magnetlink: the (20 byte str) magnet link to add
:returns: None
"""
if not self.my_channel_name in self.channels:
self._commit_my_channel()
self.channels[self.my_channel_name].add_magnetlink(magnetlink)
self._dirty_cache()

def remove_magnetlink(self, magnetlink):
"""
Remove a magnet link from my channel.
:param magnetlink: the (20 byte str) magnet link to add
:returns: None
"""
if not self.my_channel_name in self.channels:
self._commit_my_channel()
self.channels[self.my_channel_name].remove_magnetlink(magnetlink)
self._dirty_cache()

def get_channels(self):
"""
Get all known channels.
:return: the names of the channels we know about (including our own)
"""
return self.channels.keys()

def get_magnetlinks(self, channel):
"""
Get all the magnet links from a specific channel name.
:param channel: the channel name
:return: the magnet links belonging to that channel
"""
channel_instance = self.channels.get(channel, None)
return channel_instance.get_magnetlinks() if channel_instance else []

def create_channel_message(self):
"""
Create a channel message for my channel.
:return: the channel message
"""
global_time = self.claim_global_time()
payload = ChannelPayload(self.my_channel_info_hash).to_pack_list()
auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list()
dist = GlobalTimeDistributionPayload(global_time).to_pack_list()

return self._ez_pack(self._prefix, 1, [auth, dist, payload])

def download_finished(self, download):
"""
Callback for when a Channel download finished.
Load in the Channel data.
:param download: the LibtorrentDownloadImpl instance
:returns: None
"""
real_path = download.dlconfig.get_dest_dir()
rel_path = os.path.relpath(self.working_directory, real_path)
self.load_channel(rel_path)

def on_channel(self, source_address, data):
"""
Callback for when a ChannelPayload message is received.
"""
auth, _, payload = self._ez_unpack_auth(ChannelPayload, data)
channel = Peer(auth.public_key_bin).mid
# If we don't know about this channel, respond with our own
if channel not in self.channels:
packet = self.create_channel_message()
self.endpoint.send(source_address, packet)
# And start downloading it, if we are hooked up to a Tribler session
if self.tribler_session:
download_config = DefaultDownloadStartupConfig()
dest_dir = os.path.abspath(os.path.join(self.working_directory, channel))
download_config.set_dest_dir(dest_dir)
add_deferred = self.tribler_session.start_download_from_uri(infohash_to_magnet(payload.info_hash),
download_config)
add_deferred.addCallback(lambda download:
download.deferred_finished).addCallback(self.download_finished)

def on_introduction_response(self, source_address, data):
"""
Callback for when an introduction response is received.
We extend the functionality by sharing our channel with the other side.
"""
super(AllChannel2Community, self).on_introduction_response(source_address, data)

packet = self.create_channel_message()
self.endpoint.send(source_address, packet)
16 changes: 16 additions & 0 deletions Tribler/community/allchannel2/payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from Tribler.pyipv8.ipv8.deprecated.payload import Payload


class ChannelPayload(Payload):

format_list = ['20s']

def __init__(self, info_hash):
self.info_hash = info_hash

def to_pack_list(self):
return [('20s', self.info_hash)]

@classmethod
def from_unpack_list(cls, info_hash):
return cls(info_hash)

0 comments on commit 5ab0ac5

Please sign in to comment.