-
Notifications
You must be signed in to change notification settings - Fork 452
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Peer exchange via key-response of seeder
- Loading branch information
Showing
4 changed files
with
74 additions
and
29 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,7 @@ | |
|
||
from collections import defaultdict | ||
|
||
from Tribler.Core.simpledefs import DLSTATUS_DOWNLOADING, DLSTATUS_SEEDING, DLSTATUS_STOPPED,\ | ||
from Tribler.Core.simpledefs import DLSTATUS_SEEDING, DLSTATUS_STOPPED,\ | ||
NTFY_TUNNEL, NTFY_IP_REMOVED, NTFY_RP_REMOVED, NTFY_IP_RECREATE,\ | ||
NTFY_DHT_LOOKUP, NTFY_KEY_REQUEST, NTFY_KEY_RESPOND, NTFY_KEY_RESPONSE,\ | ||
NTFY_CREATE_E2E, NTFY_ONCREATED_E2E, NTFY_IP_CREATED | ||
|
@@ -61,7 +61,7 @@ def __init__(self, community, rp): | |
|
||
def on_timeout(self): | ||
self.tunnel_logger.info("RPRequestCache: no response on establish-rendezvous (circuit %d)", | ||
self.rp.circuit.circuit_id) | ||
self.rp.circuit.circuit_id) | ||
self.community.remove_circuit(self.rp.circuit.circuit_id, 'establish-rendezvous timeout') | ||
|
||
|
||
|
@@ -73,9 +73,18 @@ def __init__(self, community, circuit, sock_addr, info_hash): | |
self.circuit = circuit | ||
self.sock_addr = sock_addr | ||
self.info_hash = info_hash | ||
self.community = community | ||
|
||
def on_timeout(self): | ||
pass | ||
self.tunnel_logger.info("KeyRequestCache: no response on key-request to %s", | ||
self.sock_addr) | ||
if self.info_hash in self.community.infohash_pex: | ||
self.tunnel_logger.info("Remove peer %s from the peer exchange cache" % repr(self.sock_addr)) | ||
peers = self.community.infohash_pex[self.info_hash] | ||
for peer in peers.copy(): | ||
peer_sock, _ = peer | ||
if self.sock_addr == peer_sock: | ||
self.community.infohash_pex[self.info_hash].remove(peer) | ||
|
||
|
||
class DHTRequestCache(RandomNumberCache): | ||
|
@@ -139,6 +148,7 @@ def __init__(self, *args, **kwargs): | |
self.rendezvous_point_for = {} | ||
self.infohash_rp_circuits = defaultdict(list) | ||
self.infohash_ip_circuits = defaultdict(list) | ||
self.infohash_pex = defaultdict(set) | ||
|
||
self.dht_blacklist = defaultdict(list) | ||
self.last_dht_lookup = {} | ||
|
@@ -241,14 +251,17 @@ def monitor_downloads(self, dslist): | |
|
||
time_elapsed = (time.time() - self.last_dht_lookup.get(info_hash, 0)) | ||
force_dht_lookup = time_elapsed >= self.settings.dht_lookup_interval | ||
if (state_changed or force_dht_lookup) and new_state == DLSTATUS_DOWNLOADING: | ||
self.tunnel_logger.info('Do dht lookup for hidden services download %s' % info_hash.encode('hex')) | ||
if (state_changed or force_dht_lookup): | ||
self.tunnel_logger.info('Do dht lookup to find hidden services peers for %s' % info_hash.encode('hex')) | ||
self.do_dht_lookup(info_hash) | ||
|
||
elif state_changed and new_state == DLSTATUS_SEEDING: | ||
if state_changed and new_state == DLSTATUS_SEEDING: | ||
self.create_introduction_point(info_hash) | ||
|
||
elif state_changed and new_state in [DLSTATUS_STOPPED, None]: | ||
if info_hash in self.infohash_pex: | ||
self.infohash_pex.pop(info_hash) | ||
|
||
for cid, info_hash_hops in self.my_download_points.items(): | ||
if info_hash_hops[0] == info_hash: | ||
self.remove_circuit(cid, 'download stopped', destroy=True) | ||
|
@@ -268,7 +281,7 @@ def do_dht_lookup(self, info_hash): | |
self.tunnel_logger.info("Do DHT request: select circuit") | ||
circuit = self.selection_strategy.select(None, self.hops[info_hash]) | ||
if not circuit: | ||
self.tunnel_logger.error("No circuit for dht-request") | ||
self.tunnel_logger.info("No circuit for dht-request") | ||
return False | ||
|
||
# Send a dht-request message over this circuit | ||
|
@@ -299,7 +312,7 @@ def dht_callback(info_hash, peers, _): | |
circuit.tunnel_data(message.candidate.sock_addr, TUNNEL_PREFIX + dht_response_message.packet) | ||
else: | ||
self.tunnel_logger.info("Circuit %d is not existing anymore, can't send back dht-response" % | ||
circuit_id) | ||
circuit_id) | ||
|
||
self.tunnel_logger.info("Doing dht hidden seeders lookup for info_hash %s" % info_hash.encode('HEX')) | ||
self.dht_lookup(info_hash, dht_callback) | ||
|
@@ -320,6 +333,8 @@ def on_dht_response(self, messages): | |
|
||
info_hash = message.payload.info_hash | ||
_, peers = decode(message.payload.peers) | ||
peers = set(peers) | ||
self.tunnel_logger.info("Received dht response containing %d peers" % len(peers)) | ||
|
||
blacklist = self.dht_blacklist[info_hash] | ||
|
||
|
@@ -333,7 +348,7 @@ def on_dht_response(self, messages): | |
exclude = [rp[2] for rp in self.my_download_points.values()] + [sock_addr for _, sock_addr in blacklist] | ||
for peer in peers: | ||
if peer not in exclude: | ||
self.tunnel_logger.info("Requesting key from %s", peer) | ||
self.tunnel_logger.info("Requesting key from dht peer %s", peer) | ||
# Blacklist this sock_addr for a period of at least 60s | ||
self.dht_blacklist[info_hash].append((time.time(), peer)) | ||
self.create_key_request(info_hash, peer) | ||
|
@@ -392,8 +407,10 @@ def on_key_request(self, messages): | |
self.notifier.notify(NTFY_TUNNEL, NTFY_KEY_RESPOND, info_hash.encode('hex')[:6], circuit.circuit_id) | ||
self.tunnel_logger.info("On key request: respond with keys to %s" % repr(message.candidate.sock_addr)) | ||
meta = self.get_meta_message(u'key-response') | ||
pex_peers = self.infohash_pex.get(info_hash, set()) | ||
response = meta.impl(distribution=(self.global_time,), payload=( | ||
message.payload.identifier, key.pub().key_to_bin())) | ||
message.payload.identifier, key.pub().key_to_bin(), | ||
encode(list(pex_peers)[:50]))) | ||
This comment has been minimized.
Sorry, something went wrong. |
||
circuit.tunnel_data(message.candidate.sock_addr, TUNNEL_PREFIX + response.packet) | ||
|
||
def check_key_response(self, messages): | ||
|
@@ -412,17 +429,35 @@ def on_key_response(self, messages): | |
self.tunnel_logger.info('On key response: forward message because received over socket') | ||
meta = self.get_meta_message(u'key-response') | ||
relay_message = meta.impl(distribution=(self.global_time,), | ||
payload=(cache.identifier, message.payload.public_key)) | ||
payload=(cache.identifier, message.payload.public_key, | ||
message.payload.pex_peers)) | ||
self.send_packet([Candidate(cache.return_sock_addr, False)], | ||
u"key-response", | ||
TUNNEL_PREFIX + relay_message.packet) | ||
else: | ||
# pop key-request cache and notify gui | ||
self.tunnel_logger.info("On key response: received keys") | ||
cache = self.request_cache.pop(u"key-request", message.payload.identifier) | ||
_, pex_peers = decode(message.payload.pex_peers) | ||
if self.notifier: | ||
self.notifier.notify(NTFY_TUNNEL, NTFY_KEY_RESPONSE, cache.info_hash.encode('hex')[:6], | ||
cache.circuit.circuit_id) | ||
self.create_e2e(cache.circuit, cache.sock_addr, cache.info_hash, message.payload.public_key) | ||
|
||
# Cache this peer and key for pex via key-response | ||
self.tunnel_logger.info("Added key to peer exchange cache") | ||
self.infohash_pex[cache.info_hash].add((cache.sock_addr, message.payload.public_key)) | ||
|
||
# Add received pex_peers to own list of known peers for this infohash | ||
for pex_peer in pex_peers: | ||
pex_peer_sock, pex_peer_key = pex_peer | ||
self.infohash_pex[cache.info_hash].add((pex_peer_sock, pex_peer_key)) | ||
|
||
# Initate end-to-end circuits for all known peers in the pex list | ||
for peer in self.infohash_pex[cache.info_hash]: | ||
peer_sock, peer_key = peer | ||
if cache.info_hash not in self.infohash_ip_circuits: | ||
self.tunnel_logger.info("Create end-to-end on pex_peer %s" % repr(peer_sock)) | ||
self.create_e2e(cache.circuit, peer_sock, cache.info_hash, peer_key) | ||
This comment has been minimized.
Sorry, something went wrong.
synctext
Member
|
||
|
||
def create_e2e(self, circuit, sock_addr, info_hash, public_key): | ||
hop = Hop(self.crypto.key_from_public_bin(public_key)) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
OK, just for the record..
PEX is now included in the key_response message.
Instead of a single hidden-seeder-intro-point, a list of 50 of them is returned.
This may be a whopping long list, as it included 50 x (pub_key + IP:Port).