Skip to content

Commit

Permalink
Added torrent info request/response
Browse files Browse the repository at this point in the history
  • Loading branch information
xoriole committed Jun 11, 2018
1 parent 97f3ba7 commit 75c03cb
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 19 deletions.
71 changes: 67 additions & 4 deletions Tribler/community/popular/community.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import LoopingCall

from Tribler.community.popular.payload import TorrentHealthPayload, ContentSubscription
from Tribler.community.popular.payload import TorrentHealthPayload, ContentSubscription, TorrentInfoRequestPayload, \
TorrentInfoResponsePayload
from Tribler.community.popular.repository import ContentRepository, TYPE_TORRENT_HEALTH
from Tribler.pyipv8.ipv8.deprecated.community import Community
from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload
Expand All @@ -12,7 +13,8 @@
MSG_POPULAR_CONTENT_SUBSCRIPTION = 2
MSG_TORRENT_HEALTH_RESPONSE = 3
MSG_CHANNEL_HEALTH_RESPONSE = 4
MSG_TORRENT_HEALTH_UPDATE = 5
MSG_TORRENT_INFO_REQUEST = 5
MSG_TORRENT_INFO_RESPONSE = 6

MAX_SUBSCRIBERS = 10
MAX_PUBLISHERS = 10
Expand Down Expand Up @@ -43,7 +45,9 @@ def __init__(self, *args, **kwargs):
chr(MSG_POPULAR_CONTENT_SUBSCRIBE): self.on_popular_content_subscribe,
chr(MSG_POPULAR_CONTENT_SUBSCRIPTION): self.on_popular_content_subscription,
chr(MSG_TORRENT_HEALTH_RESPONSE): self.on_torrent_health_response,
chr(MSG_CHANNEL_HEALTH_RESPONSE): self.on_channel_health_response
chr(MSG_CHANNEL_HEALTH_RESPONSE): self.on_channel_health_response,
chr(MSG_TORRENT_INFO_REQUEST): self.on_torrent_info_request,
chr(MSG_TORRENT_INFO_RESPONSE): self.on_torrent_info_response
})

# A set of publisher and subscriber.
Expand Down Expand Up @@ -181,14 +185,46 @@ def on_torrent_health_response(self, source_address, data):
self._logger.error(ERROR_UNKNOWN_RESPONSE)
return

infohash = payload.infohash
if not self.content_repository.has_torrent(infohash):
self.send_torrent_info_request(infohash, peer=peer)

peer_trust = self.trustchain.get_trust(peer) if self.trustchain else 0
self.content_repository.update_torrent(payload, peer_trust)
self.content_repository.update_torrent_health(payload, peer_trust)

def on_channel_health_response(self, source_address, data):
"""
Message handler for channel health response. Currently, not sure how to handle it.
"""

def on_torrent_info_request(self, source_address, data):
"""
Message handler for torrent info request.
"""
self._logger.info("Got torrent info request from %s", source_address)
auth, _, payload = self._ez_unpack_auth(TorrentInfoRequestPayload, data)
peer = self.get_peer_from_auth(auth, source_address)

if peer not in self.publishers:
self._logger.error(ERROR_UNKNOWN_RESPONSE)
return

self.send_torrent_info_response(payload.infohash, peer=peer)

def on_torrent_info_response(self, source_address, data):
"""
Message handler for torrent info response.
"""
self._logger.info("Got torrent info response from %s", source_address)
auth, _, payload = self._ez_unpack_auth(TorrentHealthPayload, data)
peer = self.get_peer_from_auth(auth, source_address)

if peer not in self.publishers:
self._logger.error(ERROR_UNKNOWN_RESPONSE)
return

self.content_repository.update_torrent_info(payload)

# MESSAGE SENDING FUNCTIONS

def send_popular_content_subscribe(self, peer, subscribe=True):
Expand Down Expand Up @@ -248,6 +284,33 @@ def send_channel_health_response(self, payload, peer=None):
packet = self.create_message_packet(MSG_CHANNEL_HEALTH_RESPONSE, payload)
self.broadcast_message(packet, peer=peer)

def send_torrent_info_request(self, infohash, peer):
"""
Method to request information about a torrent with given infohash to a peer.
"""
if peer not in self.get_peers():
self._logger.error(ERROR_UNKNOWN_PEER)
return

info_request = TorrentInfoRequestPayload(infohash)
packet = self.create_message_packet(MSG_TORRENT_INFO_REQUEST, info_request)
self.broadcast_message(packet, peer=peer)

def send_torrent_info_response(self, infohash, peer):
"""
Method to send information about a torrent with given infohash to the requesting peer.
"""
if peer not in self.get_peers():
self._logger.error(ERROR_UNKNOWN_PEER)
return

db_torrent = self.content_repository.get_torrent(infohash)
info_response = TorrentInfoResponsePayload(infohash, db_torrent['name'], db_torrent['length'],
db_torrent['creation_date'], db_torrent['num_files'],
db_torrent['comment'])
packet = self.create_message_packet(MSG_TORRENT_INFO_RESPONSE, info_response)
self.broadcast_message(packet, peer=peer)

# HELPER METHODS

def create_message_packet(self, message_type, payload):
Expand Down
62 changes: 60 additions & 2 deletions Tribler/community/popular/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def from_unpack_list(cls, *args):

class TorrentHealthPayload(Payload):

format_list = ['varlenI', 'I', 'I', 'f']
format_list = ['20s', 'I', 'I', 'f']

def __init__(self, infohash, num_seeders, num_leechers, timestamp):
super(TorrentHealthPayload, self).__init__()
Expand All @@ -32,7 +32,7 @@ def __init__(self, infohash, num_seeders, num_leechers, timestamp):
self._timestamp = timestamp

def to_pack_list(self):
data = [('varlenI', str(self.infohash)),
data = [('20s', str(self.infohash)),
('I', self.num_seeders),
('I', self.num_leechers),
('f', self.timestamp)]
Expand Down Expand Up @@ -89,3 +89,61 @@ def to_pack_list(self):
def from_unpack_list(cls, *args):
(channel_id, num_votes, num_torrents, swarm_size_sum, timestamp) = args
return ChannelHealthPayload(channel_id, num_votes, num_torrents, swarm_size_sum, timestamp)


class TorrentInfoRequestPayload(Payload):
"""
Payload for requesting torrent info for a given infohash.
"""
format_list = ['20s']

def __init__(self, infohash):
super(TorrentInfoRequestPayload, self).__init__()
self._infohash = infohash

def to_pack_list(self):
data = [('20s', str(self.infohash))]
return data

@classmethod
def from_unpack_list(cls, *args):
(infohash) = args
return TorrentInfoRequestPayload(infohash)

@property
def infohash(self):
return self._infohash


class TorrentInfoResponsePayload(Payload):
"""
Payload for torrent info response.
"""
format_list = ['20s', 'varlenH', 'I', 'I', 'I', 'varlenH']

def __init__(self, infohash, name, length, creation_date, num_files, comment):
super(TorrentInfoResponsePayload, self).__init__()
self._infohash = infohash
self._name = name
self._length = length
self._creation_date = creation_date
self._num_files = num_files
self._comment = comment

def to_pack_list(self):
data = [('20s', str(self.infohash)),
('varlenH', str(self._name)),
('I', self._length),
('I', self._creation_date),
('I', self._num_files),
('varlenH', self._comment)]
return data

@classmethod
def from_unpack_list(cls, *args):
(infohash, name, length, creation_date, num_files, comment) = args
return TorrentInfoResponsePayload(infohash, name, length, creation_date, num_files, comment)

@property
def infohash(self):
return self._infohash
52 changes: 39 additions & 13 deletions Tribler/community/popular/repository.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import logging
import time
from binascii import unhexlify

from Tribler.Core.CacheDB.sqlitecachedb import str2bin, bin2str
from collections import deque

from Tribler.community.popular.payload import TorrentHealthPayload
Expand Down Expand Up @@ -38,25 +41,48 @@ def pop_content(self):
def get_top_torrents(self, limit=DEFAULT_TORRENT_LIMIT):
return self.torrent_db.getRecentlyCheckedTorrents(limit)

def update_torrent(self, torrent_health_payload, peer_trust=0):
def update_torrent_health(self, torrent_health_payload, peer_trust=0):
assert torrent_health_payload and isinstance(torrent_health_payload, TorrentHealthPayload)

if not self.torrent_db:
self._logger.error("Torrent DB is None")
return
assert torrent_health_payload and isinstance(torrent_health_payload, TorrentHealthPayload)

infohash = torrent_health_payload.infohash

if self.torrent_db.hasTorrent(infohash):
if self.has_torrent(infohash):
db_torrent = self.get_torrent(infohash)
is_stale = time.time() - db_torrent['last_tracker_check'] > DEFAULT_FRESHNESS_LIMIT
if is_stale or peer_trust > 1:
self.torrent_db.updateTorrent(self, infohash, notify=False,
seeder=torrent_health_payload.num_seeders,
leecher=torrent_health_payload.num_leechers,
last_torrent_check=int(torrent_health_payload.timestamp))
else:
self._logger.error("Torrent[%s] is unknown", infohash.encode('hex'))
is_fresh = time.time() - db_torrent['last_tracker_check'] < DEFAULT_FRESHNESS_LIMIT
if is_fresh or peer_trust < 2:
self._logger.info("Database record is either fresh or peer trust score is too low. Ignoring response.")
return

# Update the torrent health anyway. A torrent info request should be sent separately to request additional info.
self.update_torrent_in_db(self, infohash, seeder=torrent_health_payload.num_seeders,
leecher=torrent_health_payload.num_leechers,
last_torrent_check=int(torrent_health_payload.timestamp))

def update_torrent_info(self, torrent_info_response):
infohash = torrent_info_response.infohash
if self.has_torrent(infohash):
db_torrent = self.get_torrent(infohash)
if db_torrent['name'] and db_torrent['name'] == torrent_info_response.name:
self._logger.info("Conflicting names for torrent. Ignoring the response")
return

# Update local database
self.update_torrent_in_db(self, infohash, name=torrent_info_response.name,
length=torrent_info_response.length,
creation_date=torrent_info_response.creation_date,
num_files=torrent_info_response.num_files,
comment=torrent_info_response.comment)

def get_torrent(self, infohash):
keys = ('num_seeders', 'num_leechers', 'last_tracker_check')
keys = ('name', 'length', 'creation_date', 'num_files', 'num_seeders', 'num_leechers', 'comment',
'last_tracker_check')
return self.torrent_db.getTorrent(infohash, keys=keys, include_mypref=False)

def has_torrent(self, infohash):
return self.torrent_db.hasTorrent(infohash)

def update_torrent_in_db(self, infohash, **kw):
self.torrent_db.updateTorrent(infohash, notify=False, **kw)

0 comments on commit 75c03cb

Please sign in to comment.