Skip to content

Commit

Permalink
litep2p/discovery: Publish authority records with external addresses …
Browse files Browse the repository at this point in the history
…only (paritytech#5176)

This PR reduces the occurrences for identified observed addresses.

Litep2p discovers its external addresses by inspecting the
`IdentifyInfo::ObservedAddress` field reported by other peers.
After we get 5 confirmations of the same external observed address (the
address the peer dialed to reach us), the address is reported through
the network layer.

The PR effectively changes this from 5 to 2.
This has a subtle implication on freshly started nodes for the
authority-discovery discussed below.

The PR also makes the authority discovery a bit more robust by not
publishing records if the node doesn't have addresses yet to report.
This aims to fix a scenario where:
- the litep2p node has started, it has some pending observed addresses
but less than 5
- the authorit-discovery publishes a record, but at this time the node
doesn't have any addresses discovered and the record is published
without addresses -> this means other nodes will not be able to reach us

Next Steps
- [ ] versi testing

Closes: paritytech#5147

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
lexnv and bkchr authored Jul 31, 2024
1 parent 39daa61 commit 7d0aa89
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 19 deletions.
25 changes: 18 additions & 7 deletions substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use ip_network::IpNetwork;
use libp2p::kad::{PeerRecord, Record};
use linked_hash_set::LinkedHashSet;

use log::{debug, error};
use log::{debug, error, trace};
use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64};
use prost::Message;
use rand::{seq::SliceRandom, thread_rng};
Expand Down Expand Up @@ -416,10 +416,12 @@ where
})
.collect::<Vec<_>>();

debug!(
target: LOG_TARGET,
"Publishing authority DHT record peer_id='{local_peer_id}' addresses='{addresses:?}'",
);
if !addresses.is_empty() {
debug!(
target: LOG_TARGET,
"Publishing authority DHT record peer_id='{local_peer_id}' with addresses='{addresses:?}'",
);
}

// The address must include the local peer id.
addresses
Expand All @@ -437,6 +439,17 @@ where
Role::Discover => return Ok(()),
};

let addresses = serialize_addresses(self.addresses_to_publish());
if addresses.is_empty() {
trace!(
target: LOG_TARGET,
"No addresses to publish. Skipping publication."
);

self.publish_interval.set_to_start();
return Ok(())
}

let keys =
Worker::<Client, Block, DhtEventStream>::get_own_public_keys_within_authority_set(
key_store.clone(),
Expand All @@ -459,8 +472,6 @@ where
self.query_interval.set_to_start();
}

let addresses = serialize_addresses(self.addresses_to_publish());

if let Some(metrics) = &self.metrics {
metrics.publish.inc();
metrics
Expand Down
28 changes: 17 additions & 11 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);
/// mDNS query interval.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);

/// Minimum number of confirmations received before an address is verified.
const MIN_ADDRESS_CONFIRMATIONS: usize = 5;

// The minimum number of peers we expect an answer before we terminate the request.
/// The minimum number of peers we expect an answer before we terminate the request.
const GET_RECORD_REDUNDANCY_FACTOR: usize = 4;

/// The maximum number of tracked external addresses we allow.
const MAX_EXTERNAL_ADDRESSES: u32 = 32;

/// Minimum number of confirmations received before an address is verified.
///
/// Note: all addresses are confirmed by libp2p on the first encounter. This aims to make
/// addresses a bit more robust.
const MIN_ADDRESS_CONFIRMATIONS: usize = 2;

/// Discovery events.
#[derive(Debug)]
pub enum DiscoveryEvent {
Expand Down Expand Up @@ -195,7 +201,7 @@ pub struct Discovery {
listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,

/// External address confirmations.
address_confirmations: LruMap<Multiaddr, usize>,
address_confirmations: LruMap<Multiaddr, HashSet<PeerId>>,

/// Delay to next `FIND_NODE` query.
duration_to_next_find_query: Duration,
Expand Down Expand Up @@ -278,7 +284,7 @@ impl Discovery {
find_node_query_id: None,
pending_events: VecDeque::new(),
duration_to_next_find_query: Duration::from_secs(1),
address_confirmations: LruMap::new(ByLength::new(8)),
address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
allow_non_global_addresses: config.allow_non_globals_in_dht,
public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(),
next_kad_query: Some(Delay::new(KADEMLIA_QUERY_INTERVAL)),
Expand Down Expand Up @@ -428,7 +434,7 @@ impl Discovery {
}

/// Check if `address` can be considered a new external address.
fn is_new_external_address(&mut self, address: &Multiaddr) -> bool {
fn is_new_external_address(&mut self, address: &Multiaddr, peer: PeerId) -> bool {
log::trace!(target: LOG_TARGET, "verify new external address: {address}");

// is the address one of our known addresses
Expand All @@ -444,14 +450,14 @@ impl Discovery {

match self.address_confirmations.get(address) {
Some(confirmations) => {
*confirmations += 1usize;
confirmations.insert(peer);

if *confirmations >= MIN_ADDRESS_CONFIRMATIONS {
if confirmations.len() >= MIN_ADDRESS_CONFIRMATIONS {
return true
}
},
None => {
self.address_confirmations.insert(address.clone(), 1usize);
self.address_confirmations.insert(address.clone(), Default::default());
},
}

Expand Down Expand Up @@ -563,7 +569,7 @@ impl Stream for Discovery {
supported_protocols,
observed_address,
})) => {
if this.is_new_external_address(&observed_address) {
if this.is_new_external_address(&observed_address, peer) {
this.pending_events.push_back(DiscoveryEvent::ExternalAddressDiscovered {
address: observed_address.clone(),
});
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
let mut addresses = self.external_addresses.write();

if addresses.insert(address.clone()) {
log::info!(target: LOG_TARGET, "discovered new external address for our node: {address}");
log::info!(target: LOG_TARGET, "🔍 Discovered new external address for our node: {address}");
}
}
Some(DiscoveryEvent::Ping { peer, rtt }) => {
Expand Down

0 comments on commit 7d0aa89

Please sign in to comment.