Skip to content

Commit

Permalink
litep2p/discovery: Fix memory leak in litep2p.public_addresses() (#…
Browse files Browse the repository at this point in the history
…5998)

This PR ensures that the `litep2p.public_addresses()` never grows
indefinitely.
- effectively fixes subtle memory leaks
- fixes authority DHT records being dropped due to size limits being
exceeded
- provides a healthier subset of public addresses to the `/identify`
protocol

This PR adds a new `ExternalAddressExpired` event to the
litep2p/discovery process.

Substrate uses an LRU `address_confirmations` bounded to 32 address
entries.
The oldest entry is propagated via the `ExternalAddressExpired` event
when a new address is added to the list (if capacity is exceeded).

The expired address is then removed from the
`litep2p.public_addresses()`, effectively limiting its size to 32
entries (the size of `address_confirmations` LRU).

cc @paritytech/networking @alexggh

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
Co-authored-by: Dmitry Markin <[email protected]>
  • Loading branch information
3 people committed Nov 15, 2024
1 parent 141eb46 commit e72e7f4
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 6 deletions.
15 changes: 15 additions & 0 deletions prdoc/pr_5998.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Fix memory leak in litep2p public addresses

doc:
- audience: [ Node Dev, Node Operator ]
description: |
This PR bounds the number of public addresses of litep2p to 32 entries.
This ensures we do not increase the number of addresses over time, and that the DHT
authority records will not exceed the upper size limit.

crates:
- name: sc-network
bump: patch
57 changes: 51 additions & 6 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,16 @@ pub enum DiscoveryEvent {

/// New external address discovered.
ExternalAddressDiscovered {
/// Discovered addresses.
/// Discovered address.
address: Multiaddr,
},

/// The external address has expired.
///
/// This happens when the internal buffers exceed the maximum number of external addresses,
/// and this address is the oldest one.
ExternalAddressExpired {
/// Expired address.
address: Multiaddr,
},

Expand Down Expand Up @@ -432,7 +441,13 @@ impl Discovery {
}

/// Check if `address` can be considered a new external address.
fn is_new_external_address(&mut self, address: &Multiaddr, peer: PeerId) -> bool {
///
/// If this address replaces an older address, the expired address is returned.
fn is_new_external_address(
&mut self,
address: &Multiaddr,
peer: PeerId,
) -> (bool, Option<Multiaddr>) {
log::trace!(target: LOG_TARGET, "verify new external address: {address}");

// is the address one of our known addresses
Expand All @@ -443,23 +458,39 @@ impl Discovery {
.chain(self.public_addresses.iter())
.any(|known_address| Discovery::is_known_address(&known_address, &address))
{
return true
return (true, None)
}

match self.address_confirmations.get(address) {
Some(confirmations) => {
confirmations.insert(peer);

if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
return true
return (true, None)
}
},
None => {
let oldest = (self.address_confirmations.len() >=
self.address_confirmations.limiter().max_length() as usize)
.then(|| {
self.address_confirmations.pop_oldest().map(|(address, peers)| {
if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
return Some(address)
} else {
None
}
})
})
.flatten()
.flatten();

self.address_confirmations.insert(address.clone(), Default::default());

return (false, oldest)
},
}

false
(false, None)
}
}

Expand Down Expand Up @@ -567,7 +598,21 @@ impl Stream for Discovery {
supported_protocols,
observed_address,
})) => {
if this.is_new_external_address(&observed_address, peer) {
let (is_new, expired_address) =
this.is_new_external_address(&observed_address, peer);

if let Some(expired_address) = expired_address {
log::trace!(
target: LOG_TARGET,
"Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
);

this.pending_events.push_back(DiscoveryEvent::ExternalAddressExpired {
address: expired_address,
});
}

if is_new {
this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
Expand Down
19 changes: 19 additions & 0 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,25 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
},
}
}
Some(DiscoveryEvent::ExternalAddressExpired{ address }) => {
let local_peer_id = self.litep2p.local_peer_id();

// Litep2p requires the peer ID to be present in the address.
let address = if !std::matches!(address.iter().last(), Some(Protocol::P2p(_))) {
address.with(Protocol::P2p(*local_peer_id.as_ref()))
} else {
address
};

if self.litep2p.public_addresses().remove_address(&address) {
log::info!(target: LOG_TARGET, "🔍 Expired external address for our node: {address}");
} else {
log::warn!(
target: LOG_TARGET,
"🔍 Failed to remove expired external address {address:?}"
);
}
}
Some(DiscoveryEvent::Ping { peer, rtt }) => {
log::trace!(
target: LOG_TARGET,
Expand Down

0 comments on commit e72e7f4

Please sign in to comment.