Skip to content

Commit

Permalink
feat(iroh-net): upgrade to new swarm-discovery api (#2605)
Browse files Browse the repository at this point in the history
## Description

The new `swarm-discovery` api allows you to remove and add addresses you
want to publish, without needing to restart the swarm-discovery service.

There is a chance that this process (restarting the swarm-discovery
service) caused flaky-ness in our `test_local_swarm_discovery` test.

## Change checklist

- [x] Self-review.

---------

Co-authored-by: Kasey Huizinga <[email protected]>
  • Loading branch information
ramfox and Kasey Huizinga authored Aug 9, 2024
1 parent 1c86dac commit a9c96a9
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 54 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ ring = "0.17"
rustls = { version = "0.21.11", default-features = false, features = ["dangerous_configuration"] }
serde = { version = "1", features = ["derive", "rc"] }
smallvec = "1.11.1"
swarm-discovery = { version = "0.2.0", optional = true }
swarm-discovery = { version = "0.2.1", optional = true }
socket2 = "0.5.3"
stun-rs = "0.1.5"
surge-ping = "0.8.0"
Expand Down
82 changes: 31 additions & 51 deletions iroh-net/src/discovery/local_swarm_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ impl LocalSwarmDiscovery {
let (send, recv) = async_channel::bounded(64);
let task_sender = send.clone();
let rt = tokio::runtime::Handle::current();
let mut guard = Some(LocalSwarmDiscovery::spawn_discoverer(
let discovery = LocalSwarmDiscovery::spawn_discoverer(
node_id,
task_sender.clone(),
BTreeSet::new(),
&rt,
)?);
)?;

let handle = tokio::spawn(async move {
let mut node_addrs: HashMap<PublicKey, Peer> = HashMap::default();
Expand Down Expand Up @@ -171,21 +171,12 @@ impl LocalSwarmDiscovery {
}
Message::ChangeLocalAddrs(addrs) => {
trace!(?addrs, "LocalSwarmDiscovery Message::ChangeLocalAddrs");
let callback_send = task_sender.clone();
let g = guard.take();
drop(g);
guard = match LocalSwarmDiscovery::spawn_discoverer(
node_id,
callback_send.clone(),
addrs.direct_addresses,
&rt,
) {
Ok(guard) => Some(guard),
Err(e) => {
error!("LocalSwarmDiscovery error creating discovery service: {e}");
return;
}
};
discovery.remove_all();
let addrs =
LocalSwarmDiscovery::socketaddrs_to_addrs(addrs.direct_addresses);
for addr in addrs {
discovery.add(addr.0, addr.1)
}
}
}
}
Expand Down Expand Up @@ -213,38 +204,27 @@ impl LocalSwarmDiscovery {
.send_blocking(Message::Discovery(node_id.to_string(), peer.clone()))
.ok();
};
let mut addrs: HashMap<u16, Vec<IpAddr>> = HashMap::default();
let mut has_ipv4 = false;
let mut has_ipv6 = false;
for socketaddr in socketaddrs {
if !has_ipv6 && socketaddr.is_ipv6() {
has_ipv6 = true;
};
if !has_ipv4 && socketaddr.is_ipv4() {
has_ipv4 = true;
};
addrs
.entry(socketaddr.port())
.and_modify(|a| a.push(socketaddr.ip()))
.or_insert(vec![socketaddr.ip()]);
}

let ip_class = match (has_ipv4, has_ipv6) {
(true, true) => IpClass::V4AndV6,
(true, false) => IpClass::V4Only,
(false, true) => IpClass::V6Only,
// this case indicates no ip addresses were supplied, in which case, default to ipv4
(false, false) => IpClass::V4Only,
};
let addrs = LocalSwarmDiscovery::socketaddrs_to_addrs(socketaddrs);
let mut discoverer =
Discoverer::new_interactive(N0_LOCAL_SWARM.to_string(), node_id.to_string())
.with_callback(callback)
.with_ip_class(ip_class);
.with_ip_class(IpClass::Auto);
for addr in addrs {
discoverer = discoverer.with_addrs(addr.0, addr.1);
}
discoverer.spawn(rt)
}

fn socketaddrs_to_addrs(socketaddrs: BTreeSet<SocketAddr>) -> HashMap<u16, Vec<IpAddr>> {
let mut addrs: HashMap<u16, Vec<IpAddr>> = HashMap::default();
for socketaddr in socketaddrs {
addrs
.entry(socketaddr.port())
.and_modify(|a| a.push(socketaddr.ip()))
.or_insert(vec![socketaddr.ip()]);
}
addrs
}
}

impl From<&Peer> for DiscoveryItem {
Expand Down Expand Up @@ -298,10 +278,10 @@ mod tests {
#[tokio::test]
async fn test_local_swarm_discovery() -> TestResult {
let _guard = iroh_test::logging::setup();
let (node_id_a, discovery_a) = make_discoverer()?;
let (_, discovery_b) = make_discoverer()?;
let (_, discovery_a) = make_discoverer()?;
let (node_id_b, discovery_b) = make_discoverer()?;

// make addr info for discoverer a
// make addr info for discoverer b
let addr_info = AddrInfo {
relay_url: None,
direct_addresses: BTreeSet::from(["0.0.0.0:11111".parse()?]),
Expand All @@ -310,15 +290,15 @@ mod tests {
// pass in endpoint, this is never used
let ep = crate::endpoint::Builder::default().bind(0).await?;
// resolve twice to ensure we can create separate streams for the same node_id
let mut s1 = discovery_b.resolve(ep.clone(), node_id_a).unwrap();
let mut s2 = discovery_b.resolve(ep, node_id_a).unwrap();
tracing::debug!(?node_id_a, "Discovering node id a");
// publish discovery_a's address
discovery_a.publish(&addr_info);
let s1_res = tokio::time::timeout(Duration::from_secs(10), s1.next())
let mut s1 = discovery_a.resolve(ep.clone(), node_id_b).unwrap();
let mut s2 = discovery_a.resolve(ep, node_id_b).unwrap();
tracing::debug!(?node_id_b, "Discovering node id b");
// publish discovery_b's address
discovery_b.publish(&addr_info);
let s1_res = tokio::time::timeout(Duration::from_secs(5), s1.next())
.await?
.unwrap()?;
let s2_res = tokio::time::timeout(Duration::from_secs(10), s2.next())
let s2_res = tokio::time::timeout(Duration::from_secs(5), s2.next())
.await?
.unwrap()?;
assert_eq!(s1_res.addr_info, addr_info);
Expand Down

0 comments on commit a9c96a9

Please sign in to comment.