From 4806ff13f30b4f66609e3b8374d9368f50f95e1e Mon Sep 17 00:00:00 2001 From: Brendon Fish Date: Tue, 20 Feb 2024 09:57:40 -0500 Subject: [PATCH] [LIBP2P] Call bootstrap repeatedly In DHT (#2610) * boostrap with timeout * Fix Startup * fixes * fix counter tests * Call bootstrap when routing changes * remove wait_to_connect * lint * add back wait_to_connect * add more context to comment --- .../src/traits/networking/libp2p_network.rs | 34 +++++----- .../src/network/behaviours/dht/mod.rs | 58 +++++------------ .../network/behaviours/exponential_backoff.rs | 4 ++ crates/libp2p-networking/src/network/mod.rs | 1 - .../src/network/node/handle.rs | 64 +++++++++---------- crates/libp2p-networking/tests/common/mod.rs | 10 ++- 6 files changed, 75 insertions(+), 96 deletions(-) diff --git a/crates/hotshot/src/traits/networking/libp2p_network.rs b/crates/hotshot/src/traits/networking/libp2p_network.rs index d7111c0cea..633b70e87e 100644 --- a/crates/hotshot/src/traits/networking/libp2p_network.rs +++ b/crates/hotshot/src/traits/networking/libp2p_network.rs @@ -446,20 +446,7 @@ impl Libp2pNetwork { }; handle.add_known_peers(bs_addrs).await.unwrap(); - // 10 minute timeout - let timeout_duration = Duration::from_secs(600); - // perform connection - info!("WAITING TO CONNECT ON NODE {:?}", id); - handle - .wait_to_connect(4, id, timeout_duration) - .await - .unwrap(); - - let connected_num = handle.num_connected().await?; - metrics_connected_peers - .metrics - .connected_peers - .set(connected_num); + handle.begin_bootstrap().await?; while !is_bootstrapped.load(Ordering::Relaxed) { async_sleep(Duration::from_secs(1)).await; @@ -471,7 +458,6 @@ impl Libp2pNetwork { if is_da { handle.subscribe("DA".to_string()).await.unwrap(); } - // TODO figure out some way of passing in ALL keypairs. That way we can add the // global topic to the topic map // NOTE this wont' work without this change @@ -487,7 +473,6 @@ impl Libp2pNetwork { while handle.put_record(&pk, &handle.peer_id()).await.is_err() { async_sleep(Duration::from_secs(1)).await; } - info!( "Node {:?} is ready, type: {:?}", handle.peer_id(), @@ -497,13 +482,26 @@ impl Libp2pNetwork { while handle.put_record(&handle.peer_id(), &pk).await.is_err() { async_sleep(Duration::from_secs(1)).await; } - + // 10 minute timeout + let timeout_duration = Duration::from_secs(600); + // perform connection + info!("WAITING TO CONNECT ON NODE {:?}", id); + handle + .wait_to_connect(4, id, timeout_duration) + .await + .unwrap(); info!( "node {:?} is barring bootstrap, type: {:?}", handle.peer_id(), node_type ); + let connected_num = handle.num_connected().await?; + metrics_connected_peers + .metrics + .connected_peers + .set(connected_num); + is_ready.store(true, Ordering::Relaxed); info!("STARTING CONSENSUS ON {:?}", handle.peer_id()); Ok::<(), NetworkError>(()) @@ -531,7 +529,7 @@ impl Libp2pNetwork { broadcast_send: &UnboundedSender, ) -> Result<(), NetworkError> { match msg { - GossipMsg(msg, _topic) => { + GossipMsg(msg, _) => { let result: Result = bincode_opts().deserialize(&msg); if let Ok(result) = result { broadcast_send diff --git a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs index 2a61afb9af..54fe4dd13a 100644 --- a/crates/libp2p-networking/src/network/behaviours/dht/mod.rs +++ b/crates/libp2p-networking/src/network/behaviours/dht/mod.rs @@ -60,8 +60,6 @@ pub struct DHTBehaviour { pub kadem: KademliaBehaviour, /// State of bootstrapping pub bootstrap_state: Bootstrap, - /// State of last random walk - pub random_walk: RandomWalk, /// the peer id (useful only for debugging right now) pub peer_id: PeerId, /// replication factor @@ -77,14 +75,6 @@ pub struct Bootstrap { pub backoff: ExponentialBackoff, } -/// State of the periodic random walk -pub struct RandomWalk { - /// State of random walk - state: State, - /// Retry timeout - backoff: ExponentialBackoff, -} - /// State used for random walk and bootstrapping #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum State { @@ -92,8 +82,6 @@ pub enum State { NotStarted, /// In progress Started, - /// Sucessfully completed - Finished, } /// DHT event enum @@ -142,11 +130,6 @@ impl DHTBehaviour { state: State::NotStarted, backoff: ExponentialBackoff::new(2, Duration::from_secs(1)), }, - random_walk: RandomWalk { - state: State::NotStarted, - // TODO jr this may be way too frequent - backoff: ExponentialBackoff::new(2, Duration::from_secs(1)), - }, in_progress_get_closest_peers: HashMap::default(), replication_factor, } @@ -422,10 +405,7 @@ impl DHTBehaviour { if chan.send(()).is_err() { warn!("DHT: finished query but client no longer interested"); }; - } else { - self.random_walk.state = State::NotStarted; - self.random_walk.backoff.start_next(true); - } + }; info!( "peer {:?} successfully completed get closest peers for {:?} with peers {:?}", self.peer_id, key, peers @@ -434,10 +414,7 @@ impl DHTBehaviour { Err(e) => { if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) { let _: Result<_, _> = chan.send(()); - } else { - self.random_walk.state = State::NotStarted; - self.random_walk.backoff.start_next(true); - } + }; warn!( "peer {:?} failed to get closest peers with {:?} and stats {:?}", self.peer_id, e, stats @@ -462,11 +439,13 @@ impl DHTBehaviour { .. } => { if num_remaining == 0 { - // if bootstrap is successful, restart. info!("Finished bootstrap for peer {:?}", self.peer_id); - self.bootstrap_state.state = State::Finished; + self.bootstrap_state.state = State::NotStarted; self.event_queue.push(DHTEvent::IsBootstrapped); - self.begin_bootstrap = false; + // After initial bootstrap suceeds do it every 2 minutes to maintain routing. + self.bootstrap_state.backoff = + ExponentialBackoff::new(1, Duration::from_secs(120)); + self.bootstrap_state.backoff.start_next(true); } else { warn!( "Bootstrap in progress: num remaining nodes to ping {:?}", @@ -512,7 +491,15 @@ impl DHTBehaviour { addresses: _, bucket_range: _, old_peer: _, - } => {} + } => { + // Trigger a new bootstrap when our table changes, if it's not running + // We do this to refresh our peers when we know routing has changed + // For more info see: https://github.com/libp2p/rust-libp2p/pull/4838 + // TODO: Remove once that pr is in a libp2p release + if self.bootstrap_state.state == State::NotStarted { + self.bootstrap_state.backoff.expire(); + } + } e @ KademliaEvent::OutboundQueryProgressed { .. } => { info!("Not handling dht event {:?}", e); } @@ -575,10 +562,6 @@ impl NetworkBehaviour for DHTBehaviour { type ToSwarm = DHTEvent; - // fn new_handler(&mut self) -> Self::ConnectionHandler { - // self.kadem.new_handler() - // } - fn poll( &mut self, cx: &mut std::task::Context<'_>, @@ -590,6 +573,7 @@ impl NetworkBehaviour for DHTBehaviour { match self.kadem.bootstrap() { Ok(_) => { self.bootstrap_state.state = State::Started; + info!("Starting bootstrap"); } Err(e) => { error!( @@ -605,14 +589,6 @@ impl NetworkBehaviour for DHTBehaviour { } } - if matches!(self.random_walk.state, State::NotStarted) - && self.random_walk.backoff.is_expired() - && matches!(self.bootstrap_state.state, State::Finished) - { - self.kadem.get_closest_peers(PeerId::random()); - self.random_walk.state = State::Started; - } - // retry put/gets if they are ready while let Some(req) = self.queued_get_record_queries.pop_front() { if req.backoff.is_expired() { diff --git a/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs b/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs index 62391a2ff0..2091f2abb2 100644 --- a/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs +++ b/crates/libp2p-networking/src/network/behaviours/exponential_backoff.rs @@ -61,6 +61,10 @@ impl ExponentialBackoff { true } } + /// Marked as expired regardless of time left. + pub fn expire(&mut self) { + self.started = None; + } } impl Default for ExponentialBackoff { diff --git a/crates/libp2p-networking/src/network/mod.rs b/crates/libp2p-networking/src/network/mod.rs index 523b5eef34..ac8bcc8500 100644 --- a/crates/libp2p-networking/src/network/mod.rs +++ b/crates/libp2p-networking/src/network/mod.rs @@ -241,7 +241,6 @@ pub async fn spin_up_swarm( ) -> Result<(), NetworkNodeHandleError> { info!("known_nodes{:?}", known_nodes); handle.add_known_peers(known_nodes).await?; - handle.wait_to_connect(4, idx, timeout_len).await?; handle.subscribe("global".to_string()).await?; Ok(()) diff --git a/crates/libp2p-networking/src/network/node/handle.rs b/crates/libp2p-networking/src/network/node/handle.rs index 69e17f3678..8790f20c7a 100644 --- a/crates/libp2p-networking/src/network/node/handle.rs +++ b/crates/libp2p-networking/src/network/node/handle.rs @@ -199,38 +199,6 @@ impl NetworkNodeHandle { }) } - /// Wait until at least `num_peers` have connected, or until `timeout` time has passed. - /// - /// # Errors - /// - /// Will return any networking error encountered, or `ConnectTimeout` if the `timeout` has elapsed. - pub async fn wait_to_connect( - &self, - num_peers: usize, - node_id: usize, - timeout: Duration, - ) -> Result<(), NetworkNodeHandleError> - where - S: Default + Debug, - { - let start = Instant::now(); - self.begin_bootstrap().await?; - let mut connected_ok = false; - while !connected_ok { - if start.elapsed() >= timeout { - return Err(NetworkNodeHandleError::ConnectTimeout); - } - async_sleep(Duration::from_secs(1)).await; - let num_connected = self.num_connected().await?; - info!( - "WAITING TO CONNECT, connected to {} / {} peers ON NODE {}", - num_connected, num_peers, node_id - ); - connected_ok = num_connected >= num_peers; - } - Ok(()) - } - /// Receives a reference of the internal `NetworkNodeReceiver`, which can be used to query for incoming messages. pub fn receiver(&self) -> &NetworkNodeReceiver { &self.receiver @@ -278,7 +246,37 @@ impl NetworkNodeHandle { self.send_request(req).await?; r.await.map_err(|_| NetworkNodeHandleError::RecvError) } - + /// Wait until at least `num_peers` have connected, or until `timeout` time has passed. + /// + /// # Errors + /// + /// Will return any networking error encountered, or `ConnectTimeout` if the `timeout` has elapsed. + pub async fn wait_to_connect( + &self, + num_peers: usize, + node_id: usize, + timeout: Duration, + ) -> Result<(), NetworkNodeHandleError> + where + S: Default + Debug, + { + let start = Instant::now(); + self.begin_bootstrap().await?; + let mut connected_ok = false; + while !connected_ok { + if start.elapsed() >= timeout { + return Err(NetworkNodeHandleError::ConnectTimeout); + } + async_sleep(Duration::from_secs(1)).await; + let num_connected = self.num_connected().await?; + info!( + "WAITING TO CONNECT, connected to {} / {} peers ON NODE {}", + num_connected, num_peers, node_id + ); + connected_ok = num_connected >= num_peers; + } + Ok(()) + } /// Look up a peer's addresses in kademlia /// NOTE: this should always be called before any `request_response` is initiated /// # Errors diff --git a/crates/libp2p-networking/tests/common/mod.rs b/crates/libp2p-networking/tests/common/mod.rs index c23c077c60..49bafb12ab 100644 --- a/crates/libp2p-networking/tests/common/mod.rs +++ b/crates/libp2p-networking/tests/common/mod.rs @@ -145,7 +145,11 @@ pub async fn spin_up_swarms( bootstrap_addrs.push((node.peer_id(), addr)); connecting_futs.push({ let node = node.clone(); - async move { node.wait_to_connect(4, i, timeout_len).await }.boxed_local() + async move { + node.begin_bootstrap().await?; + node.lookup_pid(PeerId::random()).await + } + .boxed_local() }); handles.push(node); } @@ -175,8 +179,8 @@ pub async fn spin_up_swarms( connecting_futs.push({ let node = node.clone(); async move { - node.wait_to_connect(4, num_bootstrap + j, timeout_len) - .await + node.begin_bootstrap().await?; + node.lookup_pid(PeerId::random()).await } .boxed_local() });