From 3fc049e2eb3f6a3822eba4a63d506d8dbb94493c Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 18 Nov 2021 23:28:25 +1000 Subject: [PATCH] Implement graceful shutdown for the peer set (#3071) Co-authored-by: Alfredo Garcia --- zebra-network/src/peer_set/set.rs | 43 ++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 6531859960a..0b614637e11 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -44,6 +44,7 @@ use std::{ collections::HashMap, + convert, fmt::Debug, future::Future, marker::PhantomData, @@ -147,7 +148,18 @@ where >::Future: Send + 'static, ::Metric: Debug, { - /// Construct a peerset which uses `discover` internally. + /// Construct a peerset which uses `discover` to manage peer connections. + /// + /// Arguments: + /// - `config`: configures the peer set connection limit; + /// - `discover`: handles peer connects and disconnects; + /// - `demand_signal`: requests more peers when all peers are busy (unready); + /// - `handle_rx`: receives background task handles, + /// monitors them to make sure they're still running, + /// and shuts down all the tasks as soon as one task exits; + /// - `inv_stream`: receives inventory advertisements for peers, + /// allowing the peer set to direct inventory requests; + /// - `address_book`: when peer set is busy, it logs address book diagnostics. pub fn new( config: &Config, discover: D, @@ -172,6 +184,10 @@ where } } + /// Check background task handles to make sure they're still running. + /// + /// If any background task exits, shuts down all other background tasks, + /// and returns an error. fn poll_background_errors(&mut self, cx: &mut Context) -> Result<(), BoxError> { if self.guards.is_empty() { match self.handle_rx.try_recv() { @@ -187,13 +203,28 @@ where } } - match Pin::new(&mut self.guards).poll_next(cx) { - Poll::Pending => {} - Poll::Ready(Some(res)) => res??, - Poll::Ready(None) => Err("all background tasks have exited")?, + let exit_error = match Pin::new(&mut self.guards).poll_next(cx) { + Poll::Pending => return Ok(()), + Poll::Ready(Some(res)) => { + info!( + background_tasks = %self.guards.len(), + "a peer set background task exited, shutting down other peer set tasks" + ); + + // Flatten the join result and inner result, + // then turn Ok() task exits into errors. + res.map_err(Into::into) + .and_then(convert::identity) + .and(Err("a peer set background task exited".into())) + } + Poll::Ready(None) => Err("all peer set background tasks have exited".into()), + }; + + for guard in self.guards.iter() { + guard.abort(); } - Ok(()) + exit_error } fn poll_unready(&mut self, cx: &mut Context<'_>) {