Skip to content

Commit

Permalink
fix: attempt to fix issue paritytech#5998 paritytech#6149 on branch `…
Browse files Browse the repository at this point in the history
…stable2409` for a temporary apply to the downstream project.
  • Loading branch information
0xbillw committed Dec 23, 2024
1 parent c921152 commit 0605ce2
Show file tree
Hide file tree
Showing 18 changed files with 1,126 additions and 629 deletions.
631 changes: 363 additions & 268 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
litep2p = { version = "0.6.2" }
litep2p = { version = "0.8.4", features = ["websocket"] }
log = { version = "0.4.22", default-features = false }
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
Expand Down
12 changes: 4 additions & 8 deletions substrate/client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ pub enum BehaviourOut {
///
/// This event is generated for statistics purposes.
InboundRequest {
/// Peer which sent us a request.
peer: PeerId,
/// Protocol name of the request.
protocol: ProtocolName,
/// If `Ok`, contains the time elapsed between when we received the request and when we
Expand All @@ -89,8 +87,6 @@ pub enum BehaviourOut {
///
/// This event is generated for statistics purposes.
RequestFinished {
/// Peer that we send a request to.
peer: PeerId,
/// Name of the protocol in question.
protocol: ProtocolName,
/// Duration the request took.
Expand Down Expand Up @@ -350,10 +346,10 @@ impl From<CustomMessageOutcome> for BehaviourOut {
impl From<request_responses::Event> for BehaviourOut {
fn from(event: request_responses::Event) -> Self {
match event {
request_responses::Event::InboundRequest { peer, protocol, result } =>
BehaviourOut::InboundRequest { peer, protocol, result },
request_responses::Event::RequestFinished { peer, protocol, duration, result } =>
BehaviourOut::RequestFinished { peer, protocol, duration, result },
request_responses::Event::InboundRequest { protocol, result, .. } =>
BehaviourOut::InboundRequest { protocol, result },
request_responses::Event::RequestFinished { protocol, duration, result, .. } =>
BehaviourOut::RequestFinished { protocol, duration, result },
request_responses::Event::ReputationChanges { peer, changes } =>
BehaviourOut::ReputationChanges { peer, changes },
}
Expand Down
24 changes: 18 additions & 6 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let mut list: LinkedHashSet<_> = self
.permanent_addresses
.iter()
.filter_map(|(p, a)| (*p == peer_id).then_some(a.clone()))
.filter_map(|(p, a)| (*p == peer_id).then(|| a.clone()))
.collect();

if let Some(ephemeral_addresses) = self.ephemeral_addresses.get(&peer_id) {
Expand Down Expand Up @@ -749,16 +749,28 @@ impl NetworkBehaviour for DiscoveryBehaviour {
self.mdns.on_swarm_event(FromSwarm::NewListenAddr(e));
},
FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => {
let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id));
let mut address = addr.clone();

if Self::can_add_to_dht(addr) {
if let Some(Protocol::P2p(peer_id)) = addr.iter().last() {
if peer_id != self.local_peer_id {
warn!(
target: "sub-libp2p",
"🔍 Discovered external address for a peer that is not us: {addr}",
);
// Ensure this address is not propagated to kademlia.
return
}
} else {
address.push(Protocol::P2p(self.local_peer_id));
}

if Self::can_add_to_dht(&address) {
// NOTE: we might re-discover the same address multiple times
// in which case we just want to refrain from logging.
if self.known_external_addresses.insert(new_addr.clone()) {
if self.known_external_addresses.insert(address.clone()) {
info!(
target: "sub-libp2p",
"🔍 Discovered new external address for our node: {}",
new_addr,
"🔍 Discovered new external address for our node: {address}",
);
}
}
Expand Down
115 changes: 86 additions & 29 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,6 @@ pub enum DiscoveryEvent {
/// Peer ID.
peer: PeerId,

/// Identify protocol version.
protocol_version: Option<String>,

/// Identify user agent version.
user_agent: Option<String>,

/// Observed address.
observed_address: Multiaddr,

/// Listen addresses.
listen_addresses: Vec<Multiaddr>,

Expand All @@ -125,7 +116,16 @@ pub enum DiscoveryEvent {

/// New external address discovered.
ExternalAddressDiscovered {
/// Discovered addresses.
/// Discovered address.
address: Multiaddr,
},

/// The external address has expired.
///
/// This happens when the internal buffers exceed the maximum number of external addresses,
/// and this address is the oldest one.
ExternalAddressExpired {
/// Expired address.
address: Multiaddr,
},

Expand Down Expand Up @@ -162,6 +162,9 @@ pub enum DiscoveryEvent {

/// Discovery.
pub struct Discovery {
/// Local peer ID.
local_peer_id: litep2p::PeerId,

/// Ping event stream.
ping_event_stream: Box<dyn Stream<Item = PingEvent> + Send + Unpin>,

Expand Down Expand Up @@ -233,6 +236,7 @@ impl Discovery {
/// Enables `/ipfs/ping/1.0.0` and `/ipfs/identify/1.0.0` by default and starts
/// the mDNS peer discovery if it was enabled.
pub fn new<Hash: AsRef<[u8]> + Clone>(
local_peer_id: litep2p::PeerId,
config: &NetworkConfiguration,
genesis_hash: Hash,
fork_id: Option<&str>,
Expand All @@ -243,11 +247,9 @@ impl Discovery {
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
let (identify_config, identify_event_stream) = IdentifyConfig::new(
"/substrate/1.0".to_string(),
Some(user_agent),
config.public_addresses.clone().into_iter().map(Into::into).collect(),
);

let (identify_config, identify_event_stream) =
IdentifyConfig::new("/substrate/1.0".to_string(), Some(user_agent));

let (mdns_config, mdns_event_stream) = match config.transport {
crate::config::TransportConfig::Normal { enable_mdns, .. } => match enable_mdns {
Expand Down Expand Up @@ -275,6 +277,7 @@ impl Discovery {

(
Self {
local_peer_id,
ping_event_stream,
identify_event_stream,
mdns_event_stream,
Expand Down Expand Up @@ -434,7 +437,13 @@ impl Discovery {
}

/// Check if `address` can be considered a new external address.
fn is_new_external_address(&mut self, address: &Multiaddr, peer: PeerId) -> bool {
///
/// If this address replaces an older address, the expired address is returned.
fn is_new_external_address(
&mut self,
address: &Multiaddr,
peer: PeerId,
) -> (bool, Option<Multiaddr>) {
log::trace!(target: LOG_TARGET, "verify new external address: {address}");

// is the address one of our known addresses
Expand All @@ -445,23 +454,39 @@ impl Discovery {
.chain(self.public_addresses.iter())
.any(|known_address| Discovery::is_known_address(&known_address, &address))
{
return true
return (true, None)
}

match self.address_confirmations.get(address) {
Some(confirmations) => {
confirmations.insert(peer);

if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
return true
return (true, None)
}
},
None => {
let oldest = (self.address_confirmations.len() >=
self.address_confirmations.limiter().max_length() as usize)
.then(|| {
self.address_confirmations.pop_oldest().map(|(address, peers)| {
if peers.len() >= MIN_ADDRESS_CONFIRMATIONS {
return Some(address)
} else {
None
}
})
})
.flatten()
.flatten();

self.address_confirmations.insert(address.clone(), Default::default());

return (false, oldest)
},
}

false
(false, None)
}
}

Expand Down Expand Up @@ -533,7 +558,7 @@ impl Stream for Discovery {

return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
},
Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
Poll::Ready(Some(KademliaEvent::PutRecordSuccess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
match this.find_node_query_id == Some(query_id) {
Expand All @@ -556,31 +581,63 @@ impl Stream for Discovery {

return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
},
// Content provider events are ignored for now.
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) |
Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {},
}

match Pin::new(&mut this.identify_event_stream).poll_next(cx) {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(IdentifyEvent::PeerIdentified {
peer,
protocol_version,
user_agent,
listen_addresses,
supported_protocols,
observed_address,
..
})) => {
if this.is_new_external_address(&observed_address, peer) {
this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
let observed_address =
if let Some(Protocol::P2p(peer_id)) = observed_address.iter().last() {
if peer_id != *this.local_peer_id.as_ref() {
log::warn!(
target: LOG_TARGET,
"Discovered external address for a peer that is not us: {observed_address}",
);
None
} else {
Some(observed_address)
}
} else {
Some(observed_address.with(Protocol::P2p(this.local_peer_id.into())))
};

// Ensure that an external address with a different peer ID does not have
// side effects of evicting other external addresses via `ExternalAddressExpired`.
if let Some(observed_address) = observed_address {
let (is_new, expired_address) =
this.is_new_external_address(&observed_address, peer);

if let Some(expired_address) = expired_address {
log::trace!(
target: LOG_TARGET,
"Removing expired external address expired={expired_address} is_new={is_new} observed={observed_address}",
);

this.pending_events.push_back(DiscoveryEvent::ExternalAddressExpired {
address: expired_address,
});
}

if is_new {
this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
}
}

return Poll::Ready(Some(DiscoveryEvent::Identified {
peer,
protocol_version,
user_agent,
listen_addresses,
observed_address,
supported_protocols,
}));
},
Expand Down
Loading

0 comments on commit 0605ce2

Please sign in to comment.