Skip to content

Commit

Permalink
[LIBP2P] Call bootstrap repeatedly In DHT (#2610)
Browse files Browse the repository at this point in the history
* boostrap with timeout

* Fix Startup

* fixes

* fix counter tests

* Call bootstrap when routing changes

* remove wait_to_connect

* lint

* add back wait_to_connect

* add more context to comment
  • Loading branch information
bfish713 authored Feb 20, 2024
1 parent 49c9c63 commit 4806ff1
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 96 deletions.
34 changes: 16 additions & 18 deletions crates/hotshot/src/traits/networking/libp2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,20 +446,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
};
handle.add_known_peers(bs_addrs).await.unwrap();

// 10 minute timeout
let timeout_duration = Duration::from_secs(600);
// perform connection
info!("WAITING TO CONNECT ON NODE {:?}", id);
handle
.wait_to_connect(4, id, timeout_duration)
.await
.unwrap();

let connected_num = handle.num_connected().await?;
metrics_connected_peers
.metrics
.connected_peers
.set(connected_num);
handle.begin_bootstrap().await?;

while !is_bootstrapped.load(Ordering::Relaxed) {
async_sleep(Duration::from_secs(1)).await;
Expand All @@ -471,7 +458,6 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
if is_da {
handle.subscribe("DA".to_string()).await.unwrap();
}

// TODO figure out some way of passing in ALL keypairs. That way we can add the
// global topic to the topic map
// NOTE this wont' work without this change
Expand All @@ -487,7 +473,6 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
while handle.put_record(&pk, &handle.peer_id()).await.is_err() {
async_sleep(Duration::from_secs(1)).await;
}

info!(
"Node {:?} is ready, type: {:?}",
handle.peer_id(),
Expand All @@ -497,13 +482,26 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
while handle.put_record(&handle.peer_id(), &pk).await.is_err() {
async_sleep(Duration::from_secs(1)).await;
}

// 10 minute timeout
let timeout_duration = Duration::from_secs(600);
// perform connection
info!("WAITING TO CONNECT ON NODE {:?}", id);
handle
.wait_to_connect(4, id, timeout_duration)
.await
.unwrap();
info!(
"node {:?} is barring bootstrap, type: {:?}",
handle.peer_id(),
node_type
);

let connected_num = handle.num_connected().await?;
metrics_connected_peers
.metrics
.connected_peers
.set(connected_num);

is_ready.store(true, Ordering::Relaxed);
info!("STARTING CONSENSUS ON {:?}", handle.peer_id());
Ok::<(), NetworkError>(())
Expand Down Expand Up @@ -531,7 +529,7 @@ impl<M: NetworkMsg, K: SignatureKey + 'static> Libp2pNetwork<M, K> {
broadcast_send: &UnboundedSender<M>,
) -> Result<(), NetworkError> {
match msg {
GossipMsg(msg, _topic) => {
GossipMsg(msg, _) => {
let result: Result<M, _> = bincode_opts().deserialize(&msg);
if let Ok(result) = result {
broadcast_send
Expand Down
58 changes: 17 additions & 41 deletions crates/libp2p-networking/src/network/behaviours/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ pub struct DHTBehaviour {
pub kadem: KademliaBehaviour<MemoryStore>,
/// State of bootstrapping
pub bootstrap_state: Bootstrap,
/// State of last random walk
pub random_walk: RandomWalk,
/// the peer id (useful only for debugging right now)
pub peer_id: PeerId,
/// replication factor
Expand All @@ -77,23 +75,13 @@ pub struct Bootstrap {
pub backoff: ExponentialBackoff,
}

/// State of the periodic random walk
pub struct RandomWalk {
/// State of random walk
state: State,
/// Retry timeout
backoff: ExponentialBackoff,
}

/// State used for random walk and bootstrapping
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum State {
/// Not in progress
NotStarted,
/// In progress
Started,
/// Sucessfully completed
Finished,
}

/// DHT event enum
Expand Down Expand Up @@ -142,11 +130,6 @@ impl DHTBehaviour {
state: State::NotStarted,
backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
},
random_walk: RandomWalk {
state: State::NotStarted,
// TODO jr this may be way too frequent
backoff: ExponentialBackoff::new(2, Duration::from_secs(1)),
},
in_progress_get_closest_peers: HashMap::default(),
replication_factor,
}
Expand Down Expand Up @@ -422,10 +405,7 @@ impl DHTBehaviour {
if chan.send(()).is_err() {
warn!("DHT: finished query but client no longer interested");
};
} else {
self.random_walk.state = State::NotStarted;
self.random_walk.backoff.start_next(true);
}
};
info!(
"peer {:?} successfully completed get closest peers for {:?} with peers {:?}",
self.peer_id, key, peers
Expand All @@ -434,10 +414,7 @@ impl DHTBehaviour {
Err(e) => {
if let Some(chan) = self.in_progress_get_closest_peers.remove(&query_id) {
let _: Result<_, _> = chan.send(());
} else {
self.random_walk.state = State::NotStarted;
self.random_walk.backoff.start_next(true);
}
};
warn!(
"peer {:?} failed to get closest peers with {:?} and stats {:?}",
self.peer_id, e, stats
Expand All @@ -462,11 +439,13 @@ impl DHTBehaviour {
..
} => {
if num_remaining == 0 {
// if bootstrap is successful, restart.
info!("Finished bootstrap for peer {:?}", self.peer_id);
self.bootstrap_state.state = State::Finished;
self.bootstrap_state.state = State::NotStarted;
self.event_queue.push(DHTEvent::IsBootstrapped);
self.begin_bootstrap = false;
// After initial bootstrap suceeds do it every 2 minutes to maintain routing.
self.bootstrap_state.backoff =
ExponentialBackoff::new(1, Duration::from_secs(120));
self.bootstrap_state.backoff.start_next(true);
} else {
warn!(
"Bootstrap in progress: num remaining nodes to ping {:?}",
Expand Down Expand Up @@ -512,7 +491,15 @@ impl DHTBehaviour {
addresses: _,
bucket_range: _,
old_peer: _,
} => {}
} => {
// Trigger a new bootstrap when our table changes, if it's not running
// We do this to refresh our peers when we know routing has changed
// For more info see: https://github.com/libp2p/rust-libp2p/pull/4838
// TODO: Remove once that pr is in a libp2p release
if self.bootstrap_state.state == State::NotStarted {
self.bootstrap_state.backoff.expire();
}
}
e @ KademliaEvent::OutboundQueryProgressed { .. } => {
info!("Not handling dht event {:?}", e);
}
Expand Down Expand Up @@ -575,10 +562,6 @@ impl NetworkBehaviour for DHTBehaviour {

type ToSwarm = DHTEvent;

// fn new_handler(&mut self) -> Self::ConnectionHandler {
// self.kadem.new_handler()
// }

fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
Expand All @@ -590,6 +573,7 @@ impl NetworkBehaviour for DHTBehaviour {
match self.kadem.bootstrap() {
Ok(_) => {
self.bootstrap_state.state = State::Started;
info!("Starting bootstrap");
}
Err(e) => {
error!(
Expand All @@ -605,14 +589,6 @@ impl NetworkBehaviour for DHTBehaviour {
}
}

if matches!(self.random_walk.state, State::NotStarted)
&& self.random_walk.backoff.is_expired()
&& matches!(self.bootstrap_state.state, State::Finished)
{
self.kadem.get_closest_peers(PeerId::random());
self.random_walk.state = State::Started;
}

// retry put/gets if they are ready
while let Some(req) = self.queued_get_record_queries.pop_front() {
if req.backoff.is_expired() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ impl ExponentialBackoff {
true
}
}
/// Marked as expired regardless of time left.
pub fn expire(&mut self) {
self.started = None;
}
}

impl Default for ExponentialBackoff {
Expand Down
1 change: 0 additions & 1 deletion crates/libp2p-networking/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ pub async fn spin_up_swarm<S: Debug + Default>(
) -> Result<(), NetworkNodeHandleError> {
info!("known_nodes{:?}", known_nodes);
handle.add_known_peers(known_nodes).await?;
handle.wait_to_connect(4, idx, timeout_len).await?;
handle.subscribe("global".to_string()).await?;

Ok(())
Expand Down
64 changes: 31 additions & 33 deletions crates/libp2p-networking/src/network/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,38 +199,6 @@ impl<S: Default + Debug> NetworkNodeHandle<S> {
})
}

/// Wait until at least `num_peers` have connected, or until `timeout` time has passed.
///
/// # Errors
///
/// Will return any networking error encountered, or `ConnectTimeout` if the `timeout` has elapsed.
pub async fn wait_to_connect(
&self,
num_peers: usize,
node_id: usize,
timeout: Duration,
) -> Result<(), NetworkNodeHandleError>
where
S: Default + Debug,
{
let start = Instant::now();
self.begin_bootstrap().await?;
let mut connected_ok = false;
while !connected_ok {
if start.elapsed() >= timeout {
return Err(NetworkNodeHandleError::ConnectTimeout);
}
async_sleep(Duration::from_secs(1)).await;
let num_connected = self.num_connected().await?;
info!(
"WAITING TO CONNECT, connected to {} / {} peers ON NODE {}",
num_connected, num_peers, node_id
);
connected_ok = num_connected >= num_peers;
}
Ok(())
}

/// Receives a reference of the internal `NetworkNodeReceiver`, which can be used to query for incoming messages.
pub fn receiver(&self) -> &NetworkNodeReceiver {
&self.receiver
Expand Down Expand Up @@ -278,7 +246,37 @@ impl<S> NetworkNodeHandle<S> {
self.send_request(req).await?;
r.await.map_err(|_| NetworkNodeHandleError::RecvError)
}

/// Wait until at least `num_peers` have connected, or until `timeout` time has passed.
///
/// # Errors
///
/// Will return any networking error encountered, or `ConnectTimeout` if the `timeout` has elapsed.
pub async fn wait_to_connect(
&self,
num_peers: usize,
node_id: usize,
timeout: Duration,
) -> Result<(), NetworkNodeHandleError>
where
S: Default + Debug,
{
let start = Instant::now();
self.begin_bootstrap().await?;
let mut connected_ok = false;
while !connected_ok {
if start.elapsed() >= timeout {
return Err(NetworkNodeHandleError::ConnectTimeout);
}
async_sleep(Duration::from_secs(1)).await;
let num_connected = self.num_connected().await?;
info!(
"WAITING TO CONNECT, connected to {} / {} peers ON NODE {}",
num_connected, num_peers, node_id
);
connected_ok = num_connected >= num_peers;
}
Ok(())
}
/// Look up a peer's addresses in kademlia
/// NOTE: this should always be called before any `request_response` is initiated
/// # Errors
Expand Down
10 changes: 7 additions & 3 deletions crates/libp2p-networking/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ pub async fn spin_up_swarms<S: Debug + Default>(
bootstrap_addrs.push((node.peer_id(), addr));
connecting_futs.push({
let node = node.clone();
async move { node.wait_to_connect(4, i, timeout_len).await }.boxed_local()
async move {
node.begin_bootstrap().await?;
node.lookup_pid(PeerId::random()).await
}
.boxed_local()
});
handles.push(node);
}
Expand Down Expand Up @@ -175,8 +179,8 @@ pub async fn spin_up_swarms<S: Debug + Default>(
connecting_futs.push({
let node = node.clone();
async move {
node.wait_to_connect(4, num_bootstrap + j, timeout_len)
.await
node.begin_bootstrap().await?;
node.lookup_pid(PeerId::random()).await
}
.boxed_local()
});
Expand Down

0 comments on commit 4806ff1

Please sign in to comment.