From 859353b4175fe70ddb74c00334e5e050230046e5 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 19 Jun 2023 13:39:59 +1000 Subject: [PATCH] fix(panic): Stop panicking when handling inbound connection handshakes (#6984) * Remove a redundant outbound connector timeout * Fix panics in inbound connection handshaker * Refactor to simplify FuturesUnordered types --- zebra-network/src/peer/connector.rs | 9 ++- zebra-network/src/peer/handshake.rs | 4 ++ zebra-network/src/peer_set/initialize.rs | 76 +++++++++++++++++------- 3 files changed, 65 insertions(+), 24 deletions(-) diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index e7047ea7128..67947f9e448 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -7,14 +7,13 @@ use std::{ }; use futures::prelude::*; -use tokio::{net::TcpStream, time::timeout}; +use tokio::net::TcpStream; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::chain_tip::{ChainTip, NoChainTip}; use crate::{ - constants::HANDSHAKE_TIMEOUT, peer::{Client, ConnectedAddr, Handshake, HandshakeRequest}, peer_set::ConnectionTracker, BoxError, PeerSocketAddr, Request, Response, @@ -93,8 +92,12 @@ where let connected_addr = ConnectedAddr::new_outbound_direct(addr); let connector_span = info_span!("connector", peer = ?connected_addr); + // # Security + // + // `zebra_network::init()` implements a connection timeout on this future. + // Any code outside this future does not have a timeout. 
async move { - let tcp_stream = timeout(HANDSHAKE_TIMEOUT, TcpStream::connect(*addr)).await??; + let tcp_stream = TcpStream::connect(*addr).await?; let client = hs .oneshot(HandshakeRequest::<TcpStream> { data_stream: tcp_stream, diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index f6660ac0597..01cfe98e859 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -876,6 +876,10 @@ where let relay = self.relay; let minimum_peer_version = self.minimum_peer_version.clone(); + // # Security + // + // `zebra_network::init()` implements a connection timeout on this future. + // Any code outside this future does not have a timeout. let fut = async move { debug!( addr = ?connected_addr, diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index de27c8d37fb..49823e9514a 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -7,6 +7,7 @@ use std::{ collections::{BTreeMap, HashSet}, convert::Infallible, net::SocketAddr, + pin::Pin, sync::Arc, time::Duration, }; @@ -15,13 +16,14 @@ use futures::{ future::{self, FutureExt}, sink::SinkExt, stream::{FuturesUnordered, StreamExt}, - TryFutureExt, + Future, TryFutureExt, }; use rand::seq::SliceRandom; use tokio::{ net::{TcpListener, TcpStream}, sync::broadcast, - time::{sleep, Instant}, + task::JoinError, + time::{error::Elapsed, sleep, Instant}, }; use tokio_stream::wrappers::IntervalStream; use tower::{ @@ -565,7 +567,8 @@ where "Inbound Connections", ); - let mut handshakes = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>> = + FuturesUnordered::new(); // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); @@ -575,8 +578,7 @@ where biased; next_handshake_res = handshakes.next() => match next_handshake_res { // The task has already sent the peer change to the peer set. 
- Some(Ok(_)) => continue, - Some(Err(task_panic)) => panic!("panic in inbound handshake task: {task_panic:?}"), + Some(()) => continue, None => unreachable!("handshakes never terminates, because it contains a future that never resolves"), }, @@ -611,19 +613,37 @@ where connection_tracker, peerset_tx.clone(), ) - .await?; + .await? + .map(move |res| match res { + Ok(()) => (), + Err(e @ JoinError { .. }) => { + if e.is_panic() { + panic!("panic during inbound handshaking: {e:?}"); + } else { + info!( + "task error during inbound handshaking: {e:?}, is Zebra shutting down?" + ) + } + } + }); + + let handshake_timeout = tokio::time::timeout( + // Only trigger this timeout if the inner handshake timeout fails + HANDSHAKE_TIMEOUT + Duration::from_millis(500), + handshake_task, + ) + .map(move |res| match res { + Ok(()) => (), + Err(_e @ Elapsed { .. }) => { + info!( + "timeout in spawned accept_inbound_handshake() task: \ + inner task should have timeout out already" + ); + } + }); // This timeout helps locate inbound peer connection hangs, see #6763 for details. - handshakes.push(Box::pin( - tokio::time::timeout( - // Only trigger this timeout if the inner handshake timeout fails - HANDSHAKE_TIMEOUT + Duration::from_millis(500), - handshake_task, - ) - .inspect_err(|_elapsed| { - info!("timeout in spawned accept_inbound_handshake() task") - }), - )); + handshakes.push(Box::pin(handshake_timeout)); // Rate-limit inbound connection handshakes. // But sleep longer after a successful connection, @@ -798,7 +818,9 @@ where let candidates = Arc::new(futures::lock::Mutex::new(candidates)); // This contains both crawl and handshake tasks. - let mut handshakes = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered< + Pin<Box<dyn Future<Output = Result<CrawlerAction, BoxError>> + Send>>, + > = FuturesUnordered::new(); // returns None when empty. // Keeping an unresolved future in the pool means the stream never terminates. 
handshakes.push(future::pending().boxed()); @@ -905,8 +927,14 @@ where }) .map(move |res| match res { Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during handshaking: {e:?}"); + Err(e @ JoinError {..}) => { + if e.is_panic() { + panic!("panic during outbound handshake: {e:?}"); + } else { + info!("task error during outbound handshake: {e:?}, is Zebra shutting down?") + } + // Just fake it + Ok(HandshakeFinished) } }) .in_current_span(); @@ -929,8 +957,14 @@ where }) .map(move |res| match res { Ok(crawler_action) => crawler_action, - Err(e) => { - panic!("panic during TimerCrawl: {tick:?} {e:?}"); + Err(e @ JoinError {..}) => { + if e.is_panic() { + panic!("panic during outbound TimerCrawl: {tick:?} {e:?}"); + } else { + info!("task error during outbound TimerCrawl: {e:?}, is Zebra shutting down?") + } + // Just fake it + Ok(TimerCrawlFinished) } }) .in_current_span();