Skip to content

Commit

Permalink
find_node: Optimize parallelism factor for slow to respond peers (#220)
Browse files Browse the repository at this point in the history
This PR introduces a `peer_timeout` to the `FIND_NODE` queries.
- Queries make at most 3 (alpha / parallelism factor) requests in
parallel
- Introduce a `peer_timeout` after which the pending request doesn't
count towards the parallelism factor

This prevents the query from getting stuck when a peer is slow or fails
to respond in due time.
The request stays pending, so a response that arrives later is still accepted.

Since most queries rely on the same primitives (candidates, pending,
responses), we can abstract and adopt a shared wrapper for all queries.
This would make the same feature available to the other queries, keeping
the code simple and reusing the core functionality. However, that can
come at a later time.

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Aug 27, 2024
1 parent fc16596 commit 8eaa96d
Showing 1 changed file with 102 additions and 8 deletions.
110 changes: 102 additions & 8 deletions src/protocol/libp2p/kademlia/query/find_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_node";

/// Default timeout for a peer to respond to a query.
///
/// Used as the initial `peer_timeout` of a new `FindNodeContext`. Once a pending request is
/// older than this, it stops counting towards the parallelism factor; the request itself is
/// left in `pending`.
const DEFAULT_PEER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

/// The configuration needed to instantiate a new [`FindNodeContext`].
#[derive(Debug, Clone)]
pub struct FindNodeConfig<T: Clone + Into<Vec<u8>>> {
Expand Down Expand Up @@ -63,7 +66,7 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {
kad_message: Bytes,

/// Peers from whom the `QueryEngine` is waiting to hear a response.
pub pending: HashMap<PeerId, KademliaPeer>,
pub pending: HashMap<PeerId, (KademliaPeer, std::time::Instant)>,

/// Queried candidates.
///
Expand All @@ -76,6 +79,18 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {

/// Responses.
pub responses: BTreeMap<Distance, KademliaPeer>,

/// The timeout after which the pending request is no longer
/// counting towards the parallelism factor.
///
/// This is used to prevent the query from getting stuck when a peer
/// is slow or fails to respond in due time.
peer_timeout: std::time::Duration,
/// The number of pending responses that count towards the parallelism factor.
///
/// These represent the number of peers added to the `Self::pending` minus the number of peers
/// that have failed to respond within the `Self::peer_timeout`
pending_responses: usize,
}

impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
Expand All @@ -98,27 +113,34 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
pending: HashMap::new(),
queried: HashSet::new(),
responses: BTreeMap::new(),

peer_timeout: DEFAULT_PEER_TIMEOUT,
pending_responses: 0,
}
}

/// Register response failure for `peer`.
///
/// Removes `peer` from `pending`. If the peer was not pending, this logs at debug level and
/// returns without touching any other state. Otherwise it decrements `pending_responses`
/// (saturating at zero), logs how long the request had been outstanding, and records the
/// peer in `queried`.
///
/// NOTE(review): the decrement is unconditional. A peer whose request was already discounted
/// by the timeout sweep (the loop over `pending` comparing `instant.elapsed()` with
/// `peer_timeout`) looks like it gets subtracted a second time here — confirm this is intended.
pub fn register_response_failure(&mut self, peer: PeerId) {
let Some(peer) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
let Some((peer, instant)) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist during response failure");
return;
};
self.pending_responses = self.pending_responses.saturating_sub(1);

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, elapsed = ?instant.elapsed(), "peer failed to respond");

self.queried.insert(peer.peer);
}

/// Register `FIND_NODE` response from `peer`.
pub fn register_response(&mut self, peer: PeerId, peers: Vec<KademliaPeer>) {
tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");

let Some(peer) = self.pending.remove(&peer) else {
let Some((peer, instant)) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
return;
};
self.pending_responses = self.pending_responses.saturating_sub(1);

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, elapsed = ?instant.elapsed(), "received response from peer");

// calculate distance for `peer` from target and insert it if
// a) the map doesn't have 20 responses
Expand Down Expand Up @@ -189,7 +211,8 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
let peer = candidate.peer;

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
self.pending.insert(candidate.peer, candidate.clone());
self.pending.insert(candidate.peer, (candidate, std::time::Instant::now()));
self.pending_responses = self.pending_responses.saturating_add(1);

Some(QueryAction::SendMessage {
query: self.config.query,
Expand Down Expand Up @@ -221,9 +244,22 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
};
}

for (peer, instant) in self.pending.values() {
if instant.elapsed() > self.peer_timeout {
tracing::trace!(
target: LOG_TARGET,
query = ?self.config.query,
?peer,
elapsed = ?instant.elapsed(),
"peer no longer counting towards parallelism factor"
);
self.pending_responses = self.pending_responses.saturating_sub(1);
}
}

// At this point, we either have pending responses or candidates to query; and we need more
// results. Ensure we do not exceed the parallelism factor.
if self.pending.len() == self.config.parallelism_factor {
if self.pending_responses == self.config.parallelism_factor {
return None;
}

Expand Down Expand Up @@ -343,6 +379,64 @@ mod tests {
assert!(context.next_action().is_none());
}

/// Verifies that requests older than `peer_timeout` stop counting towards the
/// parallelism factor, allowing a further candidate to be queried.
///
/// The test saturates the parallelism factor with three requests, ages those
/// requests past the timeout, and then checks that the next action queries the
/// fourth peer while the three stale requests remain in `pending`.
#[test]
fn fulfill_parallelism_with_timeout_optimization() {
    let config = FindNodeConfig {
        parallelism_factor: 3,
        ..default_config()
    };

    let in_peers_set = (0..4).map(|_| PeerId::random()).collect::<HashSet<_>>();
    let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
    let mut context = FindNodeContext::new(config, in_peers);
    // Test overwrite.
    context.peer_timeout = std::time::Duration::from_secs(1);

    for num in 0..3 {
        let event = context.next_action().expect("a candidate must be scheduled");
        match event {
            QueryAction::SendMessage { query, peer, .. } => {
                assert_eq!(query, QueryId(0));
                // Added as pending.
                assert_eq!(context.pending.len(), num + 1);
                assert!(context.pending.contains_key(&peer));

                // Check the peer is the one provided.
                assert!(in_peers_set.contains(&peer));
            }
            _ => panic!("Unexpected event"),
        }
    }

    // Fulfilled parallelism.
    assert!(context.next_action().is_none());

    // Age every pending request past the 1 second timeout. Rewriting the recorded
    // instants keeps the test fast and independent of scheduler timing; sleeping is
    // only the fallback for a clock that cannot be rewound by two seconds.
    let stale_by = std::time::Duration::from_secs(2);
    match std::time::Instant::now().checked_sub(stale_by) {
        Some(expired) => {
            for (_, instant) in context.pending.values_mut() {
                *instant = expired;
            }
        }
        None => std::thread::sleep(stale_by),
    }

    // The pending responses are reset only on the next query action.
    assert_eq!(context.pending_responses, 3);
    assert_eq!(context.pending.len(), 3);

    // This allows other peers to be queried.
    let event = context.next_action().expect("timed-out peers must free a slot");
    match event {
        QueryAction::SendMessage { query, peer, .. } => {
            assert_eq!(query, QueryId(0));
            // Added as pending.
            assert_eq!(context.pending.len(), 4);
            assert!(context.pending.contains_key(&peer));

            // Check the peer is the one provided.
            assert!(in_peers_set.contains(&peer));
        }
        _ => panic!("Unexpected event"),
    }

    // Three stale requests were discounted and one fresh request was added.
    assert_eq!(context.pending_responses, 1);
    assert_eq!(context.pending.len(), 4);
}

#[test]
fn completes_when_responses() {
let config = FindNodeConfig {
Expand Down

0 comments on commit 8eaa96d

Please sign in to comment.