kad: Providers part 8: unit, e2e, and libp2p conformance tests #258

Merged: 60 commits, Sep 30, 2024

Changes from all commits (60 commits):
1048320
Introduce `ADD_PROVIDER` query
dmitry-markin Aug 22, 2024
2ab84ae
Merge remote-tracking branch 'origin/master' into dm-provider-queries
dmitry-markin Aug 23, 2024
778078b
Execute `ADD_PROVIDER` query
dmitry-markin Aug 23, 2024
323a26f
Introduce local providers in `MemoryStore`
dmitry-markin Aug 27, 2024
8946809
Add provider refresh interval to Kademlia config
dmitry-markin Aug 28, 2024
474a58a
Merge remote-tracking branch 'origin/master' into dm-republish-providers
dmitry-markin Aug 28, 2024
d02acee
Move `FuturesStream` to a separate file
dmitry-markin Aug 28, 2024
5432c9b
Refresh providers: dry-run without network queries
dmitry-markin Aug 28, 2024
feb493d
Remove `try_get_record()` and other `try_...()` non-async methods
dmitry-markin Aug 29, 2024
982c73d
Move query ID generation from `KademliaHandle` to `Kademlia`
dmitry-markin Aug 29, 2024
68c7a87
Republish providers
dmitry-markin Aug 29, 2024
f9ed3e7
Merge remote-tracking branch 'origin/master' into dm-republish-providers
dmitry-markin Aug 30, 2024
764a3f2
Merge remote-tracking branch 'origin/master' into dm-add-providers
dmitry-markin Aug 30, 2024
5e97484
Use getter `TransportService::local_peer_id()` instead of accessing d…
dmitry-markin Aug 30, 2024
7caf290
Use getter `TransportService::local_peer_id()` instead of accessing d…
dmitry-markin Aug 30, 2024
60b8fb6
Introduce `GET_PROVIDERS` query
dmitry-markin Aug 27, 2024
d340c35
Update `get_providers()` to receive query ID via oneshot channel
dmitry-markin Aug 30, 2024
10b96db
Make lines fit into 100 characters
dmitry-markin Aug 30, 2024
6146540
Introduce `GetProvidersContext` & `GetProvidersConfig`
dmitry-markin Sep 2, 2024
e167419
Implement `GET_PROVIDERS` query
dmitry-markin Sep 2, 2024
eb6eee8
Merge returned provider addresses and deduplicate records
dmitry-markin Sep 3, 2024
8314d95
Merge branch 'dm-add-providers' into dm-republish-providers
dmitry-markin Sep 3, 2024
974eac5
minor: fix log target
dmitry-markin Sep 3, 2024
e895a44
Revert "Remove `try_get_record()` and other `try_...()` non-async met…
dmitry-markin Sep 4, 2024
ce18594
Revert "Move query ID generation from `KademliaHandle` to `Kademlia`"
dmitry-markin Sep 4, 2024
0fcd621
Use `AtomicUsize` to generate `QueryId` in both `KademliaHandle` and …
dmitry-markin Sep 4, 2024
5f26161
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 4, 2024
57f17ce
Use `open_substream_or_dial()` to add provider records to peers
dmitry-markin Sep 5, 2024
8594103
Merge remote-tracking branch 'origin/master' into dm-add-providers
dmitry-markin Sep 5, 2024
a65ff10
Test local provider refresh scheduled in `MemoryStore`
dmitry-markin Sep 6, 2024
4861959
Fix refresh when we are the only provider
dmitry-markin Sep 9, 2024
c85dd3b
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 10, 2024
2fcd2c6
Merge branch 'dm-get-providers' into dm-test-providers
dmitry-markin Sep 10, 2024
03846c7
Address review suggestions
dmitry-markin Sep 12, 2024
0092329
Merge branch 'dm-get-providers' into dm-test-providers
dmitry-markin Sep 12, 2024
75a7972
Test local provider refresh when there are other providers for the key
dmitry-markin Sep 13, 2024
7492895
Stop providing
dmitry-markin Sep 13, 2024
c90861f
Use `HashMap::entry()` API to remove local providers
dmitry-markin Sep 13, 2024
8c49211
Merge branch 'dm-stop-providing' into dm-test-providers
dmitry-markin Sep 16, 2024
dc11ff1
Extend local providers tests in `MemoryStore`
dmitry-markin Sep 16, 2024
6ebb51b
Test `GetProvidersContext`
dmitry-markin Sep 17, 2024
17fd943
Introduce `ContentProvider` type
dmitry-markin Sep 18, 2024
e853d3b
Merge branch 'dm-add-providers' into dm-republish-providers
dmitry-markin Sep 18, 2024
aeb59cf
Merge branch 'dm-republish-providers' into dm-get-providers
dmitry-markin Sep 18, 2024
674c54e
Merge branch 'dm-get-providers' into dm-stop-providing
dmitry-markin Sep 18, 2024
98523e7
Merge branch 'dm-stop-providing' into dm-improve-providers-api
dmitry-markin Sep 18, 2024
d32bd56
Use `PublicAddresses` API
dmitry-markin Sep 18, 2024
54e8bf5
Use locally known providers when performing `KademliaHandle::get_prov…
dmitry-markin Sep 18, 2024
6b9b24a
Emit `IncomingProvider` event
dmitry-markin Sep 18, 2024
7671f9d
Add e2e test: retrieving provider
dmitry-markin Sep 17, 2024
7a5de31
Merge branch 'dm-improve-providers-api' into dm-test-providers
dmitry-markin Sep 18, 2024
b2e7e33
Update `GetProvidersContext` tests for `ContentProvider` API
dmitry-markin Sep 18, 2024
4861342
Update `MemoryStore` tests for `ContentProvider` API
dmitry-markin Sep 18, 2024
3bb7898
Update Kademlia e2e tests for `ContentProvider` API
dmitry-markin Sep 18, 2024
06a2c4d
Add e2e test for `ADD_PROVIDER`
dmitry-markin Sep 18, 2024
dda8be2
Test `ADD_PROVIDER` litep2p -> libp2p
dmitry-markin Sep 26, 2024
a2027b7
Test `ADD_PROVIDER` libp2p -> litep2p
dmitry-markin Sep 27, 2024
ecee1fd
Test `GET_PROVIDERS` libp2p -> litep2p
dmitry-markin Sep 27, 2024
e7f7b08
Test `GET_PROVIDERS` litep2p -> libp2p
dmitry-markin Sep 27, 2024
86ab9a7
Merge remote-tracking branch 'origin/master' into dm-test-providers
dmitry-markin Sep 30, 2024
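Commit 0fcd621 replaces the reverted "move query ID generation" approach with an `AtomicUsize` shared between `KademliaHandle` and `Kademlia`. A minimal std-only sketch of that pattern (the `QueryIdCounter` type and method names here are hypothetical, not litep2p's actual API):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Simplified stand-in for litep2p's query identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueryId(pub usize);

// Hypothetical shared counter: the handle side and the protocol side
// each hold a clone, so IDs stay unique no matter which side asks.
#[derive(Clone, Default)]
pub struct QueryIdCounter {
    next: Arc<AtomicUsize>,
}

impl QueryIdCounter {
    pub fn next_query_id(&self) -> QueryId {
        // Relaxed ordering suffices: we only need uniqueness,
        // not synchronization with other memory operations.
        QueryId(self.next.fetch_add(1, Ordering::Relaxed))
    }
}

fn main() {
    let handle_side = QueryIdCounter::default();
    let kademlia_side = handle_side.clone();

    // IDs interleave but never collide across the two clones.
    assert_eq!(handle_side.next_query_id(), QueryId(0));
    assert_eq!(kademlia_side.next_query_id(), QueryId(1));
    assert_eq!(handle_side.next_query_id(), QueryId(2));
    println!("ok");
}
```

This is why `get_providers()` can return its `QueryId` via a oneshot channel (commit d340c35) without round-tripping through the protocol task first.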
6 changes: 6 additions & 0 deletions src/protocol/libp2p/kademlia/futures_stream.rs
@@ -44,6 +44,12 @@ impl<F> FuturesStream<F> {
         }
     }

+    /// Number of futures in the stream.
+    #[cfg(test)]
+    pub fn len(&self) -> usize {
+        self.futures.len()
+    }
+
     /// Push a future for processing.
     pub fn push(&mut self, future: F) {
         self.futures.push(future);
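Commit c90861f ("Use `HashMap::entry()` API to remove local providers") relies on the `Entry` API to remove a provider and drop the map entry in one pass. A simplified, std-only sketch of the idiom (the `ProviderStore` type, `RecordKey`, and the `u64` stand-in for `PeerId` are hypothetical, not litep2p's actual types):

```rust
use std::collections::{hash_map::Entry, HashMap, HashSet};

// Hypothetical simplified store: a record key mapped to the set of
// peers providing it.
type RecordKey = Vec<u8>;
type PeerId = u64;

#[derive(Default)]
pub struct ProviderStore {
    providers: HashMap<RecordKey, HashSet<PeerId>>,
}

impl ProviderStore {
    pub fn add_provider(&mut self, key: RecordKey, peer: PeerId) {
        self.providers.entry(key).or_default().insert(peer);
    }

    /// Remove one provider; drop the map entry entirely once the last
    /// provider for the key is gone, so empty sets never accumulate.
    pub fn remove_provider(&mut self, key: RecordKey, peer: PeerId) {
        if let Entry::Occupied(mut entry) = self.providers.entry(key) {
            entry.get_mut().remove(&peer);
            if entry.get().is_empty() {
                entry.remove();
            }
        }
    }

    pub fn is_empty(&self) -> bool {
        self.providers.is_empty()
    }
}

fn main() {
    let mut store = ProviderStore::default();
    store.add_provider(vec![1, 2, 3], 7);
    store.remove_provider(vec![1, 2, 3], 7);
    // The key itself is gone, not just the peer.
    assert!(store.is_empty());
    println!("ok");
}
```

The `entry()` call looks the key up once, so remove-then-cleanup needs no second hash lookup.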
245 changes: 241 additions & 4 deletions src/protocol/libp2p/kademlia/query/get_providers.rs
@@ -81,12 +81,12 @@ pub struct GetProvidersContext {

impl GetProvidersContext {
/// Create new [`GetProvidersContext`].
-    pub fn new(config: GetProvidersConfig, in_peers: VecDeque<KademliaPeer>) -> Self {
+    pub fn new(config: GetProvidersConfig, candidate_peers: VecDeque<KademliaPeer>) -> Self {
         let mut candidates = BTreeMap::new();

-        for candidate in &in_peers {
-            let distance = config.target.distance(&candidate.key);
-            candidates.insert(distance, candidate.clone());
+        for peer in &candidate_peers {
+            let distance = config.target.distance(&peer.key);
+            candidates.insert(distance, peer.clone());
         }

let kad_message =
@@ -273,3 +273,240 @@
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::libp2p::kademlia::types::ConnectionType;
use multiaddr::multiaddr;

fn default_config() -> GetProvidersConfig {
GetProvidersConfig {
local_peer_id: PeerId::random(),
parallelism_factor: 3,
query: QueryId(0),
target: Key::new(vec![1, 2, 3].into()),
known_providers: vec![],
}
}

fn peer_to_kad(peer: PeerId) -> KademliaPeer {
KademliaPeer {
peer,
key: Key::from(peer),
addresses: vec![],
connection: ConnectionType::NotConnected,
}
}

fn peer_to_kad_with_addresses(peer: PeerId, addresses: Vec<Multiaddr>) -> KademliaPeer {
KademliaPeer {
peer,
key: Key::from(peer),
addresses,
connection: ConnectionType::NotConnected,
}
}

#[test]
fn completes_when_no_candidates() {
let config = default_config();

let mut context = GetProvidersContext::new(config, VecDeque::new());
assert!(context.is_done());

let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QueryFailed { query: QueryId(0) });
}

#[test]
fn fulfill_parallelism() {
let config = GetProvidersConfig {
parallelism_factor: 3,
..default_config()
};

let candidate_peer_set: HashSet<_> =
[PeerId::random(), PeerId::random(), PeerId::random()].into_iter().collect();
assert_eq!(candidate_peer_set.len(), 3);

let candidate_peers = candidate_peer_set.iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetProvidersContext::new(config, candidate_peers);

for num in 0..3 {
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), num + 1);
assert!(context.pending.contains_key(&peer));

// Check the peer is the one provided.
assert!(candidate_peer_set.contains(&peer));
}
_ => panic!("Unexpected event"),
}
}

// Fulfilled parallelism.
assert!(context.next_action().is_none());
}

#[test]
fn completes_when_responses() {
let config = GetProvidersConfig {
parallelism_factor: 3,
..default_config()
};

let peer_a = PeerId::random();
let peer_b = PeerId::random();
let peer_c = PeerId::random();

let candidate_peer_set: HashSet<_> = [peer_a, peer_b, peer_c].into_iter().collect();
assert_eq!(candidate_peer_set.len(), 3);

let candidate_peers =
[peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = GetProvidersContext::new(config, candidate_peers);

let [provider1, provider2, provider3, provider4] = (0..4)
.map(|_| ContentProvider {
peer: PeerId::random(),
addresses: vec![],
})
.collect::<Vec<_>>()
.try_into()
.unwrap();

// Schedule peer queries.
for num in 0..3 {
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), num + 1);
assert!(context.pending.contains_key(&peer));

// Check the peer is the one provided.
assert!(candidate_peer_set.contains(&peer));
}
_ => panic!("Unexpected event"),
}
}

// Checks a failed query that was not initiated.
let peer_d = PeerId::random();
context.register_response_failure(peer_d);
assert_eq!(context.pending.len(), 3);
assert!(context.queried.is_empty());

// Provide responses back.
let providers = vec![provider1.clone().into(), provider2.clone().into()];
context.register_response(peer_a, providers, vec![]);
assert_eq!(context.pending.len(), 2);
assert_eq!(context.queried.len(), 1);
assert_eq!(context.found_providers.len(), 2);

// Provide different response from peer b with peer d as candidate.
let providers = vec![provider2.clone().into(), provider3.clone().into()];
let candidates = vec![peer_to_kad(peer_d.clone())];
context.register_response(peer_b, providers, candidates);
assert_eq!(context.pending.len(), 1);
assert_eq!(context.queried.len(), 2);
assert_eq!(context.found_providers.len(), 4);
assert_eq!(context.candidates.len(), 1);

// Peer C fails.
context.register_response_failure(peer_c);
assert!(context.pending.is_empty());
assert_eq!(context.queried.len(), 3);
assert_eq!(context.found_providers.len(), 4);

// Drain the last candidate.
let event = context.next_action().unwrap();
match event {
QueryAction::SendMessage { query, peer, .. } => {
assert_eq!(query, QueryId(0));
// Added as pending.
assert_eq!(context.pending.len(), 1);
assert_eq!(peer, peer_d);
}
_ => panic!("Unexpected event"),
}

// Peer D responds.
let providers = vec![provider4.clone().into()];
context.register_response(peer_d, providers, vec![]);

// Produces the result.
let event = context.next_action().unwrap();
assert_eq!(event, QueryAction::QuerySucceeded { query: QueryId(0) });

// Check results.
let found_providers = context.found_providers();
assert_eq!(found_providers.len(), 4);
assert!(found_providers.contains(&provider1));
assert!(found_providers.contains(&provider2));
assert!(found_providers.contains(&provider3));
assert!(found_providers.contains(&provider4));
}

#[test]
fn providers_sorted_by_distance() {
let target = Key::new(vec![1, 2, 3].into());

let mut peers = (0..10).map(|_| PeerId::random()).collect::<Vec<_>>();
let providers = peers.iter().map(|peer| peer_to_kad(peer.clone())).collect::<Vec<_>>();

let found_providers =
GetProvidersContext::merge_and_sort_providers(providers, target.clone());

peers.sort_by(|p1, p2| {
Key::from(*p1).distance(&target).cmp(&Key::from(*p2).distance(&target))
});

assert!(
std::iter::zip(found_providers.into_iter(), peers.into_iter())
.all(|(provider, peer)| provider.peer == peer)
);
}

#[test]
fn provider_addresses_merged() {
let peer = PeerId::random();

let address1 = multiaddr!(Ip4([127, 0, 0, 1]), Tcp(10000u16));
let address2 = multiaddr!(Ip4([192, 168, 0, 1]), Tcp(10000u16));
let address3 = multiaddr!(Ip4([10, 0, 0, 1]), Tcp(10000u16));
let address4 = multiaddr!(Ip4([1, 1, 1, 1]), Tcp(10000u16));
let address5 = multiaddr!(Ip4([8, 8, 8, 8]), Tcp(10000u16));

let provider1 = peer_to_kad_with_addresses(peer.clone(), vec![address1.clone()]);
let provider2 = peer_to_kad_with_addresses(
peer.clone(),
vec![address2.clone(), address3.clone(), address4.clone()],
);
let provider3 =
peer_to_kad_with_addresses(peer.clone(), vec![address4.clone(), address5.clone()]);

let providers = vec![provider1, provider2, provider3];

let found_providers = GetProvidersContext::merge_and_sort_providers(
providers,
Key::new(vec![1, 2, 3].into()),
);

assert_eq!(found_providers.len(), 1);

let addresses = &found_providers.get(0).unwrap().addresses;
assert_eq!(addresses.len(), 5);
assert!(addresses.contains(&address1));
assert!(addresses.contains(&address2));
assert!(addresses.contains(&address3));
assert!(addresses.contains(&address4));
assert!(addresses.contains(&address5));
}
}
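The `provider_addresses_merged` test above checks that multiple records for the same peer end up as one provider carrying the union of all addresses. A std-only sketch of that merge-and-deduplicate step (here `String` stands in for `Multiaddr` and `u64` for `PeerId`; the function name and exact logic are illustrative, not litep2p's `merge_and_sort_providers`):

```rust
use std::collections::HashMap;

// Simplified stand-ins for litep2p types.
type PeerId = u64;
type Multiaddr = String;

/// Merge provider records per peer, deduplicating addresses while
/// preserving first-seen order.
fn merge_providers(
    records: Vec<(PeerId, Vec<Multiaddr>)>,
) -> HashMap<PeerId, Vec<Multiaddr>> {
    let mut merged: HashMap<PeerId, Vec<Multiaddr>> = HashMap::new();
    for (peer, addresses) in records {
        let entry = merged.entry(peer).or_default();
        for address in addresses {
            // Skip addresses we have already collected for this peer.
            if !entry.contains(&address) {
                entry.push(address);
            }
        }
    }
    merged
}

fn main() {
    // Three records for the same peer, one duplicate address.
    let records = vec![
        (1, vec!["/ip4/127.0.0.1/tcp/10000".to_string()]),
        (
            1,
            vec![
                "/ip4/192.168.0.1/tcp/10000".to_string(),
                "/ip4/127.0.0.1/tcp/10000".to_string(), // duplicate
            ],
        ),
        (1, vec!["/ip4/10.0.0.1/tcp/10000".to_string()]),
    ];

    let merged = merge_providers(records);
    assert_eq!(merged.len(), 1);
    assert_eq!(merged[&1].len(), 3);
    println!("ok");
}
```

In the real query the merged providers are additionally sorted by XOR distance of the provider's key from the target, which is what `providers_sorted_by_distance` verifies.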