diff --git a/Tribler/community/tunnel/conversion.py b/Tribler/community/tunnel/conversion.py index 8537ce5ed34..34bb2674e94 100644 --- a/Tribler/community/tunnel/conversion.py +++ b/Tribler/community/tunnel/conversion.py @@ -341,7 +341,8 @@ def _decode_keys_request(self, placeholder, offset, data): return offset, placeholder.meta.payload.implement(identifier, info_hash) def _encode_keys_response(self, message): - return pack('!HH', message.payload.identifier, len(message.payload.public_key)) + message.payload.public_key, + return pack('!HH', message.payload.identifier, len(message.payload.public_key)) \ + + message.payload.public_key + message.payload.pex_peers, def _decode_keys_response(self, placeholder, offset, data): identifier, len_public_key = unpack_from('!HH', data, offset) @@ -350,7 +351,10 @@ def _decode_keys_response(self, placeholder, offset, data): public_key = data[offset: offset + len_public_key] offset += len_public_key - return offset, placeholder.meta.payload.implement(identifier, public_key) + pex_peers = data[offset:] + offset += len(pex_peers) + + return offset, placeholder.meta.payload.implement(identifier, public_key, pex_peers) def _encode_create_e2e(self, message): payload = message.payload diff --git a/Tribler/community/tunnel/hidden_community.py b/Tribler/community/tunnel/hidden_community.py index 389b747c816..128055411f6 100644 --- a/Tribler/community/tunnel/hidden_community.py +++ b/Tribler/community/tunnel/hidden_community.py @@ -8,7 +8,7 @@ from collections import defaultdict -from Tribler.Core.simpledefs import DLSTATUS_DOWNLOADING, DLSTATUS_SEEDING, DLSTATUS_STOPPED,\ +from Tribler.Core.simpledefs import DLSTATUS_SEEDING, DLSTATUS_STOPPED,\ NTFY_TUNNEL, NTFY_IP_REMOVED, NTFY_RP_REMOVED, NTFY_IP_RECREATE,\ NTFY_DHT_LOOKUP, NTFY_KEY_REQUEST, NTFY_KEY_RESPOND, NTFY_KEY_RESPONSE,\ NTFY_CREATE_E2E, NTFY_ONCREATED_E2E, NTFY_IP_CREATED @@ -61,7 +61,7 @@ def __init__(self, community, rp): def on_timeout(self): self.tunnel_logger.info("RPRequestCache: no response on establish-rendezvous (circuit %d)", - self.rp.circuit.circuit_id) + self.rp.circuit.circuit_id) self.community.remove_circuit(self.rp.circuit.circuit_id, 'establish-rendezvous timeout') @@ -73,9 +73,18 @@ def __init__(self, community, circuit, sock_addr, info_hash): self.circuit = circuit self.sock_addr = sock_addr self.info_hash = info_hash + self.community = community def on_timeout(self): - pass + self.tunnel_logger.info("KeyRequestCache: no response on key-request to %s", + self.sock_addr) + if self.info_hash in self.community.infohash_pex: + self.tunnel_logger.info("Remove peer %s from the peer exchange cache" % repr(self.sock_addr)) + peers = self.community.infohash_pex[self.info_hash] + for peer in peers.copy(): + peer_sock, _ = peer + if self.sock_addr == peer_sock: + self.community.infohash_pex[self.info_hash].remove(peer) class DHTRequestCache(RandomNumberCache): @@ -139,6 +148,7 @@ def __init__(self, *args, **kwargs): self.rendezvous_point_for = {} self.infohash_rp_circuits = defaultdict(list) self.infohash_ip_circuits = defaultdict(list) + self.infohash_pex = defaultdict(set) self.dht_blacklist = defaultdict(list) self.last_dht_lookup = {} @@ -241,14 +251,17 @@ def monitor_downloads(self, dslist): time_elapsed = (time.time() - self.last_dht_lookup.get(info_hash, 0)) force_dht_lookup = time_elapsed >= self.settings.dht_lookup_interval - if (state_changed or force_dht_lookup) and new_state == DLSTATUS_DOWNLOADING: - self.tunnel_logger.info('Do dht lookup for hidden services download %s' % info_hash.encode('hex')) + if (state_changed or force_dht_lookup): + self.tunnel_logger.info('Do dht lookup to find hidden services peers for %s' % info_hash.encode('hex')) self.do_dht_lookup(info_hash) - elif state_changed and new_state == DLSTATUS_SEEDING: + if state_changed and new_state == DLSTATUS_SEEDING: self.create_introduction_point(info_hash) elif state_changed and new_state in [DLSTATUS_STOPPED, None]: + if info_hash in self.infohash_pex: + self.infohash_pex.pop(info_hash) + for cid, info_hash_hops in self.my_download_points.items(): if info_hash_hops[0] == info_hash: self.remove_circuit(cid, 'download stopped', destroy=True) @@ -268,7 +281,7 @@ def do_dht_lookup(self, info_hash): self.tunnel_logger.info("Do DHT request: select circuit") circuit = self.selection_strategy.select(None, self.hops[info_hash]) if not circuit: - self.tunnel_logger.error("No circuit for dht-request") + self.tunnel_logger.info("No circuit for dht-request") return False # Send a dht-request message over this circuit @@ -299,7 +312,7 @@ def dht_callback(info_hash, peers, _): circuit.tunnel_data(message.candidate.sock_addr, TUNNEL_PREFIX + dht_response_message.packet) else: self.tunnel_logger.info("Circuit %d is not existing anymore, can't send back dht-response" % - circuit_id) + circuit_id) self.tunnel_logger.info("Doing dht hidden seeders lookup for info_hash %s" % info_hash.encode('HEX')) self.dht_lookup(info_hash, dht_callback) @@ -320,6 +333,8 @@ def on_dht_response(self, messages): info_hash = message.payload.info_hash _, peers = decode(message.payload.peers) + peers = set(peers) + self.tunnel_logger.info("Received dht response containing %d peers" % len(peers)) blacklist = self.dht_blacklist[info_hash] @@ -333,7 +348,7 @@ def on_dht_response(self, messages): exclude = [rp[2] for rp in self.my_download_points.values()] + [sock_addr for _, sock_addr in blacklist] for peer in peers: if peer not in exclude: - self.tunnel_logger.info("Requesting key from %s", peer) + self.tunnel_logger.info("Requesting key from dht peer %s", peer) # Blacklist this sock_addr for a period of at least 60s self.dht_blacklist[info_hash].append((time.time(), peer)) self.create_key_request(info_hash, peer) @@ -392,8 +407,10 @@ def on_key_request(self, messages): self.notifier.notify(NTFY_TUNNEL, NTFY_KEY_RESPOND, info_hash.encode('hex')[:6], circuit.circuit_id) self.tunnel_logger.info("On key request: respond with keys to %s" % repr(message.candidate.sock_addr)) meta = self.get_meta_message(u'key-response') + pex_peers = self.infohash_pex.get(info_hash, set()) response = meta.impl(distribution=(self.global_time,), payload=( - message.payload.identifier, key.pub().key_to_bin())) + message.payload.identifier, key.pub().key_to_bin(), + encode(list(pex_peers)[:50]))) circuit.tunnel_data(message.candidate.sock_addr, TUNNEL_PREFIX + response.packet) def check_key_response(self, messages): @@ -412,17 +429,35 @@ def on_key_response(self, messages): self.tunnel_logger.info('On key response: forward message because received over socket') meta = self.get_meta_message(u'key-response') relay_message = meta.impl(distribution=(self.global_time,), - payload=(cache.identifier, message.payload.public_key)) + payload=(cache.identifier, message.payload.public_key, + message.payload.pex_peers)) self.send_packet([Candidate(cache.return_sock_addr, False)], u"key-response", TUNNEL_PREFIX + relay_message.packet) else: + # pop key-request cache and notify gui self.tunnel_logger.info("On key response: received keys") cache = self.request_cache.pop(u"key-request", message.payload.identifier) + _, pex_peers = decode(message.payload.pex_peers) if self.notifier: self.notifier.notify(NTFY_TUNNEL, NTFY_KEY_RESPONSE, cache.info_hash.encode('hex')[:6], cache.circuit.circuit_id) - self.create_e2e(cache.circuit, cache.sock_addr, cache.info_hash, message.payload.public_key) + + # Cache this peer and key for pex via key-response + self.tunnel_logger.info("Added key to peer exchange cache") + self.infohash_pex[cache.info_hash].add((cache.sock_addr, message.payload.public_key)) + + # Add received pex_peers to own list of known peers for this infohash + for pex_peer in pex_peers: + pex_peer_sock, pex_peer_key = pex_peer + self.infohash_pex[cache.info_hash].add((pex_peer_sock, pex_peer_key)) + + # Initate end-to-end circuits for all known peers in the pex list + for peer in self.infohash_pex[cache.info_hash]: + peer_sock, peer_key = peer + if cache.info_hash not in self.infohash_ip_circuits: + self.tunnel_logger.info("Create end-to-end on pex_peer %s" % repr(peer_sock)) + self.create_e2e(cache.circuit, peer_sock, cache.info_hash, peer_key) def create_e2e(self, circuit, sock_addr, info_hash, public_key): hop = Hop(self.crypto.key_from_public_bin(public_key)) diff --git a/Tribler/community/tunnel/payload.py b/Tribler/community/tunnel/payload.py index ec0b9b43a67..54b2be5a5cc 100644 --- a/Tribler/community/tunnel/payload.py +++ b/Tribler/community/tunnel/payload.py @@ -405,13 +405,15 @@ class KeyResponsePayload(Payload): class Implementation(Payload.Implementation): - def __init__(self, meta, identifier, public_key): + def __init__(self, meta, identifier, public_key, pex_peers): assert isinstance(identifier, int), type(identifier) assert isinstance(public_key, basestring), type(public_key) + assert all(isinstance(pex_peer, basestring) for pex_peer in pex_peers) super(KeyResponsePayload.Implementation, self).__init__(meta) self._identifier = identifier self._public_key = public_key + self._pex_peers = pex_peers @property def identifier(self): @@ -421,6 +423,10 @@ def identifier(self): def public_key(self): return self._public_key + @property + def pex_peers(self): + return self._pex_peers + class CreateE2EPayload(Payload): diff --git a/Tribler/community/tunnel/tunnel_community.py b/Tribler/community/tunnel/tunnel_community.py index 47562a307bb..f921194de39 100644 --- a/Tribler/community/tunnel/tunnel_community.py +++ b/Tribler/community/tunnel/tunnel_community.py @@ -308,20 +308,20 @@ def become_exitnode(self): @classmethod def get_master_members(cls, dispersy): - # generated: Tue Jun 30 23:10:04 2015 - #curve: None - #len: 571 bits ~ 144 bytes signature - #pub: 170 3081a7301006072a8648ce3d020106052b8104002703819200040371e6e5ce138960fe5aa3b72f93684761865b9badf49214ecd1100d5bf3b938af388ec72a9e817741167773815cce75a928224820058e3709fbe6318b0b25decce90caf1f9aa640063f7e52549d4cfe4237184cac3821555131bef404396af51b83e2c119683d4569fb9e4e44daf416329edc425923fcdf2390b7459eebb89635f9f0e6ed76f61f753217b87ab7de23 - #prv: 241 3081ee020101044801c256b39ae23dc3c6ad56fb1f3fcc59176791ade7d23c456706e94c7c83b90f729120acdd24266d0aeb1feee6a4c95806b3385251c4daef2cd29e8b3a63d4cfae4ba719f0cfb211a00706052b81040027a1819503819200040371e6e5ce138960fe5aa3b72f93684761865b9badf49214ecd1100d5bf3b938af388ec72a9e817741167773815cce75a928224820058e3709fbe6318b0b25decce90caf1f9aa640063f7e52549d4cfe4237184cac3821555131bef404396af51b83e2c119683d4569fb9e4e44daf416329edc425923fcdf2390b7459eebb89635f9f0e6ed76f61f753217b87ab7de23 - #pub-sha1 9ff635a1cfc3d158d26443b2a297d5ae4af6ca1f - #prv-sha1 f7afffb514cdbae8cc4c441dd55e4a2d8d9f0894 - #-----BEGIN PUBLIC KEY----- - #MIGnMBAGByqGSM49AgEGBSuBBAAnA4GSAAQDceblzhOJYP5ao7cvk2hHYYZbm630 - #khTs0RANW/O5OK84jscqnoF3QRZ3c4FcznWpKCJIIAWONwn75jGLCyXezOkMrx+a - #pkAGP35SVJ1M/kI3GEysOCFVUTG+9AQ5avUbg+LBGWg9RWn7nk5E2vQWMp7cQlkj - #/N8jkLdFnuu4ljX58ObtdvYfdTIXuHq33iM= - #-----END PUBLIC KEY----- - master_key = "3081a7301006072a8648ce3d020106052b8104002703819200040371e6e5ce138960fe5aa3b72f93684761865b9badf49214ecd1100d5bf3b938af388ec72a9e817741167773815cce75a928224820058e3709fbe6318b0b25decce90caf1f9aa640063f7e52549d4cfe4237184cac3821555131bef404396af51b83e2c119683d4569fb9e4e44daf416329edc425923fcdf2390b7459eebb89635f9f0e6ed76f61f753217b87ab7de23".decode("HEX") + # generated: Sat Jul 4 15:31:17 2015 + # curve: None + # len: 571 bits ~ 144 bytes signature + # pub: 170 3081a7301006072a8648ce3d020106052b81040027038192000407b5eb7d25859eee6495ca0e1c14e0bb3c38cc4de5c1bb7d198a3cb6e075833ce59ac0464450e8405a6d9f7cce9824b970eb28259b8bd1f391d44c8dc7c8929014764cdbd5bd20300719a696f39dacd4e8c2dd8240e884a6eeff1734b9e175d28d805461f8a94f5fbc84b5ce1d0f322587522d1a79636301bb0efc1052239837beb88f3aa49b1b19b785296421ece659 + # prv: 241 3081ee0201010448023682a3d2a23725d6c8effc8bccc46ecdd5b613621f536d9b41b6f35a1a4bfca1d8175ba6a2171a05aa6d72b6c51499ea602ab33047e4e93715496aa3f2e3526fd48fb75fb32cb4a00706052b81040027a18195038192000407b5eb7d25859eee6495ca0e1c14e0bb3c38cc4de5c1bb7d198a3cb6e075833ce59ac0464450e8405a6d9f7cce9824b970eb28259b8bd1f391d44c8dc7c8929014764cdbd5bd20300719a696f39dacd4e8c2dd8240e884a6eeff1734b9e175d28d805461f8a94f5fbc84b5ce1d0f322587522d1a79636301bb0efc1052239837beb88f3aa49b1b19b785296421ece659 + # pub-sha1 2b62d6f75a72307897f512ecdd38ec5532c3ebc2 + # prv-sha1 eaf086a8eefac71151337b26f8b04fedaa6650d0 + # -----BEGIN PUBLIC KEY----- + # MIGnMBAGByqGSM49AgEGBSuBBAAnA4GSAAQHtet9JYWe7mSVyg4cFOC7PDjMTeXB + # u30Zijy24HWDPOWawEZEUOhAWm2ffM6YJLlw6yglm4vR85HUTI3HyJKQFHZM29W9 + # IDAHGaaW852s1OjC3YJA6ISm7v8XNLnhddKNgFRh+KlPX7yEtc4dDzIlh1ItGnlj + # YwG7DvwQUiOYN764jzqkmxsZt4UpZCHs5lk= + # -----END PUBLIC KEY----- + master_key = "3081a7301006072a8648ce3d020106052b81040027038192000407b5eb7d25859eee6495ca0e1c14e0bb3c38cc4de5c1bb7d198a3cb6e075833ce59ac0464450e8405a6d9f7cce9824b970eb28259b8bd1f391d44c8dc7c8929014764cdbd5bd20300719a696f39dacd4e8c2dd8240e884a6eeff1734b9e175d28d805461f8a94f5fbc84b5ce1d0f322587522d1a79636301bb0efc1052239837beb88f3aa49b1b19b785296421ece659".decode("HEX") master = dispersy.get_member(public_key=master_key) return [master]