diff --git a/src/protocol/libp2p/kademlia/futures_stream.rs b/src/protocol/libp2p/kademlia/futures_stream.rs
index 9c7d8039..439772c3 100644
--- a/src/protocol/libp2p/kademlia/futures_stream.rs
+++ b/src/protocol/libp2p/kademlia/futures_stream.rs
@@ -44,6 +44,12 @@ impl FuturesStream {
         }
     }
 
+    /// Number of futures in the stream.
+    #[cfg(test)]
+    pub fn len(&self) -> usize {
+        self.futures.len()
+    }
+
     /// Push a future for processing.
     pub fn push(&mut self, future: F) {
         self.futures.push(future);
diff --git a/src/protocol/libp2p/kademlia/query/get_providers.rs b/src/protocol/libp2p/kademlia/query/get_providers.rs
index b2ae19c2..fcccda8a 100644
--- a/src/protocol/libp2p/kademlia/query/get_providers.rs
+++ b/src/protocol/libp2p/kademlia/query/get_providers.rs
@@ -81,12 +81,12 @@ pub struct GetProvidersContext {
 
 impl GetProvidersContext {
     /// Create new [`GetProvidersContext`].
-    pub fn new(config: GetProvidersConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
+    pub fn new(config: GetProvidersConfig, candidate_peers: VecDeque<KademliaPeer>) -> Self {
         let mut candidates = BTreeMap::new();
 
-        for candidate in &in_peers {
-            let distance = config.target.distance(&candidate.key);
-            candidates.insert(distance, candidate.clone());
+        for peer in &candidate_peers {
+            let distance = config.target.distance(&peer.key);
+            candidates.insert(distance, peer.clone());
         }
 
         let kad_message =
@@ -273,3 +273,240 @@ impl GetProvidersContext {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::protocol::libp2p::kademlia::types::ConnectionType;
+    use multiaddr::multiaddr;
+
+    fn default_config() -> GetProvidersConfig {
+        GetProvidersConfig {
+            local_peer_id: PeerId::random(),
+            parallelism_factor: 3,
+            query: QueryId(0),
+            target: Key::new(vec![1, 2, 3].into()),
+            known_providers: vec![],
+        }
+    }
+
+    fn peer_to_kad(peer: PeerId) -> KademliaPeer {
+        KademliaPeer {
+            peer,
+            key: Key::from(peer),
+            addresses: vec![],
+            connection: ConnectionType::NotConnected,
+        }
+    }
+
+    fn peer_to_kad_with_addresses(peer: PeerId, addresses: Vec<Multiaddr>) -> KademliaPeer {
+        KademliaPeer {
+            peer,
+            key: Key::from(peer),
+            addresses,
+            connection: ConnectionType::NotConnected,
+        }
+    }
+
+    #[test]
+    fn completes_when_no_candidates() {
+        let config = default_config();
+
+        let mut context = GetProvidersContext::new(config, VecDeque::new());
+        assert!(context.is_done());
+
+        let event = context.next_action().unwrap();
+        assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });
+    }
+
+    #[test]
+    fn fulfill_parallelism() {
+        let config = GetProvidersConfig {
+            parallelism_factor: 3,
+            ..default_config()
+        };
+
+        let candidate_peer_set: HashSet<_> =
+            [PeerId::random(), PeerId::random(), PeerId::random()].into_iter().collect();
+        assert_eq!(candidate_peer_set.len(), 3);
+
+        let candidate_peers = candidate_peer_set.iter().map(|peer| peer_to_kad(*peer)).collect();
+        let mut context = GetProvidersContext::new(config, candidate_peers);
+
+        for num in 0..3 {
+            let event = context.next_action().unwrap();
+            match event {
+                QueryAction::SendMessage { query, peer, .. } => {
+                    assert_eq!(query, QueryId(0));
+                    // Added as pending.
+                    assert_eq!(context.pending.len(), num + 1);
+                    assert!(context.pending.contains_key(&peer));
+
+                    // Check that the peer is one of the provided candidates.
+                    assert!(candidate_peer_set.contains(&peer));
+                }
+                _ => panic!("Unexpected event"),
+            }
+        }
+
+        // Fulfilled parallelism.
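+        // With `parallelism_factor` peers pending, no further peers are queried
+        // until one of the in-flight requests responds or fails.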
+        assert!(context.next_action().is_none());
+    }
+
+    #[test]
+    fn completes_when_responses() {
+        let config = GetProvidersConfig {
+            parallelism_factor: 3,
+            ..default_config()
+        };
+
+        let peer_a = PeerId::random();
+        let peer_b = PeerId::random();
+        let peer_c = PeerId::random();
+
+        let candidate_peer_set: HashSet<_> = [peer_a, peer_b, peer_c].into_iter().collect();
+        assert_eq!(candidate_peer_set.len(), 3);
+
+        let candidate_peers =
+            [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
+        let mut context = GetProvidersContext::new(config, candidate_peers);
+
+        let [provider1, provider2, provider3, provider4] = (0..4)
+            .map(|_| ContentProvider {
+                peer: PeerId::random(),
+                addresses: vec![],
+            })
+            .collect::<Vec<_>>()
+            .try_into()
+            .unwrap();
+
+        // Schedule peer queries.
+        for num in 0..3 {
+            let event = context.next_action().unwrap();
+            match event {
+                QueryAction::SendMessage { query, peer, .. } => {
+                    assert_eq!(query, QueryId(0));
+                    // Added as pending.
+                    assert_eq!(context.pending.len(), num + 1);
+                    assert!(context.pending.contains_key(&peer));
+
+                    // Check that the peer is one of the provided candidates.
+                    assert!(candidate_peer_set.contains(&peer));
+                }
+                _ => panic!("Unexpected event"),
+            }
+        }
+
+        // A response failure from a peer that was never queried must be ignored.
+        let peer_d = PeerId::random();
+        context.register_response_failure(peer_d);
+        assert_eq!(context.pending.len(), 3);
+        assert!(context.queried.is_empty());
+
+        // Peer A responds.
+        let providers = vec![provider1.clone().into(), provider2.clone().into()];
+        context.register_response(peer_a, providers, vec![]);
+        assert_eq!(context.pending.len(), 2);
+        assert_eq!(context.queried.len(), 1);
+        assert_eq!(context.found_providers.len(), 2);
+
+        // Peer B provides a different response, with peer D as a new candidate.
+        let providers = vec![provider2.clone().into(), provider3.clone().into()];
+        let candidates = vec![peer_to_kad(peer_d.clone())];
+        context.register_response(peer_b, providers, candidates);
+        assert_eq!(context.pending.len(), 1);
+        assert_eq!(context.queried.len(), 2);
+        assert_eq!(context.found_providers.len(), 4);
+        assert_eq!(context.candidates.len(), 1);
+
+        // Peer C fails.
+        context.register_response_failure(peer_c);
+        assert!(context.pending.is_empty());
+        assert_eq!(context.queried.len(), 3);
+        assert_eq!(context.found_providers.len(), 4);
+
+        // Drain the last candidate.
+        let event = context.next_action().unwrap();
+        match event {
+            QueryAction::SendMessage { query, peer, .. } => {
+                assert_eq!(query, QueryId(0));
+                // Added as pending.
+                assert_eq!(context.pending.len(), 1);
+                assert_eq!(peer, peer_d);
+            }
+            _ => panic!("Unexpected event"),
+        }
+
+        // Peer D responds.
+        let providers = vec![provider4.clone().into()];
+        context.register_response(peer_d, providers, vec![]);
+
+        // Produces the result.
+        let event = context.next_action().unwrap();
+        assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });
+
+        // Check results.
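+        // `found_providers()` deduplicates provider2, which was reported by both
+        // peer A and peer B, leaving four unique providers.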
+        let found_providers = context.found_providers();
+        assert_eq!(found_providers.len(), 4);
+        assert!(found_providers.contains(&provider1));
+        assert!(found_providers.contains(&provider2));
+        assert!(found_providers.contains(&provider3));
+        assert!(found_providers.contains(&provider4));
+    }
+
+    #[test]
+    fn providers_sorted_by_distance() {
+        let target = Key::new(vec![1, 2, 3].into());
+
+        let mut peers = (0..10).map(|_| PeerId::random()).collect::<Vec<_>>();
+        let providers = peers.iter().map(|peer| peer_to_kad(peer.clone())).collect::<Vec<_>>();
+
+        let found_providers =
+            GetProvidersContext::merge_and_sort_providers(providers, target.clone());
+
+        peers.sort_by(|p1, p2| {
+            Key::from(*p1).distance(&target).cmp(&Key::from(*p2).distance(&target))
+        });
+
+        assert!(
+            std::iter::zip(found_providers.into_iter(), peers.into_iter())
+                .all(|(provider, peer)| provider.peer == peer)
+        );
+    }
+
+    #[test]
+    fn provider_addresses_merged() {
+        let peer = PeerId::random();
+
+        let address1 = multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10000u16));
+        let address2 = multiaddr!(Ip4([192, 168, 0, 1]), Tcp(10000u16));
+        let address3 = multiaddr!(Ip4([10, 0, 0, 1]), Tcp(10000u16));
+        let address4 = multiaddr!(Ip4([1, 1, 1, 1]), Tcp(10000u16));
+        let address5 = multiaddr!(Ip4([8, 8, 8, 8]), Tcp(10000u16));
+
+        let provider1 = peer_to_kad_with_addresses(peer.clone(), vec![address1.clone()]);
+        let provider2 = peer_to_kad_with_addresses(
+            peer.clone(),
+            vec![address2.clone(), address3.clone(), address4.clone()],
+        );
+        let provider3 =
+            peer_to_kad_with_addresses(peer.clone(), vec![address4.clone(), address5.clone()]);
+
+        let providers = vec![provider1, provider2, provider3];
+
+        let found_providers = GetProvidersContext::merge_and_sort_providers(
+            providers,
+            Key::new(vec![1, 2, 3].into()),
+        );
+
+        assert_eq!(found_providers.len(), 1);
+
+        let addresses = &found_providers.first().unwrap().addresses;
+        assert_eq!(addresses.len(), 5);
+        assert!(addresses.contains(&address1));
+        assert!(addresses.contains(&address2));
+        assert!(addresses.contains(&address3));
+        assert!(addresses.contains(&address4));
+        assert!(addresses.contains(&address5));
+    }
+}
diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs
index 6453bdc8..1c6292c0 100644
--- a/src/protocol/libp2p/kademlia/store.rs
+++ b/src/protocol/libp2p/kademlia/store.rs
@@ -40,6 +40,7 @@ use std::{
 const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store";
 
 /// Memory store events.
+#[derive(Debug, PartialEq, Eq)]
 pub enum MemoryStoreAction {
     RefreshProvider {
         provided_key: Key,
@@ -837,5 +838,238 @@ mod tests {
         assert_eq!(store.get_providers(&key3), vec![]);
     }
 
-    // TODO: test local providers.
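+    // Local providers are tracked separately in `local_providers` so that
+    // `next_action()` can periodically emit `RefreshProvider` actions for them.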
+    #[test]
+    fn local_provider_registered() {
+        let local_peer_id = PeerId::random();
+        let mut store = MemoryStore::new(local_peer_id);
+
+        let key = Key::from(vec![1, 2, 3]);
+        let local_provider = ContentProvider {
+            peer: local_peer_id,
+            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+        };
+
+        assert!(store.local_providers.is_empty());
+        assert_eq!(store.pending_provider_refresh.len(), 0);
+
+        assert!(store.put_provider(key.clone(), local_provider.clone()));
+
+        assert_eq!(store.local_providers.get(&key), Some(&local_provider));
+        assert_eq!(store.pending_provider_refresh.len(), 1);
+    }
+
+    #[test]
+    fn local_provider_registered_after_remote_provider() {
+        let local_peer_id = PeerId::random();
+        let mut store = MemoryStore::new(local_peer_id);
+
+        let key = Key::from(vec![1, 2, 3]);
+
+        let remote_peer_id = PeerId::random();
+        let remote_provider = ContentProvider {
+            peer: remote_peer_id,
+            addresses: vec![multiaddr!(Ip4([192, 168, 0, 1]), Tcp(10000u16))],
+        };
+
+        let local_provider = ContentProvider {
+            peer: local_peer_id,
+            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+        };
+
+        assert!(store.local_providers.is_empty());
+        assert_eq!(store.pending_provider_refresh.len(), 0);
+
+        assert!(store.put_provider(key.clone(), remote_provider.clone()));
+        assert!(store.put_provider(key.clone(), local_provider.clone()));
+
+        let got_providers = store.get_providers(&key);
+        assert_eq!(got_providers.len(), 2);
+        assert!(got_providers.contains(&remote_provider));
+        assert!(got_providers.contains(&local_provider));
+
+        assert_eq!(store.local_providers.get(&key), Some(&local_provider));
+        assert_eq!(store.pending_provider_refresh.len(), 1);
+    }
+
+    #[test]
+    fn local_provider_removed() {
+        let local_peer_id = PeerId::random();
+        let mut store = MemoryStore::new(local_peer_id);
+
+        let key = Key::from(vec![1, 2, 3]);
+        let local_provider = ContentProvider {
+            peer: local_peer_id,
+            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+        };
+
+        assert!(store.local_providers.is_empty());
+
+        assert!(store.put_provider(key.clone(), local_provider.clone()));
+
+        assert_eq!(store.local_providers.get(&key), Some(&local_provider));
+
+        store.remove_local_provider(key.clone());
+
+        assert!(store.get_providers(&key).is_empty());
+        assert!(store.local_providers.is_empty());
+    }
+
+    #[test]
+    fn local_provider_removed_when_remote_providers_present() {
+        let local_peer_id = PeerId::random();
+        let mut store = MemoryStore::new(local_peer_id);
+
+        let key = Key::from(vec![1, 2, 3]);
+
+        let remote_peer_id = PeerId::random();
+        let remote_provider = ContentProvider {
+            peer: remote_peer_id,
+            addresses: vec![multiaddr!(Ip4([192, 168, 0, 1]), Tcp(10000u16))],
+        };
+
+        let local_provider = ContentProvider {
+            peer: local_peer_id,
+            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+        };
+
+        assert!(store.put_provider(key.clone(), remote_provider.clone()));
+        assert!(store.put_provider(key.clone(), local_provider.clone()));
+
+        let got_providers = store.get_providers(&key);
+        assert_eq!(got_providers.len(), 2);
+        assert!(got_providers.contains(&remote_provider));
+        assert!(got_providers.contains(&local_provider));
+
+        assert_eq!(store.local_providers.get(&key), Some(&local_provider));
+
+        store.remove_local_provider(key.clone());
+
+        assert_eq!(store.get_providers(&key), vec![remote_provider]);
+        assert!(store.local_providers.is_empty());
+    }
+
+    #[tokio::test]
+    async fn local_provider_refresh() {
+        let local_peer_id = PeerId::random();
+        let mut store = MemoryStore::with_config(
+            local_peer_id,
+            MemoryStoreConfig {
+                provider_refresh_interval: Duration::from_secs(5),
+                ..Default::default()
+            },
+        );
+
+        let key = Key::from(vec![1, 2, 3]);
+        let local_provider = ContentProvider {
+            peer: local_peer_id,
+            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+        };
+
+        assert!(store.put_provider(key.clone(), local_provider.clone()));
+
+        assert_eq!(store.get_providers(&key), vec![local_provider.clone()]);
+        assert_eq!(store.local_providers.get(&key), Some(&local_provider));
+
+        // No actions are generated immediately.
+        assert!(matches!(
+            tokio::time::timeout(Duration::from_secs(1), store.next_action()).await,
+            Err(_),
+        ));
+        // The local provider is refreshed.
+        assert_eq!(
+            tokio::time::timeout(Duration::from_secs(10), store.next_action())
+                .await
+                .unwrap(),
+            Some(MemoryStoreAction::RefreshProvider {
+                provided_key: key,
+                provider: local_provider
+            }),
+        );
+    }
+
+    #[tokio::test]
+    async fn local_provider_inserted_after_remote_provider_refresh() {
+        let local_peer_id = PeerId::random();
+        let mut store = MemoryStore::with_config(
+            local_peer_id,
+            MemoryStoreConfig {
+                provider_refresh_interval: Duration::from_secs(5),
+                ..Default::default()
+            },
+        );
+
+        let key = Key::from(vec![1, 2, 3]);
+
+        let remote_peer_id = PeerId::random();
+        let remote_provider = ContentProvider {
+            peer: remote_peer_id,
+            addresses: vec![multiaddr!(Ip4([192, 168, 0, 1]), Tcp(10000u16))],
+        };
+
+        let local_provider = ContentProvider {
+            peer: local_peer_id,
+            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+        };
+
+        assert!(store.put_provider(key.clone(), remote_provider.clone()));
+        assert!(store.put_provider(key.clone(), local_provider.clone()));
+
+        let got_providers = store.get_providers(&key);
+        assert_eq!(got_providers.len(), 2);
+        assert!(got_providers.contains(&remote_provider));
+        assert!(got_providers.contains(&local_provider));
+
+        assert_eq!(store.local_providers.get(&key), Some(&local_provider));
+
+        // No actions are generated immediately.
+        assert!(matches!(
+            tokio::time::timeout(Duration::from_secs(1), store.next_action()).await,
+            Err(_),
+        ));
+        // The local provider is refreshed.
+        assert_eq!(
+            tokio::time::timeout(Duration::from_secs(10), store.next_action())
+                .await
+                .unwrap(),
+            Some(MemoryStoreAction::RefreshProvider {
+                provided_key: key,
+                provider: local_provider
+            }),
+        );
+    }
+
+    #[tokio::test]
+    async fn removed_local_provider_not_refreshed() {
+        let local_peer_id = PeerId::random();
+        let mut store = MemoryStore::with_config(
+            local_peer_id,
+            MemoryStoreConfig {
+                provider_refresh_interval: Duration::from_secs(1),
+                ..Default::default()
+            },
+        );
+
+        let key = Key::from(vec![1, 2, 3]);
+        let local_provider = ContentProvider {
+            peer: local_peer_id,
+            addresses: vec![multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10001u16))],
+        };
+
+        assert!(store.put_provider(key.clone(), local_provider.clone()));
+
+        assert_eq!(store.get_providers(&key), vec![local_provider.clone()]);
+        assert_eq!(store.local_providers.get(&key), Some(&local_provider));
+
+        store.remove_local_provider(key);
+
+        // The removed local provider must not be refreshed within the next 10 secs.
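+        // The already-armed refresh future still fires at 1 sec, but since the
+        // provider was removed it yields `None`; afterwards no action is produced.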
+        assert_eq!(
+            tokio::time::timeout(Duration::from_secs(5), store.next_action()).await,
+            Ok(None),
+        );
+        assert!(matches!(
+            tokio::time::timeout(Duration::from_secs(5), store.next_action()).await,
+            Err(_),
+        ));
+    }
 }
diff --git a/tests/conformance/rust/kademlia.rs b/tests/conformance/rust/kademlia.rs
index a501b581..990d307a 100644
--- a/tests/conformance/rust/kademlia.rs
+++ b/tests/conformance/rust/kademlia.rs
@@ -22,8 +22,11 @@
 use futures::StreamExt;
 use libp2p::{
     identify, identity,
-    kad::{self, store::RecordStore},
-    swarm::{keep_alive, NetworkBehaviour, SwarmBuilder, SwarmEvent},
+    kad::{
+        self, store::RecordStore, AddProviderOk, GetProvidersOk, InboundRequest,
+        KademliaEvent as Libp2pKademliaEvent, QueryResult, RecordKey as Libp2pRecordKey,
+    },
+    swarm::{keep_alive, AddressScore, NetworkBehaviour, SwarmBuilder, SwarmEvent},
     PeerId, Swarm,
 };
 use litep2p::{
@@ -33,9 +36,10 @@ use litep2p::{
         ConfigBuilder, KademliaEvent, KademliaHandle, Quorum, Record, RecordKey,
     },
     transport::tcp::config::Config as TcpConfig,
+    types::multiaddr::{Multiaddr, Protocol},
     Litep2p,
 };
-use multiaddr::Protocol;
+use std::time::Duration;
 
 #[derive(NetworkBehaviour)]
 struct Behaviour {
@@ -407,3 +411,241 @@
         }
     }
 }
+
+#[tokio::test]
+async fn litep2p_add_provider_to_libp2p() {
+    let (mut litep2p, mut litep2p_kad) = initialize_litep2p();
+    let mut libp2p = initialize_libp2p();
+
+    // Drive libp2p a little bit to get the listen address.
+    let get_libp2p_listen_addr = async {
+        loop {
+            if let SwarmEvent::NewListenAddr { address, .. } = libp2p.select_next_some().await {
+                break address;
+            }
+        }
+    };
+    let libp2p_listen_addr = tokio::time::timeout(Duration::from_secs(10), get_libp2p_listen_addr)
+        .await
+        .expect("didn't get libp2p listen address in 10 seconds");
+
+    let litep2p_public_addr: Multiaddr = "/ip6/::1/tcp/10000".parse().unwrap();
+    litep2p.public_addresses().add_address(litep2p_public_addr.clone()).unwrap();
+    // Get the public address with the peer ID.
+    let litep2p_public_addr = litep2p.public_addresses().get_addresses().pop().unwrap();
+
+    let libp2p_peer_id = litep2p::PeerId::from_bytes(&libp2p.local_peer_id().to_bytes()).unwrap();
+    litep2p_kad.add_known_peer(libp2p_peer_id, vec![libp2p_listen_addr]).await;
+
+    let litep2p_peer_id = PeerId::from_bytes(&litep2p.local_peer_id().to_bytes()).unwrap();
+    let key = vec![1u8, 2u8, 3u8];
+    litep2p_kad.start_providing(RecordKey::new(&key)).await;
+
+    loop {
+        tokio::select! {
+            _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
+                panic!("provider was not added in 10 secs")
+            }
+            _ = litep2p.next_event() => {}
+            _ = litep2p_kad.next() => {}
+            event = libp2p.select_next_some() => {
+                if let SwarmEvent::Behaviour(BehaviourEvent::Kad(event)) = event {
+                    if let Libp2pKademliaEvent::InboundRequest { request } = event {
+                        if let InboundRequest::AddProvider { .. } = request {
+                            let store = libp2p.behaviour_mut().kad.store_mut();
+                            let mut providers = store.providers(&key.clone().into());
+                            assert_eq!(providers.len(), 1);
+                            let record = providers.pop().unwrap();
+
+                            assert_eq!(record.key.as_ref(), key);
+                            assert_eq!(record.provider, litep2p_peer_id);
+                            assert_eq!(record.addresses, vec![litep2p_public_addr.clone()]);
+                            break
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[tokio::test]
+async fn libp2p_add_provider_to_litep2p() {
+    let (mut litep2p, mut litep2p_kad) = initialize_litep2p();
+    let mut libp2p = initialize_libp2p();
+
+    let libp2p_peerid = litep2p::PeerId::from_bytes(&libp2p.local_peer_id().to_bytes()).unwrap();
+    let libp2p_public_addr: Multiaddr = "/ip4/1.1.1.1/tcp/10000".parse().unwrap();
+    libp2p.add_external_address(libp2p_public_addr.clone(), AddressScore::Infinite);
+
+    let litep2p_peerid = PeerId::from_bytes(&litep2p.local_peer_id().to_bytes()).unwrap();
+    let litep2p_address = litep2p.listen_addresses().next().unwrap().clone();
+    libp2p.behaviour_mut().kad.add_address(&litep2p_peerid, litep2p_address);
+
+    // Start providing.
+    let key = vec![1u8, 2u8, 3u8];
+    libp2p.behaviour_mut().kad.start_providing(key.clone().into()).unwrap();
+
+    loop {
+        tokio::select! {
+            _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
+                panic!("provider was not added in 10 secs")
+            }
+            _ = litep2p.next_event() => {}
+            _ = libp2p.select_next_some() => {}
+            event = litep2p_kad.next() => {
+                if let Some(KademliaEvent::IncomingProvider { provided_key, provider }) = event {
+                    assert_eq!(provided_key, key.clone().into());
+                    assert_eq!(provider.peer, libp2p_peerid);
+                    assert_eq!(provider.addresses, vec![libp2p_public_addr]);
+
+                    break
+                }
+            }
+        }
+    }
+}
+
+#[tokio::test]
+async fn litep2p_get_providers_from_libp2p() {
+    let (mut litep2p, mut litep2p_kad) = initialize_litep2p();
+    let mut libp2p = initialize_libp2p();
+
+    let libp2p_peerid = litep2p::PeerId::from_bytes(&libp2p.local_peer_id().to_bytes()).unwrap();
+    let libp2p_public_addr: Multiaddr = "/ip4/1.1.1.1/tcp/10000".parse().unwrap();
+    libp2p.add_external_address(libp2p_public_addr.clone(), AddressScore::Infinite);
+
+    // Start providing.
+    let key = vec![1u8, 2u8, 3u8];
+    let query_id = libp2p.behaviour_mut().kad.start_providing(key.clone().into()).unwrap();
+
+    let mut libp2p_listen_addr = None;
+    let mut provider_stored = false;
+
+    // Drive libp2p a little bit to get the listen address and make sure the provider
+    // was stored locally.
+    tokio::time::timeout(Duration::from_secs(10), async {
+        loop {
+            match libp2p.select_next_some().await {
+                SwarmEvent::Behaviour(BehaviourEvent::Kad(
+                    Libp2pKademliaEvent::OutboundQueryProgressed { id, result, .. },
+                )) => {
+                    assert_eq!(id, query_id);
+                    assert!(
+                        matches!(result, QueryResult::StartProviding(Ok(AddProviderOk { key: provided_key }))
+                            if provided_key == Libp2pRecordKey::from(key.clone()))
+                    );
+
+                    provider_stored = true;
+
+                    if libp2p_listen_addr.is_some() {
+                        break;
+                    }
+                }
+                SwarmEvent::NewListenAddr { address, .. } => {
+                    libp2p_listen_addr = Some(address);
+
+                    if provider_stored {
+                        break;
+                    }
+                }
+                _ => {}
+            }
+        }
+    })
+    .await
+    .expect("failed to store provider and get listen address in 10 seconds");
+
+    let libp2p_listen_addr = libp2p_listen_addr.unwrap();
+
+    // `GET_PROVIDERS`
+    litep2p_kad
+        .add_known_peer(libp2p_peerid, vec![libp2p_listen_addr.clone()])
+        .await;
+    let original_query_id = litep2p_kad.get_providers(key.clone().into()).await;
+
+    loop {
+        tokio::select! {
+            _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
+                panic!("providers were not retrieved in 10 secs")
+            }
+            _ = litep2p.next_event() => {}
+            _ = libp2p.select_next_some() => {}
+            event = litep2p_kad.next() => {
+                if let Some(KademliaEvent::GetProvidersSuccess {
+                    query_id,
+                    provided_key,
+                    mut providers,
+                }) = event {
+                    assert_eq!(query_id, original_query_id);
+                    assert_eq!(provided_key, key.clone().into());
+                    assert_eq!(providers.len(), 1);
+
+                    let provider = providers.pop().unwrap();
+                    assert_eq!(provider.peer, libp2p_peerid);
+                    assert_eq!(provider.addresses.len(), 2);
+                    assert!(provider.addresses.contains(&libp2p_listen_addr));
+                    assert!(provider.addresses.contains(&libp2p_public_addr));
+
+                    break
+                }
+            }
+        }
+    }
+}
+
+#[tokio::test]
+async fn libp2p_get_providers_from_litep2p() {
+    let (mut litep2p, mut litep2p_kad) = initialize_litep2p();
+    let mut libp2p = initialize_libp2p();
+
+    let litep2p_peerid = PeerId::from_bytes(&litep2p.local_peer_id().to_bytes()).unwrap();
+    let litep2p_listen_address = litep2p.listen_addresses().next().unwrap().clone();
+    let litep2p_public_address: Multiaddr = "/ip4/1.1.1.1/tcp/10000".parse().unwrap();
+    litep2p.public_addresses().add_address(litep2p_public_address).unwrap();
+
+    // Store the provider locally in litep2p.
+    let original_key = vec![1u8, 2u8, 3u8];
+    litep2p_kad.start_providing(original_key.clone().into()).await;
+
+    // Drive litep2p a little bit to make sure the provider record is stored and no
+    // `ADD_PROVIDER` requests are generated (because no peers are known yet).
+    tokio::time::timeout(Duration::from_secs(2), async {
+        litep2p.next_event().await;
+    })
+    .await
+    .unwrap_err();
+
+    libp2p.behaviour_mut().kad.add_address(&litep2p_peerid, litep2p_listen_address);
+    let query_id = libp2p.behaviour_mut().kad.get_providers(original_key.clone().into());
+
+    loop {
+        tokio::select! {
+            event = libp2p.select_next_some() => {
+                match event {
+                    SwarmEvent::Behaviour(BehaviourEvent::Kad(
+                        Libp2pKademliaEvent::OutboundQueryProgressed { id, result, .. })
+                    ) => {
+                        assert_eq!(id, query_id);
+                        if let QueryResult::GetProviders(Ok(
+                            GetProvidersOk::FoundProviders { key, providers }
+                        )) = result {
+                            assert_eq!(key, original_key.clone().into());
+                            assert_eq!(providers.len(), 1);
+                            assert!(providers.contains(&litep2p_peerid));
+                            // It looks like `libp2p` discards the cached provider addresses
+                            // received in the `GET_PROVIDERS` response, so we can't check them
+                            // here. Nor are the addresses used to extend the `libp2p` routing
+                            // table.
+                            break
+                        } else {
+                            panic!("invalid query result")
+                        }
+                    }
+                    _ => {}
+                }
+            }
+            _ = litep2p.next_event() => {}
+            _ = litep2p_kad.next() => {}
+        }
+    }
+}
diff --git a/tests/protocol/kademlia.rs b/tests/protocol/kademlia.rs
index 893871e9..bd121c03 100644
--- a/tests/protocol/kademlia.rs
+++ b/tests/protocol/kademlia.rs
@@ -25,12 +25,13 @@ use litep2p::{
     config::ConfigBuilder,
     crypto::ed25519::Keypair,
     protocol::libp2p::kademlia::{
-        ConfigBuilder as KademliaConfigBuilder, IncomingRecordValidationMode, KademliaEvent,
-        PeerRecord, Quorum, Record, RecordKey, RecordsType,
+        ConfigBuilder as KademliaConfigBuilder, ContentProvider, IncomingRecordValidationMode,
+        KademliaEvent, PeerRecord, Quorum, Record, RecordKey, RecordsType,
     },
     transport::tcp::config::Config as TcpConfig,
+    types::multiaddr::{Multiaddr, Protocol},
     Litep2p, PeerId,
 };
 
 fn spawn_litep2p(port: u16) {
     let (kad_config1, _kad_handle1) = KademliaConfigBuilder::new().build();
@@ -461,3 +462,173 @@
         }
     }
 }
+
+#[tokio::test]
+async fn provider_retrieved_by_remote_node() {
+    let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new().build();
+    let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new().build();
+
+    let config1 = ConfigBuilder::new()
+        .with_tcp(TcpConfig {
+            listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
+            ..Default::default()
+        })
+        .with_libp2p_kademlia(kad_config1)
+        .build();
+
+    let config2 = ConfigBuilder::new()
+        .with_tcp(TcpConfig {
+            listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
+            ..Default::default()
+        })
+        .with_libp2p_kademlia(kad_config2)
+        .build();
+
+    let mut litep2p1 = Litep2p::new(config1).unwrap();
+    let mut litep2p2 = Litep2p::new(config2).unwrap();
+
+    // Register at least one public address.
+    let peer1 = *litep2p1.local_peer_id();
+    let peer1_public_address = "/ip4/192.168.0.1/tcp/10000"
+        .parse::<Multiaddr>()
+        .unwrap()
+        .with(Protocol::P2p(peer1.into()));
+    litep2p1.public_addresses().add_address(peer1_public_address.clone()).unwrap();
+    assert_eq!(
+        litep2p1.public_addresses().get_addresses(),
+        vec![peer1_public_address.clone()],
+    );
+
+    // Store the provider locally.
+    let key = RecordKey::new(&vec![1, 2, 3]);
+    kad_handle1.start_providing(key.clone()).await;
+
+    // This is the expected provider.
+    let expected_provider = ContentProvider {
+        peer: peer1,
+        addresses: vec![peer1_public_address],
+    };
+
+    // This request to get providers should fail because the nodes are not connected.
+    let query1 = kad_handle2.get_providers(key.clone()).await;
+    let mut query2 = None;
+
+    loop {
+        tokio::select! {
+            _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
+                panic!("provider was not retrieved in 10 secs")
+            }
+            _ = litep2p1.next_event() => {}
+            _ = litep2p2.next_event() => {}
+            _ = kad_handle1.next() => {}
+            event = kad_handle2.next() => {
+                match event {
+                    Some(KademliaEvent::QueryFailed { query_id }) => {
+                        // The query failed because the nodes don't know about each other yet.
+                        assert_eq!(query_id, query1);
+
+                        // Let the node know about `litep2p1`.
+                        kad_handle2
+                            .add_known_peer(
+                                *litep2p1.local_peer_id(),
+                                litep2p1.listen_addresses().cloned().collect(),
+                            )
+                            .await;
+
+                        // And request providers again.
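+                        // This time the query should succeed: `litep2p1`'s listen
+                        // addresses were just added to the routing table.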
+                        query2 = Some(kad_handle2.get_providers(key.clone()).await);
+                    }
+                    Some(KademliaEvent::GetProvidersSuccess {
+                        query_id,
+                        provided_key,
+                        providers,
+                    }) => {
+                        assert_eq!(query_id, query2.unwrap());
+                        assert_eq!(provided_key, key);
+                        assert_eq!(providers.len(), 1);
+                        assert_eq!(providers.first().unwrap(), &expected_provider);
+
+                        break
+                    }
+                    _ => {}
+                }
+            }
+        }
+    }
+}
+
+#[tokio::test]
+async fn provider_added_to_remote_node() {
+    let (kad_config1, mut kad_handle1) = KademliaConfigBuilder::new().build();
+    let (kad_config2, mut kad_handle2) = KademliaConfigBuilder::new().build();
+
+    let config1 = ConfigBuilder::new()
+        .with_tcp(TcpConfig {
+            listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
+            ..Default::default()
+        })
+        .with_libp2p_kademlia(kad_config1)
+        .build();
+
+    let config2 = ConfigBuilder::new()
+        .with_tcp(TcpConfig {
+            listen_addresses: vec!["/ip6/::1/tcp/0".parse().unwrap()],
+            ..Default::default()
+        })
+        .with_libp2p_kademlia(kad_config2)
+        .build();
+
+    let mut litep2p1 = Litep2p::new(config1).unwrap();
+    let mut litep2p2 = Litep2p::new(config2).unwrap();
+
+    // Register at least one public address.
+    let peer1 = *litep2p1.local_peer_id();
+    let peer1_public_address = "/ip4/192.168.0.1/tcp/10000"
+        .parse::<Multiaddr>()
+        .unwrap()
+        .with(Protocol::P2p(peer1.into()));
+    litep2p1.public_addresses().add_address(peer1_public_address.clone()).unwrap();
+    assert_eq!(
+        litep2p1.public_addresses().get_addresses(),
+        vec![peer1_public_address.clone()],
+    );
+
+    // Let peer1 know about peer2.
+    kad_handle1
+        .add_known_peer(
+            *litep2p2.local_peer_id(),
+            litep2p2.listen_addresses().cloned().collect(),
+        )
+        .await;
+
+    // Start providing.
+    let key = RecordKey::new(&vec![1, 2, 3]);
+    kad_handle1.start_providing(key.clone()).await;
+
+    // This is the expected provider.
+    let expected_provider = ContentProvider {
+        peer: peer1,
+        addresses: vec![peer1_public_address],
+    };
+
+    loop {
+        tokio::select! {
+            _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
+                panic!("provider was not added in 10 secs")
+            }
+            _ = litep2p1.next_event() => {}
+            _ = litep2p2.next_event() => {}
+            _ = kad_handle1.next() => {}
+            event = kad_handle2.next() => {
+                match event {
+                    Some(KademliaEvent::IncomingProvider { provided_key, provider }) => {
+                        assert_eq!(provided_key, key);
+                        assert_eq!(provider, expected_provider);
+                        break
+                    }
+                    _ => {}
+                }
+            }
+        }
+    }
+}