From 9355c62e2245ed5d7a721fed56f55e463ccc2784 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 16 Jun 2023 11:15:16 +1000 Subject: [PATCH 1/7] Remove a redundant outbound connector timeout --- zebra-network/src/peer/connector.rs | 9 ++++++--- zebra-network/src/peer/handshake.rs | 4 ++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index e7047ea7128..67947f9e448 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -7,14 +7,13 @@ use std::{ }; use futures::prelude::*; -use tokio::{net::TcpStream, time::timeout}; +use tokio::net::TcpStream; use tower::{Service, ServiceExt}; use tracing_futures::Instrument; use zebra_chain::chain_tip::{ChainTip, NoChainTip}; use crate::{ - constants::HANDSHAKE_TIMEOUT, peer::{Client, ConnectedAddr, Handshake, HandshakeRequest}, peer_set::ConnectionTracker, BoxError, PeerSocketAddr, Request, Response, @@ -93,8 +92,12 @@ where let connected_addr = ConnectedAddr::new_outbound_direct(addr); let connector_span = info_span!("connector", peer = ?connected_addr); + // # Security + // + // `zebra_network::init()` implements a connection timeout on this future. + // Any code outside this future does not have a timeout. async move { - let tcp_stream = timeout(HANDSHAKE_TIMEOUT, TcpStream::connect(*addr)).await??; + let tcp_stream = TcpStream::connect(*addr).await?; let client = hs .oneshot(HandshakeRequest:: { data_stream: tcp_stream, diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs index f6660ac0597..01cfe98e859 100644 --- a/zebra-network/src/peer/handshake.rs +++ b/zebra-network/src/peer/handshake.rs @@ -876,6 +876,10 @@ where let relay = self.relay; let minimum_peer_version = self.minimum_peer_version.clone(); + // # Security + // + // `zebra_network::init()` implements a connection timeout on this future. + // Any code outside this future does not have a timeout. let fut = async move { debug!( addr = ?connected_addr, From 4be1451f34aa0e059fd4e35a86a921c45f154b72 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 16 Jun 2023 11:46:21 +1000 Subject: [PATCH 2/7] Fix panics in inbound connection handshaker --- zebra-network/src/peer_set/initialize.rs | 66 +++++++++++++++++------- 1 file changed, 48 insertions(+), 18 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 2d01437afce..eb666271632 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -7,6 +7,7 @@ use std::{ collections::{BTreeMap, HashSet}, convert::Infallible, net::SocketAddr, + pin::Pin, sync::Arc, time::Duration, }; @@ -15,13 +16,14 @@ use futures::{ future::{self, FutureExt}, sink::SinkExt, stream::{FuturesUnordered, StreamExt}, - TryFutureExt, + Future, TryFutureExt, }; use rand::seq::SliceRandom; use tokio::{ net::{TcpListener, TcpStream}, sync::broadcast, - time::{sleep, Instant}, + task::JoinError, + time::{error::Elapsed, sleep, Instant}, }; use tokio_stream::wrappers::IntervalStream; use tower::{ @@ -564,7 +566,14 @@ where "Inbound Connections", ); - let mut handshakes = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered< + Pin< + Box< + dyn Future, tokio::time::error::Elapsed>> + + Send, + >, + >, + > = FuturesUnordered::new(); // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); @@ -574,8 +583,20 @@ where biased; next_handshake_res = handshakes.next() => match next_handshake_res { // The task has already sent the peer change to the peer set. - Some(Ok(_)) => continue, - Some(Err(task_panic)) => panic!("panic in inbound handshake task: {task_panic:?}"), + Some(Ok(Ok(()))) => continue, + + // Handle the outer error layer: timeout + Some(Err(_timeout @ Elapsed { .. })) => { + info!("timeout in spawned accept_inbound_handshake() task"); + continue; + } + + // Handle the inner error layer: task + Some(Ok(Err(ref join_error @ JoinError { .. }))) if join_error.is_panic() => panic!("panic in inbound handshake task: {join_error:?}"), + Some(Ok(Err(ref join_error @ JoinError { .. }))) => { + info!("error in inbound handshake task: {join_error:?}, is Zebra shutting down?"); + continue; + } None => unreachable!("handshakes never terminates, because it contains a future that never resolves"), }, @@ -613,16 +634,11 @@ where .await?; // This timeout helps locate inbound peer connection hangs, see #6763 for details. - handshakes.push(Box::pin( - tokio::time::timeout( - // Only trigger this timeout if the inner handshake timeout fails - HANDSHAKE_TIMEOUT + Duration::from_millis(500), - handshake_task, - ) - .inspect_err(|_elapsed| { - info!("timeout in spawned accept_inbound_handshake() task") - }), - )); + handshakes.push(Box::pin(tokio::time::timeout( + // Only trigger this timeout if the inner handshake timeout fails + HANDSHAKE_TIMEOUT + Duration::from_millis(500), + handshake_task, + ))); // Rate-limit inbound connection handshakes. // But sleep longer after a successful connection, @@ -797,7 +813,9 @@ where let candidates = Arc::new(futures::lock::Mutex::new(candidates)); // This contains both crawl and handshake tasks. - let mut handshakes = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered< + Pin> + Send>>, + > = FuturesUnordered::new(); // returns None when empty. // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); @@ -905,7 +923,13 @@ where .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e) => { - panic!("panic during handshaking: {e:?}"); + if e.is_panic() { + panic!("panic during handshaking: {e:?}"); + } else { + info!("task error during handshaking: {e:?}, is Zebra shutting down?") + } + // Just fake it + Ok(HandshakeFinished) } }) .in_current_span(); @@ -929,7 +953,13 @@ where .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e) => { - panic!("panic during TimerCrawl: {tick:?} {e:?}"); + if e.is_panic() { + panic!("panic during TimerCrawl: {tick:?} {e:?}"); + } else { + info!("task error during TimerCrawl: {e:?}, is Zebra shutting down?") + } + // Just fake it + Ok(TimerCrawlFinished) } }) .in_current_span(); From e00002432ff3253fd43b1d7ad3eadc6a62eec0ae Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 16 Jun 2023 11:54:16 +1000 Subject: [PATCH 3/7] Refactor to simplify FuturesUnordered types --- zebra-network/src/peer_set/initialize.rs | 68 +++++++++++++----------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index eb666271632..6235b500946 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -566,14 +566,8 @@ where "Inbound Connections", ); - let mut handshakes: FuturesUnordered< - Pin< - Box< - dyn Future, tokio::time::error::Elapsed>> - + Send, - >, - >, - > = FuturesUnordered::new(); + let mut handshakes: FuturesUnordered + Send>>> = + FuturesUnordered::new(); // Keeping an unresolved future in the pool means the stream never terminates. handshakes.push(future::pending().boxed()); @@ -583,20 +577,7 @@ where biased; next_handshake_res = handshakes.next() => match next_handshake_res { // The task has already sent the peer change to the peer set. - Some(Ok(Ok(()))) => continue, - - // Handle the outer error layer: timeout - Some(Err(_timeout @ Elapsed { .. })) => { - info!("timeout in spawned accept_inbound_handshake() task"); - continue; - } - - // Handle the inner error layer: task - Some(Ok(Err(ref join_error @ JoinError { .. }))) if join_error.is_panic() => panic!("panic in inbound handshake task: {join_error:?}"), - Some(Ok(Err(ref join_error @ JoinError { .. }))) => { - info!("error in inbound handshake task: {join_error:?}, is Zebra shutting down?"); - continue; - } + Some(()) => continue, None => unreachable!("handshakes never terminates, because it contains a future that never resolves"), }, @@ -631,14 +612,37 @@ where connection_tracker, peerset_tx.clone(), ) - .await?; + .await? + .map(move |res| match res { + Ok(()) => (), + Err(e @ JoinError { .. }) => { + if e.is_panic() { + panic!("panic during inbound handshaking: {e:?}"); + } else { + info!( + "task error during inbound handshaking: {e:?}, is Zebra shutting down?" + ) + } + } + }); - // This timeout helps locate inbound peer connection hangs, see #6763 for details. - handshakes.push(Box::pin(tokio::time::timeout( + let handshake_timeout = tokio::time::timeout( // Only trigger this timeout if the inner handshake timeout fails HANDSHAKE_TIMEOUT + Duration::from_millis(500), handshake_task, - ))); + ) + .map(move |res| match res { + Ok(()) => (), + Err(_e @ Elapsed { .. }) => { + info!( + "timeout in spawned accept_inbound_handshake() task: \ + inner task should have timeout out already" + ); + } + }); + + // This timeout helps locate inbound peer connection hangs, see #6763 for details. + handshakes.push(Box::pin(handshake_timeout)); // Rate-limit inbound connection handshakes. // But sleep longer after a successful connection, @@ -922,11 +926,11 @@ where }) .map(move |res| match res { Ok(crawler_action) => crawler_action, - Err(e) => { + Err(e @ JoinError {..}) => { if e.is_panic() { - panic!("panic during handshaking: {e:?}"); + panic!("panic during outbound handshake: {e:?}"); } else { - info!("task error during handshaking: {e:?}, is Zebra shutting down?") + info!("task error during outbound handshake: {e:?}, is Zebra shutting down?") } // Just fake it Ok(HandshakeFinished) @@ -952,11 +956,11 @@ where }) .map(move |res| match res { Ok(crawler_action) => crawler_action, - Err(e) => { + Err(e @ JoinError {..}) => { if e.is_panic() { - panic!("panic during TimerCrawl: {tick:?} {e:?}"); + panic!("panic during outbound TimerCrawl: {tick:?} {e:?}"); } else { - info!("task error during TimerCrawl: {e:?}, is Zebra shutting down?") + info!("task error during outbound TimerCrawl: {e:?}, is Zebra shutting down?") } // Just fake it Ok(TimerCrawlFinished) From d356b5e0dd1a9ea6b602e49be1946d5b4ee87ebb Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 19 Jun 2023 08:42:08 +1000 Subject: [PATCH 4/7] Make licensing notes consistent --- zebra-network/Cargo.toml | 6 +++++- zebra-network/src/peer_set/initialize.rs | 7 ++++--- zebra-network/src/peer_set/set.rs | 1 + zebra-network/src/peer_set/unready_service.rs | 9 ++++++--- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index ae5639881d1..cd1029e6325 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -7,7 +7,11 @@ description = "Networking code for Zebra" # # This licence is deliberately different to the rest of Zebra. # -# zebra-network/src/peer_set/set.rs was modified from a 2019 version of: +# Some code in: +# zebra-network/src/peer_set/set.rs +# zebra-network/src/peer_set/unready_service.rs +# zebra-network/src/peer_set/initialize.rs +# was modified from a 2019 version of: # https://github.com/tower-rs/tower/tree/master/tower/src/balance/p2c/service.rs license = "MIT" repository = "https://github.com/ZcashFoundation/zebra" diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index 6235b500946..f3512764a12 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -1,7 +1,8 @@ //! A peer set whose size is dynamically determined by resource constraints. - -// Portions of this submodule were adapted from tower-balance, -// which is (c) 2019 Tower Contributors (MIT licensed). +//! +//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance]. +//! +//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance use std::{ collections::{BTreeMap, HashSet}, diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs index 0353d377f5e..8cdf0c099ec 100644 --- a/zebra-network/src/peer_set/set.rs +++ b/zebra-network/src/peer_set/set.rs @@ -3,6 +3,7 @@ //! # Implementation //! //! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance]. +//! //! As described in Tower's documentation, it: //! //! > Distributes requests across inner services using the [Power of Two Choices][p2c]. diff --git a/zebra-network/src/peer_set/unready_service.rs b/zebra-network/src/peer_set/unready_service.rs index 108a9e8307f..d49587cde1d 100644 --- a/zebra-network/src/peer_set/unready_service.rs +++ b/zebra-network/src/peer_set/unready_service.rs @@ -1,6 +1,9 @@ -/// Services that are busy or newly created. -/// -/// Adapted from tower-balance. +//! Services that are busy or newly created. +//! +//! The [`UnreadyService`] implementation is adapted from the one in [tower::Balance][tower-balance]. +//! +//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance + use std::{ future::Future, marker::PhantomData, From 0f0239e16471c814b5c0a801d8b050ddf9f892b5 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 19 Jun 2023 08:45:17 +1000 Subject: [PATCH 5/7] Delete redundant `move` in closures --- zebra-network/src/peer_set/initialize.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index f3512764a12..be267927e72 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -614,7 +614,7 @@ where peerset_tx.clone(), ) .await? - .map(move |res| match res { + .map(|res| match res { Ok(()) => (), Err(e @ JoinError { .. }) => { if e.is_panic() { @@ -632,7 +632,7 @@ where HANDSHAKE_TIMEOUT + Duration::from_millis(500), handshake_task, ) - .map(move |res| match res { + .map(|res| match res { Ok(()) => (), Err(_e @ Elapsed { .. }) => { info!( @@ -925,7 +925,7 @@ where Ok(DemandCrawlFinished) } }) - .map(move |res| match res { + .map(|res| match res { Ok(crawler_action) => crawler_action, Err(e @ JoinError {..}) => { if e.is_panic() { From 4ac1b33b13a8f420a1ae05fc978e4adfd23b9908 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 19 Jun 2023 08:45:55 +1000 Subject: [PATCH 6/7] Fix a log typo --- zebra-network/src/peer_set/initialize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index be267927e72..cf762f75b73 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -637,7 +637,7 @@ where Err(_e @ Elapsed { .. }) => { info!( "timeout in spawned accept_inbound_handshake() task: \ - inner task should have timeout out already" + inner task should have timed out already" ); } }); From 2900eb8b5a55dc7aae9d6a006811991c544752d9 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 19 Jun 2023 09:00:58 +1000 Subject: [PATCH 7/7] Add some missing tracing spans --- zebra-network/src/config.rs | 57 +++++++++++++----------- zebra-network/src/peer_cache_updater.rs | 1 + zebra-network/src/peer_set/initialize.rs | 17 ++++--- 3 files changed, 41 insertions(+), 34 deletions(-) diff --git a/zebra-network/src/config.rs b/zebra-network/src/config.rs index 067a50ba09c..23e812f8515 100644 --- a/zebra-network/src/config.rs +++ b/zebra-network/src/config.rs @@ -13,6 +13,7 @@ use indexmap::IndexSet; use serde::{de, Deserialize, Deserializer}; use tempfile::NamedTempFile; use tokio::{fs, io::AsyncWriteExt}; +use tracing::Span; use zebra_chain::parameters::Network; @@ -493,12 +494,15 @@ impl Config { // Create the temporary file. // Do blocking filesystem operations on a dedicated thread. + let span = Span::current(); let tmp_peer_cache_file = tokio::task::spawn_blocking(move || { - // Put the temporary file in the same directory as the permanent file, - // so atomic filesystem operations are possible. - tempfile::Builder::new() - .prefix(&tmp_peer_cache_prefix) - .tempfile_in(peer_cache_dir) + span.in_scope(move || { + // Put the temporary file in the same directory as the permanent file, + // so atomic filesystem operations are possible. + tempfile::Builder::new() + .prefix(&tmp_peer_cache_prefix) + .tempfile_in(peer_cache_dir) + }) }) .await .expect("unexpected panic creating temporary peer cache file")?; @@ -514,31 +518,34 @@ impl Config { // Atomically replace the current cache with the temporary cache. // Do blocking filesystem operations on a dedicated thread. + let span = Span::current(); tokio::task::spawn_blocking(move || { - let result = tmp_peer_cache_file.persist(&peer_cache_file); + span.in_scope(move || { + let result = tmp_peer_cache_file.persist(&peer_cache_file); - // Drops the temp file if needed - match result { - Ok(_temp_file) => { - info!( - cached_ip_count = ?peer_list.len(), - ?peer_cache_file, - "updated cached peer IP addresses" - ); - - for ip in &peer_list { - metrics::counter!( - "zcash.net.peers.cache", - 1, - "cache" => peer_cache_file.display().to_string(), - "remote_ip" => ip.to_string() + // Drops the temp file if needed + match result { + Ok(_temp_file) => { + info!( + cached_ip_count = ?peer_list.len(), + ?peer_cache_file, + "updated cached peer IP addresses" ); - } - Ok(()) + for ip in &peer_list { + metrics::counter!( + "zcash.net.peers.cache", + 1, + "cache" => peer_cache_file.display().to_string(), + "remote_ip" => ip.to_string() + ); + } + + Ok(()) + } + Err(error) => Err(error.error), } - Err(error) => Err(error.error), - } + }) }) .await .expect("unexpected panic making temporary peer cache file permanent") diff --git a/zebra-network/src/peer_cache_updater.rs b/zebra-network/src/peer_cache_updater.rs index 3d23f4d27a5..64c160e815f 100644 --- a/zebra-network/src/peer_cache_updater.rs +++ b/zebra-network/src/peer_cache_updater.rs @@ -15,6 +15,7 @@ use crate::{ }; /// An ongoing task that regularly caches the current `address_book` to disk, based on `config`. +#[instrument(skip(config, address_book))] pub async fn peer_cache_updater( config: Config, address_book: Arc>, diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index cf762f75b73..b7715aebfb3 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -677,6 +677,9 @@ where /// /// Uses `handshaker` to perform a Zcash network protocol handshake, and sends /// the [`peer::Client`] result over `peerset_tx`. +// +// TODO: when we support inbound proxies, distinguish between proxied listeners and +// direct listeners in the span generated by this instrument macro #[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))] async fn accept_inbound_handshake( addr: PeerSocketAddr, @@ -701,8 +704,6 @@ where // // This await is okay because the handshaker's `poll_ready` method always returns Ready. handshaker.ready().await?; - // TODO: distinguish between proxied listeners and direct listeners - let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); // Construct a handshake future but do not drive it yet.... let handshake = handshaker.call(HandshakeRequest { @@ -724,7 +725,7 @@ where debug!(?handshake_result, "error handshaking with inbound peer"); } } - .instrument(handshaker_span), + .in_current_span(), ); Ok(handshake_task) @@ -924,7 +925,7 @@ where Ok(DemandCrawlFinished) } - }) + }.in_current_span()) .map(|res| match res { Ok(crawler_action) => crawler_action, Err(e @ JoinError {..}) => { @@ -936,8 +937,7 @@ where // Just fake it Ok(HandshakeFinished) } - }) - .in_current_span(); + }); handshakes.push(Box::pin(handshake_or_crawl_handle)); } @@ -954,7 +954,7 @@ where crawl(candidates, demand_tx).await?; Ok(TimerCrawlFinished) - }) + }.in_current_span()) .map(move |res| match res { Ok(crawler_action) => crawler_action, Err(e @ JoinError {..}) => { @@ -966,8 +966,7 @@ where // Just fake it Ok(TimerCrawlFinished) } - }) - .in_current_span(); + }); handshakes.push(Box::pin(crawl_handle)); }