diff --git a/src/tribler-core/tribler_core/modules/dht_health_manager.py b/src/tribler-core/tribler_core/modules/dht_health_manager.py index 09085d763af..5dbbfbc53be 100644 --- a/src/tribler-core/tribler_core/modules/dht_health_manager.py +++ b/src/tribler-core/tribler_core/modules/dht_health_manager.py @@ -22,6 +22,7 @@ def __init__(self, lt_session): self.lookup_futures = {} # Map from binary infohash to future self.bf_seeders = {} # Map from infohash to (final) seeders bloomfilter self.bf_peers = {} # Map from infohash to (final) peers bloomfilter + self.outstanding = {} # Map from transaction_id to infohash self.lt_session = lt_session def get_health(self, infohash, timeout=15): @@ -51,6 +52,9 @@ def finalize_lookup(self, infohash): Finalize the lookup of the provided infohash and invoke the appropriate deferred. :param infohash: The infohash of the lookup we finialize. """ + for transaction_id in [key for key, value in self.outstanding.items() if value == infohash]: + self.outstanding.pop(transaction_id, None) + if infohash not in self.lookup_futures: return @@ -113,29 +117,30 @@ def tobits(s): c = min(m - 1, total_zeros) return int(math.log(c / float(m)) / (2 * math.log(1 - 1 / float(m)))) - def received_bloomfilters(self, node_id, bf_seeds=bytearray(256), bf_peers=bytearray(256)): + def requesting_bloomfilters(self, transaction_id, infohash): + """ + Tne libtorrent DHT has sent a get_peers query for an infohash we may be interested in. + If so, keep track of the transaction and node IDs. + :param transaction_id: The ID of the query + :param infohash: The infohash for which the query was sent. + """ + if infohash in self.lookup_futures: + self.outstanding[transaction_id] = infohash + elif transaction_id in self.outstanding: + # Libtorrent is reusing the transaction_id, and is now using it for a infohash that we're not interested in. + self.outstanding.pop(transaction_id, None) + + def received_bloomfilters(self, transaction_id, bf_seeds=bytearray(256), bf_peers=bytearray(256)): """ We have received bloom filters from the libtorrent DHT. Register the bloom filters and process them. - :param node_id: The ID of the node that sent the bloom filter. + :param transaction_id: The ID of the query for which we are receiving the bloom filter. :param bf_seeds: The bloom filter indicating the IP addresses of the seeders. :param bf_peers: The bloom filter indicating the IP addresses of the peers (leechers). """ - min_distance = -1 - closest_infohash = None - - # We do not know to which infohash the received get_peers response belongs so we have to manually go through - # the infohashes and find the infohash that is the closest to the node id that sent us the message. - for infohash in self.lookup_futures: - ih_distance = distance(infohash, node_id) - if ih_distance < min_distance or min_distance == -1: - min_distance = ih_distance - closest_infohash = infohash - - if not closest_infohash: + infohash = self.outstanding.get(transaction_id) + if not infohash: self._logger.info("Could not find lookup infohash for incoming BEP33 bloomfilters") return - self.bf_seeders[closest_infohash] = DHTHealthManager.combine_bloomfilters( - self.bf_seeders[closest_infohash], bf_seeds) - self.bf_peers[closest_infohash] = DHTHealthManager.combine_bloomfilters( - self.bf_peers[closest_infohash], bf_peers) + self.bf_seeders[infohash] = DHTHealthManager.combine_bloomfilters(self.bf_seeders[infohash], bf_seeds) + self.bf_peers[infohash] = DHTHealthManager.combine_bloomfilters(self.bf_peers[infohash], bf_peers) diff --git a/src/tribler-core/tribler_core/modules/libtorrent/download_manager.py b/src/tribler-core/tribler_core/modules/libtorrent/download_manager.py index 7a1fb7979b5..9a380a018b8 100644 --- a/src/tribler-core/tribler_core/modules/libtorrent/download_manager.py +++ b/src/tribler-core/tribler_core/modules/libtorrent/download_manager.py @@ -383,13 +383,24 @@ def process_alert(self, alert, hops=0): self.session_stats_callback(alert) elif alert_type == "dht_pkt_alert": - # We received a raw DHT message - decode it and check whether it is a BEP33 message. + # Unfortunately, the Python bindings don't have a direction attribute. + # So, we'll have to resort to using the string representation of the alert instead. + incoming = str(alert).startswith('<==') decoded = bdecode_compat(alert.pkt_buf) - if decoded and b'r' in decoded: - if b'BFsd' in decoded[b'r'] and b'BFpe' in decoded[b'r']: - self.dht_health_manager.received_bloomfilters(decoded[b'r'][b'id'], - bytearray(decoded[b'r'][b'BFsd']), - bytearray(decoded[b'r'][b'BFpe'])) + if not decoded: + return + + # We are sending a raw DHT message - notify the DHTHealthManager of the outstanding request. + if not incoming and decoded.get(b'y') == b'q' \ + and decoded.get(b'q') == b'get_peers' and decoded[b'a'].get(b'scrape') == 1: + self.dht_health_manager.requesting_bloomfilters(decoded[b't'], + decoded[b'a'][b'info_hash']) + + # We received a raw DHT message - decode it and check whether it is a BEP33 message. + if incoming and b'r' in decoded and b'BFsd' in decoded[b'r'] and b'BFpe' in decoded[b'r']: + self.dht_health_manager.received_bloomfilters(decoded[b't'], + bytearray(decoded[b'r'][b'BFsd']), + bytearray(decoded[b'r'][b'BFpe'])) def update_ip_filter(self, lt_session, ip_addresses): self._logger.debug('Updating IP filter %s', ip_addresses) diff --git a/src/tribler-core/tribler_core/modules/tests/test_dht_health_manager.py b/src/tribler-core/tribler_core/modules/tests/test_dht_health_manager.py index d6e923d3e0d..ce98a3453d8 100644 --- a/src/tribler-core/tribler_core/modules/tests/test_dht_health_manager.py +++ b/src/tribler-core/tribler_core/modules/tests/test_dht_health_manager.py @@ -74,14 +74,16 @@ async def test_receive_bloomfilters(dht_health_manager): Test whether the right operations happen when receiving a bloom filter """ infohash = b'a' * 20 - dht_health_manager.received_bloomfilters(infohash) # It should not do anything + transaction_id = '1' + dht_health_manager.received_bloomfilters(transaction_id) # It should not do anything assert not dht_health_manager.bf_seeders assert not dht_health_manager.bf_peers dht_health_manager.lookup_futures[infohash] = Future() dht_health_manager.bf_seeders[infohash] = bytearray(256) dht_health_manager.bf_peers[infohash] = bytearray(256) - dht_health_manager.received_bloomfilters(b'b' * 20, + dht_health_manager.requesting_bloomfilters(transaction_id, infohash) + dht_health_manager.received_bloomfilters(transaction_id, bf_seeds=bytearray(b'\xee' * 256), bf_peers=bytearray(b'\xff' * 256)) assert dht_health_manager.bf_seeders[infohash] == bytearray(b'\xee' * 256)