Skip to content

Commit

Permalink
Implement graceful shutdown for the peer set (#3071)
Browse files Browse the repository at this point in the history
Co-authored-by: Alfredo Garcia <[email protected]>
  • Loading branch information
teor2345 and oxarbitrage authored Nov 18, 2021
1 parent c4118dc commit 3fc049e
Showing 1 changed file with 37 additions and 6 deletions.
43 changes: 37 additions & 6 deletions zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
use std::{
collections::HashMap,
convert,
fmt::Debug,
future::Future,
marker::PhantomData,
Expand Down Expand Up @@ -147,7 +148,18 @@ where
<D::Service as Service<Request>>::Future: Send + 'static,
<D::Service as Load>::Metric: Debug,
{
/// Construct a peerset which uses `discover` internally.
/// Construct a peerset which uses `discover` to manage peer connections.
///
/// Arguments:
/// - `config`: configures the peer set connection limit;
/// - `discover`: handles peer connects and disconnects;
/// - `demand_signal`: requests more peers when all peers are busy (unready);
/// - `handle_rx`: receives background task handles,
/// monitors them to make sure they're still running,
/// and shuts down all the tasks as soon as one task exits;
/// - `inv_stream`: receives inventory advertisements for peers,
/// allowing the peer set to direct inventory requests;
/// - `address_book`: when peer set is busy, it logs address book diagnostics.
pub fn new(
config: &Config,
discover: D,
Expand All @@ -172,6 +184,10 @@ where
}
}

/// Check background task handles to make sure they're still running.
///
/// If any background task exits, shuts down all other background tasks,
/// and returns an error.
fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> {
if self.guards.is_empty() {
match self.handle_rx.try_recv() {
Expand All @@ -187,13 +203,28 @@ where
}
}

match Pin::new(&mut self.guards).poll_next(cx) {
Poll::Pending => {}
Poll::Ready(Some(res)) => res??,
Poll::Ready(None) => Err("all background tasks have exited")?,
let exit_error = match Pin::new(&mut self.guards).poll_next(cx) {
Poll::Pending => return Ok(()),
Poll::Ready(Some(res)) => {
info!(
background_tasks = %self.guards.len(),
"a peer set background task exited, shutting down other peer set tasks"
);

// Flatten the join result and inner result,
// then turn Ok() task exits into errors.
res.map_err(Into::into)
.and_then(convert::identity)
.and(Err("a peer set background task exited".into()))
}
Poll::Ready(None) => Err("all peer set background tasks have exited".into()),
};

for guard in self.guards.iter() {
guard.abort();
}

Ok(())
exit_error
}

fn poll_unready(&mut self, cx: &mut Context<'_>) {
Expand Down

0 comments on commit 3fc049e

Please sign in to comment.