diff --git a/Cargo.lock b/Cargo.lock
index a13d7785c45..2b43be85163 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5852,6 +5852,7 @@ dependencies = [
  "howudoin",
  "humantime-serde",
  "indexmap",
+ "itertools",
  "lazy_static",
  "metrics 0.21.0",
  "ordered-map",
diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml
index 6a24808d612..11b74e988f6 100644
--- a/zebra-network/Cargo.toml
+++ b/zebra-network/Cargo.toml
@@ -30,6 +30,7 @@ chrono = { version = "0.4.25", default-features = false, features = ["clock", "s
 hex = "0.4.3"
 humantime-serde = "1.1.1"
 indexmap = { version = "1.9.3", features = ["serde"] }
+itertools = "0.10.5"
 lazy_static = "1.4.0"
 ordered-map = "0.4.2"
 pin-project = "1.1.0"
diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs
index 7b7f51b5fa7..015ced16df9 100644
--- a/zebra-network/src/constants.rs
+++ b/zebra-network/src/constants.rs
@@ -335,6 +335,9 @@ pub const MIN_OVERLOAD_DROP_PROBABILITY: f32 = 0.05;
 /// [`Overloaded`](crate::PeerError::Overloaded) error.
 pub const MAX_OVERLOAD_DROP_PROBABILITY: f32 = 0.95;
 
+/// The minimum interval between logging peer set status updates.
+pub const MIN_PEER_SET_LOG_INTERVAL: Duration = Duration::from_secs(60);
+
 lazy_static! {
     /// The minimum network protocol version accepted by this crate for each network,
     /// represented as a network upgrade.
diff --git a/zebra-network/src/peer/client.rs b/zebra-network/src/peer/client.rs
index f264cc5ff98..69940275414 100644
--- a/zebra-network/src/peer/client.rs
+++ b/zebra-network/src/peer/client.rs
@@ -543,10 +543,14 @@ impl Client {
         // Prevent any senders from sending more messages to this peer.
         self.server_tx.close_channel();
 
-        // Stop the heartbeat task
+        // Ask the heartbeat task to stop.
         if let Some(shutdown_tx) = self.shutdown_tx.take() {
             let _ = shutdown_tx.send(CancelHeartbeatTask);
         }
+
+        // Force the connection and heartbeat tasks to stop.
+        self.connection_task.abort();
+        self.heartbeat_task.abort();
     }
 }
 
diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs
index 5d8f83039e6..318357dbd6b 100644
--- a/zebra-network/src/peer/connection.rs
+++ b/zebra-network/src/peer/connection.rs
@@ -451,7 +451,10 @@ impl From<Response> for InboundMessage {
 }
 
 /// The channels, services, and associated state for a peer connection.
-pub struct Connection<S, Tx> {
+pub struct Connection<S, Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
     /// The metadata for the connected peer `service`.
     ///
     /// This field is used for debugging.
@@ -519,7 +522,10 @@ pub struct Connection<S, Tx> {
     last_overload_time: Option<Instant>,
 }
 
-impl<S, Tx> fmt::Debug for Connection<S, Tx> {
+impl<S, Tx> fmt::Debug for Connection<S, Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         // skip the channels, they don't tell us anything useful
         f.debug_struct(std::any::type_name::<Connection<S, Tx>>())
@@ -534,7 +540,10 @@ impl<S, Tx> fmt::Debug for Connection<S, Tx> {
     }
 }
 
-impl<S, Tx> Connection<S, Tx> {
+impl<S, Tx> Connection<S, Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
     /// Return a new connection from its channels, services, and shared state.
     pub(crate) fn new(
         inbound_service: S,
@@ -645,9 +654,9 @@ where
             // the request completes (or times out).
             match future::select(peer_rx.next(), self.client_rx.next()).await {
                 Either::Left((None, _)) => {
-                    self.fail_with(PeerError::ConnectionClosed);
+                    self.fail_with(PeerError::ConnectionClosed).await;
                 }
-                Either::Left((Some(Err(e)), _)) => self.fail_with(e),
+                Either::Left((Some(Err(e)), _)) => self.fail_with(e).await,
                 Either::Left((Some(Ok(msg)), _)) => {
                     let unhandled_msg = self.handle_message_as_request(msg).await;
 
@@ -663,7 +672,8 @@ where
 
                     // There are no requests to be flushed,
                     // but we need to set an error and update metrics.
-                    self.shutdown(PeerError::ClientDropped);
+                    // (We don't want to log this error, because it's normal behaviour.)
+                    self.shutdown_async(PeerError::ClientDropped).await;
                     break;
                 }
                 Either::Right((Some(req), _)) => {
@@ -753,8 +763,10 @@ where
                 .instrument(span.clone())
                 .await
                 {
-                    Either::Right((None, _)) => self.fail_with(PeerError::ConnectionClosed),
-                    Either::Right((Some(Err(e)), _)) => self.fail_with(e),
+                    Either::Right((None, _)) => {
+                        self.fail_with(PeerError::ConnectionClosed).await
+                    }
+                    Either::Right((Some(Err(e)), _)) => self.fail_with(e).await,
                     Either::Right((Some(Ok(peer_msg)), _cancel)) => {
                         self.update_state_metrics(format!("Out::Rsp::{}", peer_msg.command()));
 
@@ -813,7 +825,7 @@ where
                             // So we do the state request cleanup manually.
                             let e = SharedPeerError::from(e);
                             let _ = tx.send(Err(e.clone()));
-                            self.fail_with(e);
+                            self.fail_with(e).await;
                             State::Failed
                         }
                         // Other request timeouts fail the request.
@@ -840,6 +852,8 @@ where
             }
         }
 
+        // TODO: close peer_rx here, after changing it from a stream to a channel
+
        let error = self.error_slot.try_get_error();
        assert!(
            error.is_some(),
@@ -849,18 +863,21 @@ where
         self.update_state_metrics(error.expect("checked is_some").to_string());
     }
 
-    /// Fail this connection.
+    /// Fail this connection, log the failure, and shut it down.
+    /// See [`Self::shutdown_async()`] for details.
     ///
-    /// If the connection has errored already, re-use the original error.
-    /// Otherwise, fail the connection with `error`.
-    fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
+    /// Use [`Self::shutdown_async()`] to avoid logging the failure,
+    /// and [`Self::shutdown()`] from non-async code.
+    async fn fail_with(&mut self, error: impl Into<SharedPeerError>) {
         let error = error.into();
 
-        debug!(%error,
-               client_receiver = ?self.client_rx,
-               "failing peer service with error");
+        debug!(
+            %error,
+            client_receiver = ?self.client_rx,
+            "failing peer service with error"
+        );
 
-        self.shutdown(error);
+        self.shutdown_async(error).await;
     }
 
     /// Handle an internal client request, possibly generating outgoing messages to the
@@ -1052,7 +1069,7 @@ where
             Err(error) => {
                 let error = SharedPeerError::from(error);
                 let _ = tx.send(Err(error.clone()));
-                self.fail_with(error);
+                self.fail_with(error).await;
             }
         };
     }
@@ -1075,17 +1092,17 @@ where
             Message::Ping(nonce) => {
                 trace!(?nonce, "responding to heartbeat");
                 if let Err(e) = self.peer_tx.send(Message::Pong(nonce)).await {
-                    self.fail_with(e);
+                    self.fail_with(e).await;
                 }
                 Consumed
             }
             // These messages shouldn't be sent outside of a handshake.
             Message::Version { .. } => {
-                self.fail_with(PeerError::DuplicateHandshake);
+                self.fail_with(PeerError::DuplicateHandshake).await;
                 Consumed
             }
             Message::Verack { .. } => {
-                self.fail_with(PeerError::DuplicateHandshake);
+                self.fail_with(PeerError::DuplicateHandshake).await;
                 Consumed
             }
             // These messages should already be handled as a response if they
@@ -1267,7 +1284,7 @@ where
         tokio::task::yield_now().await;
 
         if self.svc.ready().await.is_err() {
-            self.fail_with(PeerError::ServiceShutdown);
+            self.fail_with(PeerError::ServiceShutdown).await;
             return;
         }
 
@@ -1312,7 +1329,7 @@ where
             Response::Nil => { /* generic success, do nothing */ }
             Response::Peers(addrs) => {
                 if let Err(e) = self.peer_tx.send(Message::Addr(addrs)).await {
-                    self.fail_with(e);
+                    self.fail_with(e).await;
                 }
             }
             Response::Transactions(transactions) => {
@@ -1324,7 +1341,7 @@ where
                     match transaction {
                         Available(transaction) => {
                             if let Err(e) = self.peer_tx.send(Message::Tx(transaction)).await {
-                                self.fail_with(e);
+                                self.fail_with(e).await;
                                 return;
                             }
                         }
@@ -1334,7 +1351,7 @@ where
 
                 if !missing_ids.is_empty() {
                     if let Err(e) = self.peer_tx.send(Message::NotFound(missing_ids)).await {
-                        self.fail_with(e);
+                        self.fail_with(e).await;
                         return;
                     }
                 }
@@ -1348,7 +1365,7 @@ where
                     match block {
                         Available(block) => {
                             if let Err(e) = self.peer_tx.send(Message::Block(block)).await {
-                                self.fail_with(e);
+                                self.fail_with(e).await;
                                 return;
                             }
                         }
@@ -1358,7 +1375,7 @@ where
 
                 if !missing_hashes.is_empty() {
                     if let Err(e) = self.peer_tx.send(Message::NotFound(missing_hashes)).await {
-                        self.fail_with(e);
+                        self.fail_with(e).await;
                         return;
                     }
                 }
@@ -1369,12 +1386,12 @@ where
                     .send(Message::Inv(hashes.into_iter().map(Into::into).collect()))
                     .await
                 {
-                    self.fail_with(e)
+                    self.fail_with(e).await
                 }
             }
             Response::BlockHeaders(headers) => {
                 if let Err(e) = self.peer_tx.send(Message::Headers(headers)).await {
-                    self.fail_with(e)
+                    self.fail_with(e).await
                 }
             }
             Response::TransactionIds(hashes) => {
@@ -1402,7 +1419,7 @@ where
                     .collect();
 
                 if let Err(e) = self.peer_tx.send(Message::Inv(hashes)).await {
-                    self.fail_with(e)
+                    self.fail_with(e).await
                 }
             }
         }
@@ -1454,7 +1471,7 @@ where
             );
 
             self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Error", req.command()));
-            self.fail_with(PeerError::Overloaded);
+            self.fail_with(PeerError::Overloaded).await;
         } else {
             self.update_state_metrics(format!("In::Req::{}/Rsp::Overload::Ignored", req.command()));
             metrics::counter!("pool.ignored.loadshed", 1);
@@ -1499,7 +1516,10 @@ fn overload_drop_connection_probability(now: Instant, prev: Option<Instant>) ->
     raw_drop_probability.clamp(MIN_OVERLOAD_DROP_PROBABILITY, MAX_OVERLOAD_DROP_PROBABILITY)
 }
 
-impl<S, Tx> Connection<S, Tx> {
+impl<S, Tx> Connection<S, Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
     /// Update the connection state metrics for this connection,
     /// using `extra_state_info` as additional state information.
     fn update_state_metrics(&mut self, extra_state_info: impl Into<Option<String>>) {
@@ -1538,18 +1558,32 @@ impl<S, Tx> Connection<S, Tx> {
         }
     }
 
-    /// Marks the peer as having failed with `error`, and performs connection cleanup.
+    /// Marks the peer as having failed with `error`, and performs connection cleanup,
+    /// including async channel closes.
     ///
     /// If the connection has errored already, re-use the original error.
     /// Otherwise, fail the connection with `error`.
+    async fn shutdown_async(&mut self, error: impl Into<SharedPeerError>) {
+        // Close async channels first, so other tasks can start shutting down.
+        // There's nothing we can do about errors while shutting down, and some errors are expected.
+        //
+        // TODO: close peer_tx and peer_rx in shutdown() and Drop, after:
+        // - using channels instead of streams/sinks?
+        // - exposing the underlying implementation rather than using generics and closures?
+        // - adding peer_rx to the connection struct (optional)
+        let _ = self.peer_tx.close().await;
+
+        self.shutdown(error);
+    }
+
+    /// Marks the peer as having failed with `error`, and performs connection cleanup.
+    /// See [`Self::shutdown_async()`] for details.
+    ///
+    /// Call [`Self::shutdown_async()`] in async code, because it can shut down more channels.
     fn shutdown(&mut self, error: impl Into<SharedPeerError>) {
         let mut error = error.into();
 
         // Close channels first, so other tasks can start shutting down.
-        //
-        // TODO: close peer_tx and peer_rx, after:
-        // - adapting them using a struct with a Stream impl, rather than closures
-        // - making the struct forward `close` to the inner channel
         self.client_rx.close();
 
         // Update the shared error slot
@@ -1617,7 +1651,10 @@ impl<S, Tx> Connection<S, Tx> {
     }
 }
 
-impl<S, Tx> Drop for Connection<S, Tx> {
+impl<S, Tx> Drop for Connection<S, Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
     fn drop(&mut self) {
         self.shutdown(PeerError::ConnectionDropped);
 
diff --git a/zebra-network/src/peer/connection/peer_tx.rs b/zebra-network/src/peer/connection/peer_tx.rs
index 7e17196d95d..47df6504903 100644
--- a/zebra-network/src/peer/connection/peer_tx.rs
+++ b/zebra-network/src/peer/connection/peer_tx.rs
@@ -1,6 +1,6 @@
 //! The peer message sender channel.
 
-use futures::{Sink, SinkExt};
+use futures::{FutureExt, Sink, SinkExt};
 
 use zebra_chain::serialization::SerializationError;
 
@@ -10,7 +10,10 @@ use crate::{constants::REQUEST_TIMEOUT, protocol::external::Message, PeerError};
 ///
 /// Used to apply a timeout to send messages.
 #[derive(Clone, Debug)]
-pub struct PeerTx<Tx> {
+pub struct PeerTx<Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
     /// A channel for sending Zcash messages to the connected peer.
     ///
     /// This channel accepts [`Message`]s.
@@ -28,10 +31,28 @@ where
             .map_err(|_| PeerError::ConnectionSendTimeout)?
             .map_err(Into::into)
     }
+
+    /// Flush any remaining output and close this [`PeerTx`], if necessary.
+    pub async fn close(&mut self) -> Result<(), SerializationError> {
+        self.inner.close().await
+    }
 }
 
-impl<Tx> From<Tx> for PeerTx<Tx> {
+impl<Tx> From<Tx> for PeerTx<Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
     fn from(tx: Tx) -> Self {
         PeerTx { inner: tx }
     }
 }
+
+impl<Tx> Drop for PeerTx<Tx>
+where
+    Tx: Sink<Message, Error = SerializationError> + Unpin,
+{
+    fn drop(&mut self) {
+        // Do a last-ditch close attempt on the sink
+        self.close().now_or_never();
+    }
+}
diff --git a/zebra-network/src/peer_set/set.rs b/zebra-network/src/peer_set/set.rs
index abdd2a87495..8611ef7c633 100644
--- a/zebra-network/src/peer_set/set.rs
+++ b/zebra-network/src/peer_set/set.rs
@@ -98,6 +98,7 @@ use std::{
     fmt::Debug,
     future::Future,
     marker::PhantomData,
+    net::IpAddr,
     pin::Pin,
     task::{Context, Poll},
     time::Instant,
@@ -109,6 +110,7 @@ use futures::{
     prelude::*,
     stream::FuturesUnordered,
 };
+use itertools::Itertools;
 use tokio::{
     sync::{broadcast, oneshot::error::TryRecvError, watch},
     task::JoinHandle,
@@ -123,6 +125,7 @@ use zebra_chain::chain_tip::ChainTip;
 
 use crate::{
     address_book::AddressMetrics,
+    constants::MIN_PEER_SET_LOG_INTERVAL,
     peer::{LoadTrackedClient, MinimumPeerVersion},
     peer_set::{
         unready_service::{Error as UnreadyError, UnreadyService},
@@ -810,33 +813,84 @@ where
         (self.ready_services.len() + 1) / 2
     }
 
-    /// Logs the peer set size.
+    /// Returns the list of addresses in the peer set.
+    fn peer_set_addresses(&self) -> Vec<D::Key> {
+        self.ready_services
+            .keys()
+            .chain(self.cancel_handles.keys())
+            .cloned()
+            .collect()
+    }
+
+    /// Logs the peer set size, and any potential connectivity issues.
     fn log_peer_set_size(&mut self) {
         let ready_services_len = self.ready_services.len();
         let unready_services_len = self.unready_services.len();
         trace!(ready_peers = ?ready_services_len, unready_peers = ?unready_services_len);
 
-        if ready_services_len > 0 {
-            return;
-        }
+        let now = Instant::now();
 
         // These logs are designed to be human-readable in a terminal, at the
         // default Zebra log level. If you need to know the peer set size for
         // every request, use the trace-level logs, or the metrics exporter.
         if let Some(last_peer_log) = self.last_peer_log {
             // Avoid duplicate peer set logs
-            if Instant::now().duration_since(last_peer_log).as_secs() < 60 {
+            if now.duration_since(last_peer_log) < MIN_PEER_SET_LOG_INTERVAL {
                 return;
             }
         } else {
             // Suppress initial logs until the peer set has started up.
             // There can be multiple initial requests before the first peer is
             // ready.
-            self.last_peer_log = Some(Instant::now());
+            self.last_peer_log = Some(now);
             return;
         }
 
-        self.last_peer_log = Some(Instant::now());
+        self.last_peer_log = Some(now);
+
+        // Log potential duplicate connections.
+        let peers = self.peer_set_addresses();
+
+        // Check for duplicates by address and port: these are unexpected and represent a bug.
+        let duplicates: Vec<D::Key> = peers.iter().duplicates().cloned().collect();
+
+        let mut peer_counts = peers.iter().counts();
+        peer_counts.retain(|peer, _count| duplicates.contains(peer));
+
+        if !peer_counts.is_empty() {
+            let duplicate_connections: usize = peer_counts.values().sum();
+
+            warn!(
+                ?duplicate_connections,
+                duplicated_peers = ?peer_counts.len(),
+                peers = ?peers.len(),
+                "duplicate peer connections in peer set"
+            );
+        }
+
+        // Check for duplicates by address: these can happen if there are multiple nodes
+        // behind a NAT or on a single server.
+        let peers: Vec<IpAddr> = peers.iter().map(|addr| addr.ip()).collect();
+        let duplicates: Vec<IpAddr> = peers.iter().duplicates().cloned().collect();
+
+        let mut peer_counts = peers.iter().counts();
+        peer_counts.retain(|peer, _count| duplicates.contains(peer));
+
+        if !peer_counts.is_empty() {
+            let duplicate_connections: usize = peer_counts.values().sum();
+
+            info!(
+                ?duplicate_connections,
+                duplicated_peers = ?peer_counts.len(),
+                peers = ?peers.len(),
+                "duplicate IP addresses in peer set"
+            );
+        }
+
+        // Only log connectivity warnings if all our peers are busy (or there are no peers).
+        if ready_services_len > 0 {
+            return;
+        }
 
         let address_metrics = *self.address_metrics.borrow();
         if unready_services_len == 0 {