Skip to content

Commit

Permalink
feat(swarm): allow behaviours to share addresses of peers
Browse files Browse the repository at this point in the history
Resolves: libp2p#4302.

Pull-Request: libp2p#4371.
  • Loading branch information
StemCll authored Jan 24, 2024
1 parent fbad30e commit c48a16a
Show file tree
Hide file tree
Showing 20 changed files with 611 additions and 75 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ libp2p-dcutr = { version = "0.11.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.41.1", path = "transports/dns" }
libp2p-floodsub = { version = "0.44.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.46.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.44.1", path = "protocols/identify" }
libp2p-identify = { version = "0.44.2", path = "protocols/identify" }
libp2p-identity = { version = "0.2.8" }
libp2p-kad = { version = "0.45.3", path = "protocols/kad" }
libp2p-mdns = { version = "0.45.1", path = "protocols/mdns" }
Expand All @@ -99,11 +99,11 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
libp2p-quic = { version = "0.10.2", path = "transports/quic" }
libp2p-relay = { version = "0.17.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.1", path = "protocols/request-response" }
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
libp2p-server = { version = "0.12.5", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.1", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.2", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm = { version = "0.44.2", path = "swarm" }
libp2p-swarm-derive = { version = "=0.34.3", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required.
libp2p-swarm-test = { version = "0.3.0", path = "swarm-test" }
libp2p-tcp = { version = "0.41.0", path = "transports/tcp" }
libp2p-tls = { version = "0.3.0", path = "transports/tls" }
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ name = "libp2p-autonat"
edition = "2021"
rust-version = { workspace = true }
description = "NAT and firewall detection for libp2p"
version = "0.12.0"
authors = ["David Craven <[email protected]>", "Elena Frank <[email protected]>"]
version = "0.12.0"
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
Expand Down
1 change: 1 addition & 0 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ impl Behaviour {
pub fn add_server(&mut self, peer: PeerId, address: Option<Multiaddr>) {
self.servers.insert(peer);
if let Some(addr) = address {
#[allow(deprecated)]
self.inner.add_address(&peer, addr);
}
}
Expand Down
1 change: 1 addition & 0 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ async fn wait_for_reservation(
SwarmEvent::ExternalAddrConfirmed { address } if !is_renewal => {
assert_eq!(address, client_addr);
}
SwarmEvent::NewExternalAddrOfPeer { .. } => {}
e => panic!("{e:?}"),
}
}
Expand Down
7 changes: 7 additions & 0 deletions protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.44.2

- Emit `ToSwarm::NewExternalAddrOfPeer` for all external addresses of remote peers.
For this work, the address cache must be enabled via `identify::Config::with_cache_size`.
The default is 0, i.e. disabled.
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).

## 0.44.1

- Ensure `Multiaddr` handled and returned by `Behaviour` are `/p2p` terminated.
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-identify"
edition = "2021"
rust-version = { workspace = true }
description = "Nodes identifcation protocol for libp2p"
version = "0.44.1"
version = "0.44.2"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
72 changes: 31 additions & 41 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ use libp2p_identity::PublicKey;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{
ConnectionDenied, DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour,
NotifyHandler, StreamUpgradeError, THandlerInEvent, ToSwarm,
NotifyHandler, PeerAddresses, StreamUpgradeError, THandlerInEvent, ToSwarm,
};
use libp2p_swarm::{ConnectionId, THandler, THandlerOutEvent};
use lru::LruCache;

use std::collections::hash_map::Entry;
use std::num::NonZeroUsize;
use std::{
collections::{HashMap, HashSet, VecDeque},
iter::FromIterator,
task::Context,
task::Poll,
time::Duration,
Expand Down Expand Up @@ -200,9 +199,9 @@ impl Behaviour {
.or_default()
.insert(conn, addr);

if let Some(entry) = self.discovered_peers.get_mut(&peer_id) {
if let Some(cache) = self.discovered_peers.0.as_mut() {
for addr in failed_addresses {
entry.remove(addr);
cache.remove(&peer_id, addr);
}
}
}
Expand Down Expand Up @@ -268,13 +267,23 @@ impl NetworkBehaviour for Behaviour {
info.listen_addrs
.retain(|addr| multiaddr_matches_peer_id(addr, &peer_id));

// Replace existing addresses to prevent other peer from filling up our memory.
self.discovered_peers
.put(peer_id, info.listen_addrs.iter().cloned());

let observed = info.observed_addr.clone();
self.events
.push_back(ToSwarm::GenerateEvent(Event::Received { peer_id, info }));
.push_back(ToSwarm::GenerateEvent(Event::Received {
peer_id,
info: info.clone(),
}));

if let Some(ref mut discovered_peers) = self.discovered_peers.0 {
for address in &info.listen_addrs {
if discovered_peers.add(peer_id, address.clone()) {
self.events.push_back(ToSwarm::NewExternalAddrOfPeer {
peer_id,
address: address.clone(),
});
}
}
}

match self.our_observed_addresses.entry(id) {
Entry::Vacant(not_yet_observed) => {
Expand Down Expand Up @@ -387,11 +396,11 @@ impl NetworkBehaviour for Behaviour {
self.our_observed_addresses.remove(&connection_id);
}
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(entry) = peer_id.and_then(|id| self.discovered_peers.get_mut(&id)) {
if let DialError::Transport(errors) = error {
for (addr, _error) in errors {
entry.remove(addr);
}
if let (Some(peer_id), Some(cache), DialError::Transport(errors)) =
(peer_id, self.discovered_peers.0.as_mut(), error)
{
for (addr, _error) in errors {
cache.remove(&peer_id, addr);
}
}
}
Expand Down Expand Up @@ -445,42 +454,23 @@ fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
true
}

struct PeerCache(Option<LruCache<PeerId, HashSet<Multiaddr>>>);
struct PeerCache(Option<PeerAddresses>);

impl PeerCache {
fn disabled() -> Self {
Self(None)
}

fn enabled(size: NonZeroUsize) -> Self {
Self(Some(LruCache::new(size)))
}

fn get_mut(&mut self, peer: &PeerId) -> Option<&mut HashSet<Multiaddr>> {
self.0.as_mut()?.get_mut(peer)
}

fn put(&mut self, peer: PeerId, addresses: impl Iterator<Item = Multiaddr>) {
let cache = match self.0.as_mut() {
None => return,
Some(cache) => cache,
};

let addresses = addresses.filter_map(|a| a.with_p2p(peer).ok());
cache.put(peer, HashSet::from_iter(addresses));
Self(Some(PeerAddresses::new(size)))
}

fn get(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let cache = match self.0.as_mut() {
None => return Vec::new(),
Some(cache) => cache,
};

cache
.get(peer)
.cloned()
.map(Vec::from_iter)
.unwrap_or_default()
if let Some(cache) = self.0.as_mut() {
cache.get(peer).collect()
} else {
Vec::new()
}
}
}

Expand Down
74 changes: 73 additions & 1 deletion protocols/identify/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn only_emits_address_candidate_once_per_connection() {
async_std::task::spawn(swarm2.loop_on_next());

let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
.take(5)
.take(8)
.collect::<Vec<_>>()
.await;

Expand Down Expand Up @@ -156,6 +156,78 @@ async fn only_emits_address_candidate_once_per_connection() {
);
}

#[async_std::test]
async fn emits_unique_listen_addresses() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

let mut swarm1 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("a".to_string(), identity.public())
.with_agent_version("b".to_string())
.with_interval(Duration::from_secs(1))
.with_cache_size(10),
)
});
let mut swarm2 = Swarm::new_ephemeral(|identity| {
identify::Behaviour::new(
identify::Config::new("c".to_string(), identity.public())
.with_agent_version("d".to_string()),
)
});

let (swarm2_mem_listen_addr, swarm2_tcp_listen_addr) =
swarm2.listen().with_memory_addr_external().await;
let swarm2_peer_id = *swarm2.local_peer_id();
swarm1.connect(&mut swarm2).await;

async_std::task::spawn(swarm2.loop_on_next());

let swarm_events = futures::stream::poll_fn(|cx| swarm1.poll_next_unpin(cx))
.take(8)
.collect::<Vec<_>>()
.await;

let infos = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::Behaviour(identify::Event::Received { info, .. }) => Some(info.clone()),
_ => None,
})
.collect::<Vec<_>>();

assert!(
infos.len() > 1,
"should exchange identify payload more than once"
);

let listen_addrs = infos
.iter()
.map(|i| i.listen_addrs.clone())
.collect::<Vec<_>>();

for addrs in listen_addrs {
assert_eq!(addrs.len(), 2);
assert!(addrs.contains(&swarm2_mem_listen_addr));
assert!(addrs.contains(&swarm2_tcp_listen_addr));
}

let reported_addrs = swarm_events
.iter()
.filter_map(|e| match e {
SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => {
Some((*peer_id, address.clone()))
}
_ => None,
})
.collect::<Vec<_>>();

assert_eq!(reported_addrs.len(), 2, "To have two addresses of remote");
assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_mem_listen_addr)));
assert!(reported_addrs.contains(&(swarm2_peer_id, swarm2_tcp_listen_addr)));
}

#[async_std::test]
async fn identify_push() {
let _ = tracing_subscriber::fmt()
Expand Down
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.26.2

- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
See [PR 4371](https://github.com/libp2p/rust-libp2p/pull/4371).

## 0.26.1

- Derive `PartialOrd` and `Ord` for `{Out,In}boundRequestId`.
Expand Down
4 changes: 2 additions & 2 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.26.1"
version = "0.26.2"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down Expand Up @@ -40,7 +40,7 @@ libp2p-yamux = { workspace = true }
rand = "0.8"
libp2p-swarm-test = { path = "../../swarm-test" }
futures_ringbuf = "0.4.0"
serde = { version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Passing arguments to the docsrs builder in order to properly document cfg's.
Expand Down
Loading

0 comments on commit c48a16a

Please sign in to comment.