Skip to content

Commit

Permalink
fix(iroh-p2p): implement full local lookup (#537)
Browse files Browse the repository at this point in the history
We were previously only returning the peer id, external addrs, and
listening addrs. We now return the same content as performing a lookup
on a remote peer.
  • Loading branch information
ramfox authored Nov 22, 2022
1 parent 066aefd commit 0c388b9
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 23 deletions.
14 changes: 1 addition & 13 deletions iroh-api/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,7 @@ impl P2p {
}

pub async fn lookup_local(&self) -> Result<Lookup> {
let (_, listen_addrs) = self
.client
.get_listening_addrs()
.await
.map_err(|e| map_service_error("p2p", e))?;
Ok(Lookup {
peer_id: self.client.local_peer_id().await?,
listen_addrs,
observed_addrs: self.client.external_addresses().await?,
protocol_version: String::new(),
agent_version: String::new(),
protocols: Default::default(),
})
self.client.lookup_local().await
}

pub async fn lookup(&self, addr: &PeerIdOrAddr) -> Result<Lookup> {
Expand Down
7 changes: 5 additions & 2 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use crate::config::Libp2pConfig;
mod event;
mod peer_manager;

pub const PROTOCOL_VERSION: &str = "ipfs/0.1.0";
pub const AGENT_VERSION: &str = concat!("iroh/", env!("CARGO_PKG_VERSION"));

/// Libp2p behaviour for the node.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event")]
Expand Down Expand Up @@ -188,8 +191,8 @@ impl NodeBehaviour {
};

let identify = {
let config = identify::Config::new("ipfs/0.1.0".into(), local_key.public())
.with_agent_version(format!("iroh/{}", env!("CARGO_PKG_VERSION")))
let config = identify::Config::new(PROTOCOL_VERSION.into(), local_key.public())
.with_agent_version(String::from(AGENT_VERSION))
.with_cache_size(64 * 1024);
identify::Behaviour::new(config)
};
Expand Down
26 changes: 25 additions & 1 deletion iroh-p2p/src/behaviour/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use lru::LruCache;
pub struct PeerManager {
info: AHashMap<PeerId, Info>,
bad_peers: LruCache<PeerId, ()>,
supported_protocols: Vec<String>,
}

#[derive(Default, Debug, Clone)]
Expand All @@ -43,6 +44,7 @@ impl Default for PeerManager {
PeerManager {
info: Default::default(),
bad_peers: LruCache::new(DEFAULT_BAD_PEER_CAP.unwrap()),
supported_protocols: Default::default(),
}
}
}
Expand All @@ -68,6 +70,10 @@ impl PeerManager {
pub fn info_for_peer(&self, peer_id: &PeerId) -> Option<&Info> {
self.info.get(peer_id)
}

pub fn supported_protocols(&self) -> Vec<String> {
self.supported_protocols.clone()
}
}

impl NetworkBehaviour for PeerManager {
Expand Down Expand Up @@ -186,8 +192,26 @@ impl NetworkBehaviour for PeerManager {
fn poll(
&mut self,
_cx: &mut Context<'_>,
_params: &mut impl PollParameters,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// TODO(ramfox):
// We can only get the supported protocols of the local node by examining the
// `PollParameters`, which mean you can only get the supported protocols by examining the
// `PollParameters` in this method (`poll`) of a network behaviour.
// I injected this responsibility in the `peer_manager`, because it's the only "simple"
// network behaviour we have implemented.
// There is an issue up to remove `PollParameters`, and a discussion into how to instead
// get the `supported_protocols` of the node:
// https://github.com/libp2p/rust-libp2p/issues/3124
// When that is resolved, we can hopefully remove this responsibility from the `peer_manager`,
// where it, frankly, doesn't belong.
if self.supported_protocols.is_empty() {
self.supported_protocols = params
.supported_protocols()
.map(|p| String::from_utf8_lossy(&p).to_string())
.collect();
}

Poll::Pending
}
}
66 changes: 64 additions & 2 deletions iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};

use iroh_bitswap::{BitswapEvent, Block};
use iroh_rpc_client::Lookup;

use crate::keys::{Keychain, Storage};
use crate::providers::Providers;
Expand Down Expand Up @@ -953,6 +954,29 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
response_channel.send(None).ok();
}
}
RpcMessage::LookupLocalPeerInfo(response_channel) => {
let peer_id = self.swarm.local_peer_id();
let listen_addrs = self.swarm.listeners().cloned().collect();
let observed_addrs = self
.swarm
.external_addresses()
.map(|a| a.addr.clone())
.collect();
let protocol_version = String::from(crate::behaviour::PROTOCOL_VERSION);
let agent_version = String::from(crate::behaviour::AGENT_VERSION);
let protocols = self.swarm.behaviour().peer_manager.supported_protocols();

response_channel
.send(Lookup {
peer_id: *peer_id,
listen_addrs,
observed_addrs,
agent_version,
protocol_version,
protocols,
})
.ok();
}
RpcMessage::CancelListenForIdentify(response_channel, peer_id) => {
self.lookup_queries.remove(&peer_id);
response_channel.send(()).ok();
Expand Down Expand Up @@ -1312,6 +1336,12 @@ mod tests {
let peer_id_b = test_runner_b.client.local_peer_id().await?;
assert_eq!(test_runner_b.peer_id, peer_id_b);

let lookup_a = test_runner_a.client.lookup_local().await?;
// since we aren't connected to any other nodes, we should not
// have any information about our observed addresses
assert!(lookup_a.observed_addrs.is_empty());
assert_lookup(lookup_a, test_runner_a.peer_id, &test_runner_a.addr)?;

// connect
test_runner_a.client.connect(peer_id_b, addrs_b).await?;
// Make sure we have exchanged identity information
Expand All @@ -1323,8 +1353,7 @@ mod tests {

// lookup
let lookup_b = test_runner_a.client.lookup(peer_id_b, None).await?;
assert_eq!(peer_id_b, lookup_b.peer_id);

assert_lookup(lookup_b, test_runner_b.peer_id, &test_runner_b.addr)?;
// now that we are connected & have exchanged identity information,
// we should now be able to view the node's external addrs
// these are the addresses that other nodes tell you "this is the address I see for you"
Expand All @@ -1339,6 +1368,39 @@ mod tests {
Ok(())
}

// assert_lookup ensures each part of the lookup is equal
fn assert_lookup(
got: Lookup,
expected_peer_id: PeerId,
expected_addr: &Multiaddr,
) -> Result<()> {
let expected_protocols = vec![
"/ipfs/ping/1.0.0",
"/ipfs/id/1.0.0",
"/ipfs/id/push/1.0.0",
"/ipfs/bitswap/1.2.0",
"/ipfs/bitswap/1.1.0",
"/ipfs/bitswap/1.0.0",
"/ipfs/bitswap",
"/ipfs/kad/1.0.0",
"/libp2p/autonat/1.0.0",
"/libp2p/circuit/relay/0.2.0/hop",
"/libp2p/circuit/relay/0.2.0/stop",
"/libp2p/dcutr",
"/meshsub/1.1.0",
"/meshsub/1.0.0",
];
let expected_protocol_version = "ipfs/0.1.0";
let expected_agent_version = "iroh/0.1.0";

assert_eq!(expected_peer_id, got.peer_id);
assert!(got.listen_addrs.contains(expected_addr));
assert_eq!(expected_protocols, got.protocols);
assert_eq!(expected_protocol_version, got.protocol_version);
assert_eq!(expected_agent_version, got.agent_version);
Ok(())
}

#[tokio::test]
async fn test_gossipsub() -> Result<()> {
let mut test_runner_a = TestRunnerBuilder::new().no_bootstrap().build().await?;
Expand Down
23 changes: 22 additions & 1 deletion iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tracing::{debug, trace};

use async_trait::async_trait;
use iroh_bitswap::Block;
use iroh_rpc_client::Lookup;
use iroh_rpc_types::p2p::{
BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DisconnectRequest,
GetListeningAddrsResponse, GetPeersResponse, GossipsubAllPeersResponse, GossipsubPeerAndTopics,
Expand Down Expand Up @@ -319,6 +320,14 @@ impl RpcP2p for P2p {
Ok(ack)
}

#[tracing::instrument(skip(self))]
async fn lookup_local(&self, _: ()) -> Result<PeerInfo> {
let (s, r) = oneshot::channel();
self.sender.send(RpcMessage::LookupLocalPeerInfo(s)).await?;
let lookup = r.await?;
Ok(peer_info_from_lookup(lookup))
}

#[tracing::instrument(skip(self, req))]
async fn lookup(&self, req: LookupRequest) -> Result<PeerInfo> {
let (s, r) = oneshot::channel();
Expand Down Expand Up @@ -522,7 +531,18 @@ fn peer_info_from_identify_info(i: IdentifyInfo) -> PeerInfo {
.map(|addr| addr.to_vec())
.collect(),
protocols: i.protocols,
observed_addr: i.observed_addr.to_vec(),
observed_addrs: vec![i.observed_addr.to_vec()],
}
}

fn peer_info_from_lookup(l: Lookup) -> PeerInfo {
PeerInfo {
peer_id: l.peer_id.to_bytes(),
protocol_version: l.protocol_version,
agent_version: l.agent_version,
listen_addrs: l.listen_addrs.iter().map(|a| a.to_vec()).collect(),
protocols: l.protocols,
observed_addrs: l.observed_addrs.iter().map(|a| a.to_vec()).collect(),
}
}

Expand Down Expand Up @@ -583,6 +603,7 @@ pub enum RpcMessage {
ListenForIdentify(oneshot::Sender<Result<IdentifyInfo>>, PeerId),
CancelListenForIdentify(oneshot::Sender<()>, PeerId),
AddressesOfPeer(oneshot::Sender<Vec<Multiaddr>>, PeerId),
LookupLocalPeerInfo(oneshot::Sender<Lookup>),
Shutdown,
}

Expand Down
17 changes: 15 additions & 2 deletions iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ impl P2pClient {
Lookup::from_peer_info(peer_info)
}

#[tracing::instrument(skip(self))]
pub async fn lookup_local(&self) -> Result<Lookup> {
let peer_info = self.backend.lookup_local(()).await?;
Lookup::from_peer_info(peer_info)
}

#[tracing::instrument(skip(self))]
pub async fn disconnect(&self, peer_id: PeerId) -> Result<()> {
warn!("NetDisconnect not yet implemented on p2p node");
Expand Down Expand Up @@ -296,14 +302,14 @@ impl Lookup {
fn from_peer_info(p: PeerInfo) -> Result<Self> {
let peer_id = peer_id_from_bytes(p.peer_id)?;
let listen_addrs = addrs_from_bytes(p.listen_addrs)?;
let addr = addr_from_bytes(p.observed_addr)?;
let observed_addrs = addrs_from_bytes(p.observed_addrs)?;
Ok(Self {
peer_id,
protocol_version: p.protocol_version,
agent_version: p.agent_version,
listen_addrs,
protocols: p.protocols,
observed_addrs: vec![addr],
observed_addrs,
})
}
}
Expand Down Expand Up @@ -523,6 +529,13 @@ mod tests {
todo!()
}

async fn lookup_local(
&self,
_request: Request<()>,
) -> Result<tonic::Response<PeerInfo>, tonic::Status> {
todo!()
}

async fn gossipsub_add_explicit_peer(
&self,
_request: Request<GossipsubPeerIdMsg>,
Expand Down
5 changes: 3 additions & 2 deletions iroh-rpc-types/proto/p2p.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ service P2p {
rpc PeerDisconnect(DisconnectRequest) returns (google.protobuf.Empty) {}
rpc Shutdown(google.protobuf.Empty) returns (google.protobuf.Empty) {}
rpc Lookup(LookupRequest) returns (PeerInfo) {}
rpc LookupLocal(google.protobuf.Empty) returns (PeerInfo) {}

rpc GossipsubAddExplicitPeer(GossipsubPeerIdMsg) returns (google.protobuf.Empty) {}
rpc GossipsubAllMeshPeers(google.protobuf.Empty) returns (GossipsubPeersResponse) {}
Expand Down Expand Up @@ -132,8 +133,8 @@ message PeerInfo {
repeated bytes listen_addrs = 4;
// vec of Strings
repeated string protocols = 5;
// Multiaddr
bytes observed_addr = 6;
// vec of Multiaddr
repeated bytes observed_addrs = 6;
}
message Multiaddrs {
// Serialized list of multiaddrs
Expand Down
1 change: 1 addition & 0 deletions iroh-rpc-types/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ proxy!(
peer_connect_by_peer_id: ConnectByPeerIdRequest => () => (),
peer_disconnect: DisconnectRequest => () => (),
lookup: LookupRequest => PeerInfo => PeerInfo,
lookup_local: () => PeerInfo => PeerInfo,
gossipsub_add_explicit_peer: GossipsubPeerIdMsg => () => (),
gossipsub_all_mesh_peers: () => GossipsubPeersResponse => GossipsubPeersResponse,
gossipsub_all_peers: () => GossipsubAllPeersResponse => GossipsubAllPeersResponse,
Expand Down

0 comments on commit 0c388b9

Please sign in to comment.