Skip to content

Commit

Permalink
Track BEP33 DHT requests in order to match incoming bloomfilters to t…
Browse files Browse the repository at this point in the history
…he correct infohash
  • Loading branch information
egbertbouman committed Mar 31, 2021
1 parent e4a4bb6 commit 43c2599
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 26 deletions.
41 changes: 23 additions & 18 deletions src/tribler-core/tribler_core/modules/dht_health_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, lt_session):
self.lookup_futures = {} # Map from binary infohash to future
self.bf_seeders = {} # Map from infohash to (final) seeders bloomfilter
self.bf_peers = {} # Map from infohash to (final) peers bloomfilter
self.outstanding = {} # Map from transaction_id to infohash
self.lt_session = lt_session

def get_health(self, infohash, timeout=15):
Expand Down Expand Up @@ -51,6 +52,9 @@ def finalize_lookup(self, infohash):
Finalize the lookup of the provided infohash and invoke the appropriate deferred.
:param infohash: The infohash of the lookup we finalize.
"""
for transaction_id in [key for key, value in self.outstanding.items() if value == infohash]:
self.outstanding.pop(transaction_id, None)

if infohash not in self.lookup_futures:
return

Expand Down Expand Up @@ -113,29 +117,30 @@ def tobits(s):
c = min(m - 1, total_zeros)
return int(math.log(c / float(m)) / (2 * math.log(1 - 1 / float(m))))

def received_bloomfilters(self, node_id, bf_seeds=bytearray(256), bf_peers=bytearray(256)):
def requesting_bloomfilters(self, transaction_id, infohash):
    """
    The libtorrent DHT has sent a get_peers query for an infohash we may be interested in.
    If we have a pending lookup for that infohash, remember which transaction ID belongs to it.
    :param transaction_id: The ID of the query.
    :param infohash: The infohash for which the query was sent.
    """
    if infohash not in self.lookup_futures:
        # Not one of our lookups. If this transaction ID was registered earlier, libtorrent is
        # reusing it for an infohash that we're not interested in, so drop the stale mapping.
        # Popping with a default is a no-op when the ID was never registered.
        self.outstanding.pop(transaction_id, None)
        return

    self.outstanding[transaction_id] = infohash

def received_bloomfilters(self, transaction_id, bf_seeds=bytearray(256), bf_peers=bytearray(256)):
"""
We have received bloom filters from the libtorrent DHT. Register the bloom filters and process them.
:param node_id: The ID of the node that sent the bloom filter.
:param transaction_id: The ID of the query for which we are receiving the bloom filter.
:param bf_seeds: The bloom filter indicating the IP addresses of the seeders.
:param bf_peers: The bloom filter indicating the IP addresses of the peers (leechers).
"""
min_distance = -1
closest_infohash = None

# We do not know to which infohash the received get_peers response belongs so we have to manually go through
# the infohashes and find the infohash that is the closest to the node id that sent us the message.
for infohash in self.lookup_futures:
ih_distance = distance(infohash, node_id)
if ih_distance < min_distance or min_distance == -1:
min_distance = ih_distance
closest_infohash = infohash

if not closest_infohash:
infohash = self.outstanding.get(transaction_id)
if not infohash:
self._logger.info("Could not find lookup infohash for incoming BEP33 bloomfilters")
return

self.bf_seeders[closest_infohash] = DHTHealthManager.combine_bloomfilters(
self.bf_seeders[closest_infohash], bf_seeds)
self.bf_peers[closest_infohash] = DHTHealthManager.combine_bloomfilters(
self.bf_peers[closest_infohash], bf_peers)
self.bf_seeders[infohash] = DHTHealthManager.combine_bloomfilters(self.bf_seeders[infohash], bf_seeds)
self.bf_peers[infohash] = DHTHealthManager.combine_bloomfilters(self.bf_peers[infohash], bf_peers)
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,24 @@ def process_alert(self, alert, hops=0):
self.session_stats_callback(alert)

elif alert_type == "dht_pkt_alert":
# We received a raw DHT message - decode it and check whether it is a BEP33 message.
# Unfortunately, the Python bindings don't have a direction attribute.
# So, we'll have to resort to using the string representation of the alert instead.
incoming = str(alert).startswith('<==')
decoded = bdecode_compat(alert.pkt_buf)
if decoded and b'r' in decoded:
if b'BFsd' in decoded[b'r'] and b'BFpe' in decoded[b'r']:
self.dht_health_manager.received_bloomfilters(decoded[b'r'][b'id'],
bytearray(decoded[b'r'][b'BFsd']),
bytearray(decoded[b'r'][b'BFpe']))
if not decoded:
return

# We are sending a raw DHT message - notify the DHTHealthManager of the outstanding request.
if not incoming and decoded.get(b'y') == b'q' \
and decoded.get(b'q') == b'get_peers' and decoded[b'a'].get(b'scrape') == 1:
self.dht_health_manager.requesting_bloomfilters(decoded[b't'],
decoded[b'a'][b'info_hash'])

# We received a raw DHT message - decode it and check whether it is a BEP33 message.
if incoming and b'r' in decoded and b'BFsd' in decoded[b'r'] and b'BFpe' in decoded[b'r']:
self.dht_health_manager.received_bloomfilters(decoded[b't'],
bytearray(decoded[b'r'][b'BFsd']),
bytearray(decoded[b'r'][b'BFpe']))

def update_ip_filter(self, lt_session, ip_addresses):
self._logger.debug('Updating IP filter %s', ip_addresses)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ async def test_receive_bloomfilters(dht_health_manager):
Test whether the right operations happen when receiving a bloom filter
"""
infohash = b'a' * 20
dht_health_manager.received_bloomfilters(infohash) # It should not do anything
transaction_id = '1'
dht_health_manager.received_bloomfilters(transaction_id) # It should not do anything
assert not dht_health_manager.bf_seeders
assert not dht_health_manager.bf_peers

dht_health_manager.lookup_futures[infohash] = Future()
dht_health_manager.bf_seeders[infohash] = bytearray(256)
dht_health_manager.bf_peers[infohash] = bytearray(256)
dht_health_manager.received_bloomfilters(b'b' * 20,
dht_health_manager.requesting_bloomfilters(transaction_id, infohash)
dht_health_manager.received_bloomfilters(transaction_id,
bf_seeds=bytearray(b'\xee' * 256),
bf_peers=bytearray(b'\xff' * 256))
assert dht_health_manager.bf_seeders[infohash] == bytearray(b'\xee' * 256)
Expand Down

0 comments on commit 43c2599

Please sign in to comment.