From 8eaa96d1ed4cc0fec6f3add5f9fe79f064d20530 Mon Sep 17 00:00:00 2001
From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com>
Date: Tue, 27 Aug 2024 09:24:39 +0300
Subject: [PATCH] find_node: Optimize parallelism factor for slow to respond
 peers (#220)

This PR introduces a `peer_timeout` to the `FIND_NODE` queries.

- Queries make at most 3 (alpha / parallelism factor) requests in parallel
- Introduce a `peer_timeout` after which a pending request no longer
  counts towards the parallelism factor

This prevents the query from getting stuck when a peer is slow or fails
to respond in due time. The peer can still produce the response at a
later time.

Since most queries rely on the same primitives (candidates, pending,
responses), we can abstract them into a shared wrapper for all queries.
This would make the same feature available to the other queries, keeping
the code simple and reusing the core functionality. However, that can
come at a later time.

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile
---
 .../libp2p/kademlia/query/find_node.rs       | 110 ++++++++++++++++--
 1 file changed, 102 insertions(+), 8 deletions(-)

diff --git a/src/protocol/libp2p/kademlia/query/find_node.rs b/src/protocol/libp2p/kademlia/query/find_node.rs
index d05e8787..2d168e94 100644
--- a/src/protocol/libp2p/kademlia/query/find_node.rs
+++ b/src/protocol/libp2p/kademlia/query/find_node.rs
@@ -34,6 +34,9 @@ use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
 /// Logging target for the file.
 const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_node";
 
+/// Default timeout for a peer to respond to a query.
+const DEFAULT_PEER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+
 /// The configuration needed to instantiate a new [`FindNodeContext`].
 #[derive(Debug, Clone)]
 pub struct FindNodeConfig<T: Clone + Into<Vec<u8>>> {
@@ -63,7 +66,7 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {
     kad_message: Bytes,
 
     /// Peers from whom the `QueryEngine` is waiting to hear a response.
-    pub pending: HashMap<PeerId, KademliaPeer>,
+    pub pending: HashMap<PeerId, (KademliaPeer, std::time::Instant)>,
 
     /// Queried candidates.
     ///
@@ -76,6 +79,18 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {
 
     /// Responses.
     pub responses: BTreeMap<Distance, KademliaPeer>,
+
+    /// The timeout after which the pending request is no longer
+    /// counting towards the parallelism factor.
+    ///
+    /// This is used to prevent the query from getting stuck when a peer
+    /// is slow or fails to respond in due time.
+    peer_timeout: std::time::Duration,
+    /// The number of pending responses that count towards the parallelism factor.
+    ///
+    /// These represent the number of peers added to `Self::pending` minus the number of peers
+    /// that have failed to respond within the `Self::peer_timeout`.
+    pending_responses: usize,
 }
 
 impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
@@ -98,27 +113,34 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
             pending: HashMap::new(),
             queried: HashSet::new(),
             responses: BTreeMap::new(),
+
+            peer_timeout: DEFAULT_PEER_TIMEOUT,
+            pending_responses: 0,
         }
     }
 
     /// Register response failure for `peer`.
     pub fn register_response_failure(&mut self, peer: PeerId) {
-        let Some(peer) = self.pending.remove(&peer) else {
-            tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist");
+        let Some((peer, instant)) = self.pending.remove(&peer) else {
+            tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist during response failure");
             return;
         };
+        self.pending_responses = self.pending_responses.saturating_sub(1);
+
+        tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, elapsed = ?instant.elapsed(), "peer failed to respond");
 
         self.queried.insert(peer.peer);
     }
 
     /// Register `FIND_NODE` response from `peer`.
     pub fn register_response(&mut self, peer: PeerId, peers: Vec<KademliaPeer>) {
-        tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer");
-
-        let Some(peer) = self.pending.remove(&peer) else {
+        let Some((peer, instant)) = self.pending.remove(&peer) else {
             tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "received response from peer but didn't expect it");
             return;
         };
+        self.pending_responses = self.pending_responses.saturating_sub(1);
+
+        tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, elapsed = ?instant.elapsed(), "received response from peer");
 
         // calculate distance for `peer` from target and insert it if
         // a) the map doesn't have 20 responses
@@ -189,7 +211,8 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
         let peer = candidate.peer;
 
         tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, "current candidate");
-        self.pending.insert(candidate.peer, candidate.clone());
+        self.pending.insert(candidate.peer, (candidate, std::time::Instant::now()));
+        self.pending_responses = self.pending_responses.saturating_add(1);
 
         Some(QueryAction::SendMessage {
             query: self.config.query,
@@ -221,9 +244,22 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {
             };
         }
 
+        for (peer, instant) in self.pending.values() {
+            if instant.elapsed() > self.peer_timeout {
+                tracing::trace!(
+                    target: LOG_TARGET,
+                    query = ?self.config.query,
+                    ?peer,
+                    elapsed = ?instant.elapsed(),
+                    "peer no longer counting towards parallelism factor"
+                );
+                self.pending_responses = self.pending_responses.saturating_sub(1);
+            }
+        }
+
         // At this point, we either have pending responses or candidates to query; and we need more
         // results. Ensure we do not exceed the parallelism factor.
-        if self.pending.len() == self.config.parallelism_factor {
+        if self.pending_responses == self.config.parallelism_factor {
             return None;
         }
 
@@ -343,6 +379,64 @@ mod tests {
         assert!(context.next_action().is_none());
     }
 
+    #[test]
+    fn fulfill_parallelism_with_timeout_optimization() {
+        let config = FindNodeConfig {
+            parallelism_factor: 3,
+            ..default_config()
+        };
+
+        let in_peers_set = (0..4).map(|_| PeerId::random()).collect::<HashSet<_>>();
+        let in_peers = in_peers_set.iter().map(|peer| peer_to_kad(*peer)).collect();
+        let mut context = FindNodeContext::new(config, in_peers);
+        // Test overwrite.
+        context.peer_timeout = std::time::Duration::from_secs(1);
+
+        for num in 0..3 {
+            let event = context.next_action().unwrap();
+            match event {
+                QueryAction::SendMessage { query, peer, .. } => {
+                    assert_eq!(query, QueryId(0));
+                    // Added as pending.
+                    assert_eq!(context.pending.len(), num + 1);
+                    assert!(context.pending.contains_key(&peer));
+
+                    // Check the peer is the one provided.
+                    assert!(in_peers_set.contains(&peer));
+                }
+                _ => panic!("Unexpected event"),
+            }
+        }
+
+        // Fulfilled parallelism.
+        assert!(context.next_action().is_none());
+
+        // Sleep more than 1 second.
+        std::thread::sleep(std::time::Duration::from_secs(2));
+
+        // The pending responses are reset only on the next query action.
+        assert_eq!(context.pending_responses, 3);
+        assert_eq!(context.pending.len(), 3);
+
+        // This allows other peers to be queried.
+        let event = context.next_action().unwrap();
+        match event {
+            QueryAction::SendMessage { query, peer, .. } => {
+                assert_eq!(query, QueryId(0));
+                // Added as pending.
+                assert_eq!(context.pending.len(), 4);
+                assert!(context.pending.contains_key(&peer));
+
+                // Check the peer is the one provided.
+                assert!(in_peers_set.contains(&peer));
+            }
+            _ => panic!("Unexpected event"),
+        }
+
+        assert_eq!(context.pending_responses, 1);
+        assert_eq!(context.pending.len(), 4);
+    }
+
     #[test]
     fn completes_when_responses() {
         let config = FindNodeConfig {