From 9355c62e2245ed5d7a721fed56f55e463ccc2784 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 16 Jun 2023 11:15:16 +1000 Subject: [PATCH 1/3] Remove a redundant outbound connector timeout --- zebra-network/src/peer/connector.rs | 9 ++++++--- zebra-network/src/peer/handshake.rs | 4 ++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index e7047ea7128..67947f9e448 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -7,14 +7,13 @@ use std::{ }; use futures::prelude::*; -use tokio::{net::TcpStream, time::timeout}; +use tokio::net::TcpStream; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::chain_tip::{ChainTip, NoChainTip}; use crate::{ - constants::HANDSHAKE_TIMEOUT, peer::{Client, ConnectedAddr, Handshake, HandshakeRequest}, peer_set::ConnectionTracker, BoxError, PeerSocketAddr, Request, Response, @@ -93,8 +92,12 @@ where let connected_addr = ConnectedAddr::new_outbound_direct(addr); let connector_span = info_span!("connector", peer = ?connected_addr); + // # Security + // + // `zebra_network::init()` implements a connection timeout on this future. + // Any code outside this future does not have a timeout. async move { - let tcp_stream = timeout(HANDSHAKE_TIMEOUT, TcpStream::connect(*addr)).await??; + let tcp_stream = TcpStream::connect(*addr).await?; let client = hs .oneshot(HandshakeRequest:: { data_stream: tcp_stream, diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index f6660ac0597..01cfe98e859 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -876,6 +876,10 @@ where let relay = self.relay; let minimum_peer_version = self.minimum_peer_version.clone(); + // # Security + // + // `zebra_network::init()` implements a connection timeout on this future. + // Any code outside this future does not have a timeout. let fut = async move { debug!( addr = ?connected_addr, From 4be1451f34aa0e059fd4e35a86a921c45f154b72 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 16 Jun 2023 11:46:21 +1000 Subject: [PATCH 2/3] Fix panics in inbound connection handshaker --- zebra-network/src/peer_set/initialize.rs | 66 +++++++++++++++++------- 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 2d01437afce..eb666271632 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -7,6 +7,7 @@ use std::{ collections::{BTreeMap, HashSet}, convert::Infallible, net::SocketAddr, + pin::Pin, sync::Arc, time::Duration, }; @@ -15,13 +16,14 @@ use futures::{ future::{self, FutureExt}, sink::SinkExt, stream::{FuturesUnordered, StreamExt}, - TryFutureExt, + Future, TryFutureExt, }; use rand::seq::SliceRandom; use tokio::{ net::{TcpListener, TcpStream}, sync::broadcast, - time::{sleep, Instant}, + task::JoinError, + time::{error::Elapsed, sleep, Instant}, }; use tokio_stream::wrappers::IntervalStream; use tower::{ @@ -564,7 +566,14 @@ where "Inbound Connections", ); - let mut handshakes = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered< + Pin< + Box< + dyn Future, tokio::time::error::Elapsed>> + + Send, + >, + >, + > = FuturesUnordered::new(); // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); @@ -574,8 +583,20 @@ where biased; next_handshake_res = handshakes.next() => match next_handshake_res { // The task has already sent the peer change to the peer set. - Some(Ok(_)) => continue, - Some(Err(task_panic)) => panic!("panic in inbound handshake task: {task_panic:?}"), + Some(Ok(Ok(()))) => continue, + + // Handle the outer error layer: timeout + Some(Err(_timeout @ Elapsed { .. })) => { + info!("timeout in spawned accept_inbound_handshake() task"); + continue; + } + + // Handle the inner error layer: task + Some(Ok(Err(ref join_error @ JoinError { .. }))) if join_error.is_panic() => panic!("panic in inbound handshake task: {join_error:?}"), + Some(Ok(Err(ref join_error @ JoinError { .. }))) => { + info!("error in inbound handshake task: {join_error:?}, is Zebra shutting down?"); + continue; + } None => unreachable!("handshakes never terminates, because it contains a future that never resolves"), }, @@ -613,16 +634,11 @@ where .await?; // This timeout helps locate inbound peer connection hangs, see #6763 for details. - handshakes.push(Box::pin( - tokio::time::timeout( - // Only trigger this timeout if the inner handshake timeout fails - HANDSHAKE_TIMEOUT + Duration::from_millis(500), - handshake_task, - ) - .inspect_err(|_elapsed| { - info!("timeout in spawned accept_inbound_handshake() task") - }), - )); + handshakes.push(Box::pin(tokio::time::timeout( + // Only trigger this timeout if the inner handshake timeout fails + HANDSHAKE_TIMEOUT + Duration::from_millis(500), + handshake_task, + ))); // Rate-limit inbound connection handshakes. // But sleep longer after a successful connection, @@ -797,7 +813,9 @@ where let candidates = Arc::new(futures::lock::Mutex::new(candidates)); // This contains both crawl and handshake tasks. - let mut handshakes = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered< + Pin> + Send>>, + > = FuturesUnordered::new(); // returns None when empty. // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); @@ -905,7 +923,13 @@ where .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e) => { - panic!("panic during handshaking: {e:?}"); + if e.is_panic() { + panic!("panic during handshaking: {e:?}"); + } else { + info!("task error during handshaking: {e:?}, is Zebra shutting down?") + } + // Just fake it + Ok(HandshakeFinished) } }) .in_current_span(); @@ -929,7 +953,13 @@ where .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e) => { - panic!("panic during TimerCrawl: {tick:?} {e:?}"); + if e.is_panic() { + panic!("panic during TimerCrawl: {tick:?} {e:?}"); + } else { + info!("task error during TimerCrawl: {e:?}, is Zebra shutting down?") + } + // Just fake it + Ok(TimerCrawlFinished) } }) .in_current_span(); From e00002432ff3253fd43b1d7ad3eadc6a62eec0ae Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 16 Jun 2023 11:54:16 +1000 Subject: [PATCH 3/3] Refactor to simplify FuturesUnordered types --- zebra-network/src/peer_set/initialize.rs | 68 +++++++++++++----------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index eb666271632..6235b500946 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -566,14 +566,8 @@ where "Inbound Connections", ); - let mut handshakes: FuturesUnordered< - Pin< - Box< - dyn Future, tokio::time::error::Elapsed>> - + Send, - >, - >, - > = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered + Send>>> = + FuturesUnordered::new(); // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); @@ -583,20 +577,7 @@ where biased; next_handshake_res = handshakes.next() => match next_handshake_res { // The task has already sent the peer change to the peer set. - Some(Ok(Ok(()))) => continue, - - // Handle the outer error layer: timeout - Some(Err(_timeout @ Elapsed { .. })) => { - info!("timeout in spawned accept_inbound_handshake() task"); - continue; - } - - // Handle the inner error layer: task - Some(Ok(Err(ref join_error @ JoinError { .. }))) if join_error.is_panic() => panic!("panic in inbound handshake task: {join_error:?}"), - Some(Ok(Err(ref join_error @ JoinError { .. }))) => { - info!("error in inbound handshake task: {join_error:?}, is Zebra shutting down?"); - continue; - } + Some(()) => continue, None => unreachable!("handshakes never terminates, because it contains a future that never resolves"), }, @@ -631,14 +612,37 @@ where connection_tracker, peerset_tx.clone(), ) - .await?; + .await? + .map(move |res| match res { + Ok(()) => (), + Err(e @ JoinError { .. }) => { + if e.is_panic() { + panic!("panic during inbound handshaking: {e:?}"); + } else { + info!( + "task error during inbound handshaking: {e:?}, is Zebra shutting down?" + ) + } + } + }); - // This timeout helps locate inbound peer connection hangs, see #6763 for details. - handshakes.push(Box::pin(tokio::time::timeout( + let handshake_timeout = tokio::time::timeout( // Only trigger this timeout if the inner handshake timeout fails HANDSHAKE_TIMEOUT + Duration::from_millis(500), handshake_task, - ))); + ) + .map(move |res| match res { + Ok(()) => (), + Err(_e @ Elapsed { .. }) => { + info!( + "timeout in spawned accept_inbound_handshake() task: \ + inner task should have timeout out already" + ); + } + }); + + // This timeout helps locate inbound peer connection hangs, see #6763 for details. + handshakes.push(Box::pin(handshake_timeout)); // Rate-limit inbound connection handshakes. // But sleep longer after a successful connection, @@ -922,11 +926,11 @@ where }) .map(move |res| match res { Ok(crawler_action) => crawler_action, - Err(e) => { + Err(e @ JoinError {..}) => { if e.is_panic() { - panic!("panic during handshaking: {e:?}"); + panic!("panic during outbound handshake: {e:?}"); } else { - info!("task error during handshaking: {e:?}, is Zebra shutting down?") + info!("task error during outbound handshake: {e:?}, is Zebra shutting down?") } // Just fake it Ok(HandshakeFinished) @@ -952,11 +956,11 @@ where }) .map(move |res| match res { Ok(crawler_action) => crawler_action, - Err(e) => { + Err(e @ JoinError {..}) => { if e.is_panic() { - panic!("panic during TimerCrawl: {tick:?} {e:?}"); + panic!("panic during outbound TimerCrawl: {tick:?} {e:?}"); } else { - info!("task error during TimerCrawl: {e:?}, is Zebra shutting down?") + info!("task error during outbound TimerCrawl: {e:?}, is Zebra shutting down?") } // Just fake it Ok(TimerCrawlFinished)